Skip to main content

zeroclaw_runtime/cron/
scheduler.rs

1use crate::cron::store::{RunCompletionAction, persist_run_completion_state, persist_run_result};
2use crate::cron::{
3    CronJob, DeliveryConfig, JobType, Schedule, SessionTarget, all_overdue_jobs, due_jobs,
4    next_run_for_schedule, sync_declarative_jobs,
5};
6use crate::security::SecurityPolicy;
7use anyhow::Result;
8use chrono::{DateTime, Utc};
9use futures_util::{StreamExt, stream};
10use std::process::Stdio;
11use std::sync::Arc;
12use tokio::process::Command;
13use tokio::time::{self, Duration};
14use zeroclaw_config::schema::Config;
15use zeroclaw_config::schema::{CronJobDecl, CronScheduleDecl};
16use zeroclaw_log::Instrument;
17use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
18
/// Health-registry component name under which scheduler liveness is reported.
const SCHEDULER_COMPONENT: &str = "scheduler";
/// Floor for the scheduler poll interval, in seconds; smaller
/// `reliability.scheduler_poll_secs` values are raised to this.
const MIN_POLL_SECONDS: u64 = 5;
/// Timeout for shell jobs, in seconds (two minutes). Presumably applied by
/// `run_job_command`, which is outside this view.
const SHELL_JOB_TIMEOUT_SECS: u64 = 2 * 60;
22
/// Type alias for the optional broadcast sender used to push cron results
/// to connected dashboard/SSE clients.
///
/// `None` disables broadcasting. When present, `process_due_jobs` sends one
/// `{"type": "cron_result", ...}` JSON object per finished job and ignores
/// send errors (tokio's broadcast `send` fails when no receiver is subscribed).
pub type EventBroadcast = Option<tokio::sync::broadcast::Sender<serde_json::Value>>;
26
/// Which code path triggered a cron run. It selects the wording of the
/// delivery-failure log line so operators can tell the paths apart.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CronDeliveryContext {
    /// Fired by the scheduler itself (regular poll or startup catch-up).
    Scheduled,
    /// Manual run through the `cron_run` tool (per its failure wording).
    ToolManual,
    /// Manual trigger through the gateway (per its failure wording).
    GatewayManual,
}

impl CronDeliveryContext {
    /// Log message for a failed delivery in this context.
    ///
    /// `best_effort` mirrors the job's `delivery.best_effort` flag and adds a
    /// `(best_effort)` tag to the message.
    fn failure_message(self, best_effort: bool) -> &'static str {
        match (self, best_effort) {
            (Self::Scheduled, true) => "Cron delivery failed (best_effort)",
            (Self::Scheduled, false) => "Cron delivery failed",
            (Self::ToolManual, true) => "cron_run delivery failed (best_effort)",
            (Self::ToolManual, false) => "cron_run delivery failed",
            (Self::GatewayManual, true) => "manual cron trigger delivery failed (best_effort)",
            (Self::GatewayManual, false) => "manual cron trigger delivery failed",
        }
    }
}
46
/// Final classification of a cron run after its delivery attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CronDeliveryOutcome {
    /// Whether the run counts as successful. A failed non-best-effort
    /// delivery clears this even if the job itself succeeded.
    pub success: bool,
    /// Persisted status string: `"ok"`, `"degraded"` or `"error"`.
    pub status: String,
    /// Job output, with any delivery error appended.
    pub output: String,
}
52
53pub async fn deliver_and_classify_run_result(
54    config: &Config,
55    job: &CronJob,
56    mut success: bool,
57    mut output: String,
58    context: CronDeliveryContext,
59) -> CronDeliveryOutcome {
60    let mut status = if success { "ok" } else { "error" }.to_string();
61
62    if let Err(e) = deliver_if_configured(config, job, &output).await {
63        // Cron add-time accepts dangling delivery refs (the job's channel
64        // may not be provisioned yet); the loudly-logged warn here is
65        // the scheduler-side half of that contract. Manual trigger paths
66        // share this classifier so status history cannot drift again.
67        let channel = job.delivery.channel.as_deref().unwrap_or("");
68        let target = job.delivery.to.as_deref().unwrap_or("");
69        let delivery_error = e.to_string();
70
71        if job.delivery.best_effort {
72            ::zeroclaw_log::record!(
73                WARN,
74                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
75                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
76                    .with_attrs(::serde_json::json!({
77                        "job_id": job.id,
78                        "agent_alias": job.agent_alias,
79                        "channel": channel,
80                        "target": target,
81                        "error": delivery_error
82                    })),
83                context.failure_message(true)
84            );
85            if success {
86                status = "degraded".to_string();
87            }
88        } else {
89            success = false;
90            ::zeroclaw_log::record!(
91                WARN,
92                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
93                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
94                    .with_attrs(::serde_json::json!({
95                        "job_id": job.id,
96                        "agent_alias": job.agent_alias,
97                        "channel": channel,
98                        "target": target,
99                        "error": delivery_error
100                    })),
101                context.failure_message(false)
102            );
103            status = "error".to_string();
104        }
105
106        if output.trim().is_empty() {
107            output = format!("delivery failed: {delivery_error}");
108        } else {
109            output.push_str("\n\ndelivery failed: ");
110            output.push_str(&delivery_error);
111        }
112    }
113
114    CronDeliveryOutcome {
115        success,
116        status,
117        output,
118    }
119}
120
121pub async fn run(config: Config, event_tx: EventBroadcast) -> Result<()> {
122    let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
123    let mut interval = time::interval(Duration::from_secs(poll_secs));
124    interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
125
126    crate::health::mark_component_ok(SCHEDULER_COMPONENT);
127
128    // ── Declarative job sync: reconcile config-defined jobs with the DB.
129    let mut jobs_with_builtin = config.cron.clone();
130    if let Some(ref schedule_cron) = config.backup.schedule_cron {
131        let backup_job = CronJobDecl {
132            name: Some("Scheduled backup".to_string()),
133            job_type: "shell".to_string(),
134            schedule: CronScheduleDecl::Cron {
135                expr: schedule_cron.clone(),
136                tz: config.backup.schedule_timezone.clone(),
137            },
138            command: Some("backup create".to_string()),
139            prompt: None,
140            enabled: true,
141            model: None,
142            allowed_tools: None,
143            uses_memory: true,
144            session_target: None,
145            delivery: None,
146        };
147        ::zeroclaw_log::record!(
148            DEBUG,
149            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
150                .with_attrs(::serde_json::json!({"schedule": schedule_cron})),
151            "Synthesizing builtin backup cron job from config.backup.schedule_cron"
152        );
153        jobs_with_builtin.insert("__builtin_backup".to_string(), backup_job);
154    }
155
156    match sync_declarative_jobs(&config, &jobs_with_builtin) {
157        Ok(()) => {
158            if !jobs_with_builtin.is_empty() {
159                ::zeroclaw_log::record!(
160                    INFO,
161                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
162                        .with_attrs(::serde_json::json!({"count": jobs_with_builtin.len()})),
163                    "Synced declarative cron jobs from config"
164                );
165            }
166        }
167        Err(e) => ::zeroclaw_log::record!(
168            WARN,
169            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
170                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
171                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
172            "Failed to sync declarative cron jobs"
173        ),
174    }
175
176    // ── Startup catch-up: run ALL overdue jobs before entering the
177    //    normal polling loop. The regular loop is capped by `max_tasks`,
178    //    which could leave some overdue jobs waiting across many cycles
179    //    if the machine was off for a while. The catch-up phase fetches
180    //    without the `max_tasks` limit so every missed job fires once.
181    //    Controlled by `[scheduler] catch_up_on_startup` (default: true).
182    if config.scheduler.catch_up_on_startup {
183        catch_up_overdue_jobs(&config, &event_tx).await;
184    } else {
185        ::zeroclaw_log::record!(
186            INFO,
187            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
188            "Scheduler startup: catch-up disabled by config"
189        );
190    }
191
192    loop {
193        interval.tick().await;
194        // Keep scheduler liveness fresh even when there are no due jobs.
195        crate::health::mark_component_ok(SCHEDULER_COMPONENT);
196
197        let jobs = match due_jobs(&config, Utc::now()) {
198            Ok(jobs) => jobs,
199            Err(e) => {
200                crate::health::mark_component_error(SCHEDULER_COMPONENT, e.to_string());
201                ::zeroclaw_log::record!(
202                    WARN,
203                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
204                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
205                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
206                    "Scheduler query failed"
207                );
208                continue;
209            }
210        };
211
212        process_due_jobs(&config, jobs, SCHEDULER_COMPONENT, &event_tx).await;
213    }
214}
215
216/// Resolve which agent owns a given cron job. Lookup order:
217///
218/// 1. The row's persisted `agent_alias` field, when it names a
219///    configured agent.
220/// 2. Reverse-resolve via `[agents.<x>].cron_jobs` (declarative path:
221///    every alias that lists the cron alias claims ownership).
222///
223/// Returns `None` when neither resolves. Callers (process_due_jobs,
224/// execute_job_now) log and skip the job rather than crashing the
225/// scheduler loop.
226fn resolve_owning_agent<'a>(config: &'a Config, job: &CronJob) -> Option<&'a str> {
227    if !job.agent_alias.is_empty()
228        && let Some((alias, _)) = config
229            .agents
230            .iter()
231            .find(|(alias, _)| alias.as_str() == job.agent_alias)
232    {
233        return Some(alias.as_str());
234    }
235    config.agent_for_cron_job(&job.id)
236}
237
238/// Fetch **all** overdue jobs (ignoring `max_tasks`) and execute them.
239///
240/// Called once at scheduler startup so that jobs missed during downtime
241/// (e.g. late boot, daemon restart) are caught up immediately.
242async fn catch_up_overdue_jobs(config: &Config, event_tx: &EventBroadcast) {
243    let now = Utc::now();
244    let jobs = match all_overdue_jobs(config, now) {
245        Ok(jobs) => jobs,
246        Err(e) => {
247            ::zeroclaw_log::record!(
248                WARN,
249                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
250                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
251                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
252                "Startup catch-up query failed"
253            );
254            return;
255        }
256    };
257
258    if jobs.is_empty() {
259        ::zeroclaw_log::record!(
260            INFO,
261            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
262            "Scheduler startup: no overdue jobs to catch up"
263        );
264        return;
265    }
266
267    ::zeroclaw_log::record!(
268        INFO,
269        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
270            .with_attrs(::serde_json::json!({"count": jobs.len()})),
271        "Scheduler startup: catching up overdue jobs"
272    );
273
274    process_due_jobs(config, jobs, SCHEDULER_COMPONENT, event_tx).await;
275
276    ::zeroclaw_log::record!(
277        INFO,
278        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
279        "Scheduler startup: catch-up complete"
280    );
281}
282
283pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
284    use zeroclaw_log::Instrument;
285    let Some(agent_alias) = resolve_owning_agent(config, job) else {
286        return (
287            false,
288            format!(
289                "cron job {id:?} has no owning agent; add the alias to an [agents.<x>].cron_jobs list",
290                id = job.id
291            ),
292        );
293    };
294    let agent_alias = agent_alias.to_string();
295    let security = match SecurityPolicy::for_agent(config, &agent_alias) {
296        Ok(s) => s,
297        Err(e) => return (false, format!("agent {agent_alias} risk profile: {e}")),
298    };
299    let span = zeroclaw_log::attribution_span!(job);
300    Box::pin(execute_job_with_retry(config, &security, &agent_alias, job))
301        .instrument(span)
302        .await
303}
304
305async fn execute_job_with_retry(
306    config: &Config,
307    security: &SecurityPolicy,
308    agent_alias: &str,
309    job: &CronJob,
310) -> (bool, String) {
311    let mut last_output = String::new();
312    let retries = config.reliability.scheduler_retries;
313    let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
314
315    for attempt in 0..=retries {
316        let (success, output) = match job.job_type {
317            JobType::Shell => run_job_command(config, security, job).await,
318            JobType::Agent => Box::pin(run_agent_job(config, security, agent_alias, job)).await,
319        };
320        last_output = output;
321
322        if success {
323            return (true, last_output);
324        }
325
326        if last_output.starts_with("blocked by security policy:") {
327            // Deterministic policy violations are not retryable.
328            return (false, last_output);
329        }
330
331        if attempt < retries {
332            let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250);
333            time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
334            backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
335        }
336    }
337
338    (false, last_output)
339}
340
341async fn process_due_jobs(
342    config: &Config,
343    jobs: Vec<CronJob>,
344    component: &str,
345    event_tx: &EventBroadcast,
346) {
347    // Refresh scheduler health on every successful poll cycle, including idle cycles.
348    crate::health::mark_component_ok(component);
349
350    let max_concurrent = config.scheduler.max_concurrent.max(1);
351    let mut in_flight = stream::iter(jobs.into_iter().filter_map(|job| {
352        // Resolve owning agent per-job. Skip orphans with a warning so a
353        // mis-configured job can't take down the scheduler loop.
354        let Some(agent_alias) = resolve_owning_agent(config, &job) else {
355            ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id})), "Cron job has no owning agent; add the alias to an [agents.<x>].cron_jobs list");
356            return None;
357        };
358        let agent_alias = agent_alias.to_owned();
359        let security = match SecurityPolicy::for_agent(config, &agent_alias) {
360            Ok(s) => Arc::new(s),
361            Err(e) => {
362                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id, "agent": agent_alias, "error": format!("{}", e)})), "Cron job: failed to build SecurityPolicy for owning agent");
363                return None;
364            }
365        };
366        let config = config.clone();
367        let component = component.to_owned();
368        Some(async move {
369            Box::pin(execute_and_persist_job(
370                &config,
371                security.as_ref(),
372                &agent_alias,
373                &job,
374                &component,
375            ))
376            .await
377        })
378    }))
379    .buffer_unordered(max_concurrent);
380
381    while let Some((job_id, success, output)) = in_flight.next().await {
382        if !success {
383            ::zeroclaw_log::record!(
384                WARN,
385                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
386                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
387                    .with_attrs(::serde_json::json!({"job_id": job_id, "output": output})),
388                "Scheduler job '' failed: "
389            );
390        }
391        // Broadcast cron result to dashboard/SSE clients.
392        if let Some(tx) = event_tx {
393            let _ = tx.send(serde_json::json!({
394                "type": "cron_result",
395                "job_id": job_id,
396                "success": success,
397                "output": output,
398                "timestamp": chrono::Utc::now().to_rfc3339(),
399            }));
400        }
401    }
402}
403
404async fn execute_and_persist_job(
405    config: &Config,
406    security: &SecurityPolicy,
407    agent_alias: &str,
408    job: &CronJob,
409    component: &str,
410) -> (String, bool, String) {
411    crate::health::mark_component_ok(component);
412    warn_if_high_frequency_agent_job(job);
413
414    let started_at = Utc::now();
415    let span = zeroclaw_log::attribution_span!(job);
416    let (success, output) = Box::pin(execute_job_with_retry(config, security, agent_alias, job))
417        .instrument(span)
418        .await;
419    let finished_at = Utc::now();
420    let success = Box::pin(persist_job_result(
421        config,
422        job,
423        success,
424        &output,
425        started_at,
426        finished_at,
427    ))
428    .await;
429
430    (job.id.clone(), success, output)
431}
432
/// Execute an agent-type cron job as a run of its owning agent.
///
/// Steps, in order:
/// 1. Build the SubAgent context (`SubAgentSpawn::for_agent(..).build(..)`).
///    On failure, return `(false, "subagent spawn failed: ...")`.
/// 2. Apply the security gates: read-only autonomy, rate limiting, then
///    `record_action`. Each rejection's output starts with
///    `blocked by security policy:`.
/// 3. When `job.uses_memory` is set, recall memories for the prompt (limit
///    argument `5`), drop `Conversation` entries, and prepend the rest
///    wrapped in `MEMORY_CONTEXT_OPEN`/`MEMORY_CONTEXT_CLOSE`. Filtering
///    happens after the recall, so fewer than 5 entries may remain. Memory
///    errors yield no context.
/// 4. Run the agent on a config copy with `memory.auto_save = false`, a fresh
///    `cron-<uuid>` session path, and the SubAgent policy as an override.
/// 5. If the run fails, purge memories stored under that session (best
///    effort; purge errors are ignored).
///
/// Returns `(success, output)`. An empty successful response is reported as
/// `"agent job executed"`.
async fn run_agent_job(
    config: &Config,
    security: &SecurityPolicy,
    agent_alias: &str,
    job: &CronJob,
) -> (bool, String) {
    // Cron is one of two SubAgent spawn sites; the other is the
    // agent-loop `spawn_subagent` tool. Both funnel through
    // `SubAgentSpawn::for_agent` so permission inheritance, tracing
    // span shape, and audit attribution stay uniform across spawn
    // sites.
    let subagent_ctx = match crate::subagent::SubAgentSpawn::for_agent(config, agent_alias)
        .and_then(|spawn| spawn.build(crate::subagent::SubAgentOverrides::default()))
    {
        Ok(ctx) => ctx,
        Err(e) => return (false, format!("subagent spawn failed: {e:#}")),
    };

    // Policy gates. `execute_job_with_retry` treats the
    // `blocked by security policy:` prefix as non-retryable. `record_action`
    // is evaluated last, so it only runs once the earlier gates pass
    // (NOTE(review): presumably it consumes one unit of the action budget).
    if !security.can_act() {
        return (
            false,
            "blocked by security policy: autonomy is read-only".to_string(),
        );
    }

    if security.is_rate_limited() {
        return (
            false,
            "blocked by security policy: rate limit exceeded".to_string(),
        );
    }

    if !security.record_action() {
        return (
            false,
            "blocked by security policy: action budget exhausted".to_string(),
        );
    }
    let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
    let prompt = job.prompt.clone().unwrap_or_default();

    // Recall relevant memories so cron jobs have context awareness.
    // Skipped when `job.uses_memory` is false (e.g. stateless digest jobs).
    // Exclude `Conversation` memories to prevent chat context from
    // leaking into scheduled executions. Routes through
    // the cron-owning agent's per-agent memory wrapper so the
    // recall is scoped to that agent's bound + allowlisted rows.
    let memory_context = if !job.uses_memory {
        String::new()
    } else {
        match zeroclaw_memory::create_memory_for_agent(
            config,
            agent_alias,
            config
                .model_provider_for_agent(agent_alias)
                .and_then(|e| e.api_key.as_deref()),
        )
        .await
        {
            Ok(mem) => match mem.recall(&prompt, 5, None, None, None).await {
                Ok(entries) if !entries.is_empty() => {
                    let ctx: String = entries
                        .iter()
                        .filter(|e| {
                            !matches!(
                                e.category,
                                zeroclaw_memory::traits::MemoryCategory::Conversation
                            )
                        })
                        .map(|e| format!("- {}: {}", e.key, e.content))
                        .collect::<Vec<_>>()
                        .join("\n");
                    // Every recalled entry may have been a Conversation one.
                    if ctx.is_empty() {
                        String::new()
                    } else {
                        format!("{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n")
                    }
                }
                // Empty recall and recall errors both yield no context.
                _ => String::new(),
            },
            // NOTE(review): memory backend construction errors are dropped
            // silently here; the job still runs without memory context.
            Err(_) => String::new(),
        }
    };

    let prefixed_prompt = format!("{memory_context}[cron:{} {name}] {prompt}", job.id);
    let model_override = job.model.clone();

    // Private config copy so disabling auto-save does not affect the caller.
    let mut cron_config = config.clone();
    cron_config.memory.auto_save = false;

    // Assign a unique session ID so memories written during this run can be
    // purged atomically if the run fails (prevents snowball accumulation).
    // Doubles as the SubAgent run_id in the tracing span so a failed
    // memory purge can be correlated with its sub-run.
    let run_session_id = uuid::Uuid::new_v4().to_string();
    let session_path = std::path::PathBuf::from(format!("cron-{run_session_id}"));

    let subagent_span = zeroclaw_log::info_span!(
        "subagent",
        category = "cron",
        agent_alias = %agent_alias,
        cron_job_id = %job.id,
        run_id = %run_session_id,
        spawn_site = "cron",
    );

    // Pass the validated SubAgent context as run-time overrides so the
    // policy that came back from `SubAgentSpawn::build` reaches the
    // agent loop. Without this the loop reconstructs from config and
    // any future caller-supplied narrowing override would silently
    // collapse back to the parent's verbatim policy.
    //
    // `is_subagent: false` is explicit (not `..Default::default()`) so
    // a future refactor that flips the default can't quietly promote
    // every cron-launched agent to a depth-1 subagent — they're
    // top-level runs by design, despite riding through SubAgentSpawn.
    let run_overrides = crate::agent::loop_::AgentRunOverrides {
        security: Some(subagent_ctx.policy.clone()),
        memory: None,
        is_subagent: false,
    };
    // Both session targets currently take the same path. The exhaustive
    // match forces a decision here if a new `SessionTarget` variant is added.
    let run_result = match job.session_target {
        SessionTarget::Main | SessionTarget::Isolated => {
            Box::pin(
                crate::agent::run(
                    cron_config,
                    agent_alias,
                    Some(prefixed_prompt),
                    None,
                    model_override,
                    config
                        .model_provider_for_agent(agent_alias)
                        .and_then(|e| e.temperature),
                    vec![],
                    false,
                    Some(session_path.clone()),
                    job.allowed_tools.clone(),
                    run_overrides,
                )
                .instrument(subagent_span),
            )
            .await
        }
    };

    match run_result {
        Ok(response) => (
            true,
            if response.trim().is_empty() {
                "agent job executed".to_string()
            } else {
                response
            },
        ),
        Err(e) => {
            // Purge memories written during this failed run so they don't
            // pollute future recall and cause context snowball. Routes
            // through the cron-owning agent's per-agent memory wrapper
            // so the purge stays scoped to the agent that wrote them.
            // Sanitize the session key so it matches what the runtime
            // writes via the orchestrator session-key sanitizer.
            let mem_session_key = zeroclaw_api::session_keys::sanitize_session_key(&format!(
                "cli:{}",
                session_path.display()
            ));
            if let Ok(mem) = zeroclaw_memory::create_memory_for_agent(
                config,
                agent_alias,
                config
                    .model_provider_for_agent(agent_alias)
                    .and_then(|e| e.api_key.as_deref()),
            )
            .await
            {
                // Best effort: a failed purge does not change the result.
                let _ = mem.purge_session(&mem_session_key).await;
            }
            (false, format!("agent job failed: {e}"))
        }
    }
}
613
614async fn persist_job_result(
615    config: &Config,
616    job: &CronJob,
617    success: bool,
618    output: &str,
619    started_at: DateTime<Utc>,
620    finished_at: DateTime<Utc>,
621) -> bool {
622    let duration_ms = (finished_at - started_at).num_milliseconds();
623    let outcome = deliver_and_classify_run_result(
624        config,
625        job,
626        success,
627        output.to_string(),
628        CronDeliveryContext::Scheduled,
629    )
630    .await;
631
632    let action = if is_one_shot_auto_delete(job) && outcome.success {
633        RunCompletionAction::Delete
634    } else if matches!(job.schedule, Schedule::At { .. }) {
635        RunCompletionAction::Disable
636    } else {
637        RunCompletionAction::Reschedule
638    };
639
640    let job_state_at = Utc::now();
641    if let Err(e) = persist_run_result(
642        config,
643        job,
644        started_at,
645        finished_at,
646        job_state_at,
647        &outcome.status,
648        Some(&outcome.output),
649        duration_ms,
650        action,
651    ) {
652        ::zeroclaw_log::record!(
653            WARN,
654            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
655                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
656                .with_attrs(::serde_json::json!({"e": e.to_string()})),
657            "Failed to persist scheduler run result: "
658        );
659
660        if action == RunCompletionAction::Delete {
661            // Best-effort fallback for the legacy behavior: a successful
662            // auto-delete one-shot should not be picked up again if the
663            // combined history+state transaction fails while inserting or
664            // pruning the run row.
665            if let Err(disable_err) = persist_run_completion_state(
666                config,
667                job,
668                job_state_at,
669                &outcome.status,
670                Some(&outcome.output),
671                RunCompletionAction::Disable,
672            ) {
673                ::zeroclaw_log::record!(
674                    WARN,
675                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
676                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
677                        .with_attrs(::serde_json::json!({"disable_err": disable_err.to_string()})),
678                    "Failed to disable one-shot cron job after history persistence failure: "
679                );
680            }
681        } else {
682            // For recurring jobs and non-delete one-shots, keep the scheduler
683            // moving even if run-history persistence fails.
684            if let Err(state_err) = persist_run_completion_state(
685                config,
686                job,
687                job_state_at,
688                &outcome.status,
689                Some(&outcome.output),
690                action,
691            ) {
692                ::zeroclaw_log::record!(
693                    WARN,
694                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
695                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
696                        .with_attrs(::serde_json::json!({"state_err": state_err.to_string()})),
697                    "Failed to update cron job state after history persistence failure: "
698                );
699            }
700        }
701    }
702
703    outcome.success
704}
705
706fn is_one_shot_auto_delete(job: &CronJob) -> bool {
707    job.delete_after_run && matches!(job.schedule, Schedule::At { .. })
708}
709
710fn is_high_frequency_agent_job(job: &CronJob) -> bool {
711    if !matches!(job.job_type, JobType::Agent) {
712        return false;
713    }
714    match &job.schedule {
715        Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000,
716        Schedule::Cron { .. } => {
717            let now = Utc::now();
718            next_run_for_schedule(&job.schedule, now)
719                .and_then(|a| next_run_for_schedule(&job.schedule, a).map(|b| (a, b)))
720                .map(|(a, b)| (b - a).num_minutes() < 5)
721                .unwrap_or(false)
722        }
723        Schedule::At { .. } => false,
724    }
725}
726
727fn warn_if_high_frequency_agent_job(job: &CronJob) {
728    if is_high_frequency_agent_job(job) {
729        ::zeroclaw_log::record!(
730            WARN,
731            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
732                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
733            &format!(
734                "Cron agent job '{}' is scheduled more frequently than every 5 minutes",
735                job.id
736            )
737        );
738    }
739}
740
741async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> {
742    let delivery: &DeliveryConfig = &job.delivery;
743    if !delivery.mode.eq_ignore_ascii_case("announce") {
744        return Ok(());
745    }
746
747    let channel = delivery.channel.as_deref().ok_or_else(|| {
748        ::zeroclaw_log::record!(
749            WARN,
750            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
751                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
752                .with_attrs(::serde_json::json!({"field": "channel"})),
753            "cron delivery announce refused: required field missing"
754        );
755        anyhow::Error::msg("delivery.channel is required for announce mode")
756    })?;
757    let target = delivery.to.as_deref().ok_or_else(|| {
758        ::zeroclaw_log::record!(
759            WARN,
760            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
761                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
762                .with_attrs(::serde_json::json!({"field": "to"})),
763            "cron delivery announce refused: required field missing"
764        );
765        anyhow::Error::msg("delivery.to is required for announce mode")
766    })?;
767
768    deliver_announcement(
769        config,
770        channel,
771        target,
772        delivery.thread_id.as_deref(),
773        output,
774    )
775    .await
776}
777
778/// Delivery function type — takes owned values so the returned future is 'static.
779/// The fourth `Option<String>` is the optional thread/conversation id propagated
780/// to channels whose outbound `thread_id` is distinct from the recipient (webhook).
781pub type DeliveryFn = Box<
782    dyn Fn(
783            Config,
784            String,
785            String,
786            Option<String>,
787            String,
788        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
789        + Send
790        + Sync,
791>;
792
793/// Global delivery function, injected by the binary crate at startup.
794static DELIVERY_FN: std::sync::OnceLock<DeliveryFn> = std::sync::OnceLock::new();
795
796/// Register the channel delivery function. Called once at startup by the binary.
797pub fn register_delivery_fn(f: DeliveryFn) {
798    let _ = DELIVERY_FN.set(f);
799}
800
801pub async fn deliver_announcement(
802    config: &Config,
803    channel: &str,
804    target: &str,
805    thread_id: Option<&str>,
806    output: &str,
807) -> Result<()> {
808    if let Some(f) = DELIVERY_FN.get() {
809        f(
810            config.clone(),
811            channel.to_string(),
812            target.to_string(),
813            thread_id.map(str::to_string),
814            output.to_string(),
815        )
816        .await
817    } else {
818        // No handler registered: this is a runtime-level state (the binary
819        // hasn't called `register_delivery_fn`), not a per-job failure.
820        // Returning `Err` here would force every announce-mode job to set
821        // `best_effort=true` just to survive a system that legitimately has
822        // no delivery wired (e.g. headless test runs, gateway-only deployments
823        // where channel orchestration lives elsewhere).
824        //
825        // We log loudly via `tracing::warn` so operators see the dropped
826        // delivery in their logs, then return `Ok(())` so `persist_job_result`
827        // records the job execution itself as successful. Operators that
828        // actively rely on delivery wire a handler at startup; absence is a
829        // configuration signal, not a delivery error.
830        ::zeroclaw_log::record!(
831            WARN,
832            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
833                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
834                .with_attrs(::serde_json::json!({"channel": channel, "target": target})),
835            "Cron delivery skipped: no delivery handler registered \
836             (register_delivery_fn was not called by the binary)"
837        );
838        Ok(())
839    }
840}
841
842async fn run_job_command(
843    config: &Config,
844    security: &SecurityPolicy,
845    job: &CronJob,
846) -> (bool, String) {
847    run_job_command_with_timeout(
848        config,
849        security,
850        job,
851        Duration::from_secs(SHELL_JOB_TIMEOUT_SECS),
852    )
853    .await
854}
855
856async fn run_job_command_with_timeout(
857    config: &Config,
858    security: &SecurityPolicy,
859    job: &CronJob,
860    timeout: Duration,
861) -> (bool, String) {
862    if !security.can_act() {
863        return (
864            false,
865            "blocked by security policy: autonomy is read-only".to_string(),
866        );
867    }
868
869    if security.is_rate_limited() {
870        return (
871            false,
872            "blocked by security policy: rate limit exceeded".to_string(),
873        );
874    }
875
876    // Unified command validation: allowlist + risk + path checks in one call.
877    // Jobs created via the validated helpers were already checked at creation
878    // time, but we re-validate at execution time to catch policy changes and
879    // manually-edited job stores.
880    let approved = false; // scheduler runs are never pre-approved
881    if let Err(error) =
882        crate::cron::validate_shell_command_with_security(security, &job.command, approved)
883    {
884        return (false, error.to_string());
885    }
886
887    if let Some(path) = security.forbidden_path_argument(&job.command) {
888        return (
889            false,
890            format!("blocked by security policy: forbidden path argument: {path}"),
891        );
892    }
893
894    if !security.record_action() {
895        return (
896            false,
897            "blocked by security policy: action budget exhausted".to_string(),
898        );
899    }
900
901    let child = match build_cron_shell_command(&job.command, &config.data_dir) {
902        Ok(mut cmd) => match cmd.spawn() {
903            Ok(child) => child,
904            Err(e) => return (false, format!("spawn error: {e}")),
905        },
906        Err(e) => return (false, format!("shell setup error: {e}")),
907    };
908
909    match time::timeout(timeout, child.wait_with_output()).await {
910        Ok(Ok(output)) => {
911            let stdout = String::from_utf8_lossy(&output.stdout);
912            let stderr = String::from_utf8_lossy(&output.stderr);
913            let combined = format!(
914                "status={}\nstdout:\n{}\nstderr:\n{}",
915                output.status,
916                stdout.trim(),
917                stderr.trim()
918            );
919            (output.status.success(), combined)
920        }
921        Ok(Err(e)) => (false, format!("spawn error: {e}")),
922        Err(_) => (
923            false,
924            format!("job timed out after {}s", timeout.as_secs_f64()),
925        ),
926    }
927}
928
929/// Build a shell `Command` for cron job execution.
930///
931/// Uses `sh -c <command>` (non-login shell). On Windows, ZeroClaw users
932/// typically have Git Bash installed which provides `sh` in PATH, and
933/// cron commands are written with Unix shell syntax. The previous `-lc`
934/// (login shell) flag was dropped: login shells load the full user
935/// profile on every invocation which is slow and may cause side effects.
936///
937/// The command is configured with:
938/// - `current_dir` set to the workspace
939/// - `stdin` piped to `/dev/null` (no interactive input)
940/// - `stdout` and `stderr` piped for capture
941/// - `kill_on_drop(true)` for safe timeout handling
942fn build_cron_shell_command(
943    command: &str,
944    workspace_dir: &std::path::Path,
945) -> anyhow::Result<Command> {
946    let mut cmd = Command::new("sh");
947    cmd.arg("-c")
948        .arg(command)
949        .current_dir(workspace_dir)
950        .stdin(Stdio::null())
951        .stdout(Stdio::piped())
952        .stderr(Stdio::piped())
953        .kill_on_drop(true);
954
955    Ok(cmd)
956}
957
958#[cfg(test)]
959mod tests {
960    use super::*;
961    use crate::cron::{self, DeliveryConfig};
962    use crate::security::SecurityPolicy;
963    use chrono::{Duration as ChronoDuration, Utc};
964    use tempfile::TempDir;
965    use zeroclaw_config::schema::Config;
966
967    const TEST_AGENT: &str = "test-agent";
968
969    async fn test_config(tmp: &TempDir) -> Config {
970        let mut config = Config {
971            data_dir: tmp.path().join("data"),
972            config_path: tmp.path().join("config.toml"),
973            ..Config::default()
974        };
975        config.risk_profiles.insert(
976            TEST_AGENT.to_string(),
977            zeroclaw_config::schema::RiskProfileConfig::default(),
978        );
979        config.runtime_profiles.insert(
980            TEST_AGENT.to_string(),
981            zeroclaw_config::schema::RuntimeProfileConfig::default(),
982        );
983        config.providers.models.openrouter.insert(
984            TEST_AGENT.to_string(),
985            zeroclaw_config::schema::OpenRouterModelProviderConfig::default(),
986        );
987        config.agents.insert(
988            TEST_AGENT.to_string(),
989            zeroclaw_config::schema::AliasedAgentConfig {
990                model_provider: format!("openrouter.{TEST_AGENT}").into(),
991                risk_profile: TEST_AGENT.to_string(),
992                runtime_profile: TEST_AGENT.to_string(),
993                ..Default::default()
994            },
995        );
996        tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
997        config
998    }
999
1000    fn test_security(config: &Config) -> SecurityPolicy {
1001        SecurityPolicy::for_agent(config, TEST_AGENT).expect("test-agent has resolvable profiles")
1002    }
1003
1004    fn test_job(command: &str) -> CronJob {
1005        CronJob {
1006            id: "test-job".into(),
1007            expression: "* * * * *".into(),
1008            schedule: crate::cron::Schedule::Cron {
1009                expr: "* * * * *".into(),
1010                tz: None,
1011            },
1012            command: command.into(),
1013            prompt: None,
1014            name: None,
1015            job_type: JobType::Shell,
1016            session_target: SessionTarget::Isolated,
1017            model: None,
1018            agent_alias: TEST_AGENT.into(),
1019            enabled: true,
1020            delivery: DeliveryConfig::default(),
1021            delete_after_run: false,
1022            allowed_tools: None,
1023            uses_memory: true,
1024            source: "imperative".into(),
1025            created_at: Utc::now(),
1026            next_run: Utc::now(),
1027            last_run: None,
1028            last_status: None,
1029            last_output: None,
1030        }
1031    }
1032
1033    fn unique_component(prefix: &str) -> String {
1034        format!("{prefix}-{}", uuid::Uuid::new_v4())
1035    }
1036
1037    fn agent_job_with_schedule(schedule: crate::cron::Schedule) -> CronJob {
1038        CronJob {
1039            job_type: JobType::Agent,
1040            schedule,
1041            ..test_job("echo test")
1042        }
1043    }
1044
1045    #[test]
1046    fn high_frequency_daily_cron_is_not_flagged() {
1047        // `0 6 * * *` fires once per day — must never warn regardless of when the check runs
1048        let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1049            expr: "0 6 * * *".into(),
1050            tz: Some("America/Chicago".into()),
1051        });
1052        assert!(!is_high_frequency_agent_job(&job));
1053    }
1054
1055    #[test]
1056    fn high_frequency_every_4min_cron_is_flagged() {
1057        let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1058            expr: "*/4 * * * *".into(),
1059            tz: None,
1060        });
1061        assert!(is_high_frequency_agent_job(&job));
1062    }
1063
1064    #[test]
1065    fn high_frequency_every_5min_cron_is_not_flagged() {
1066        // Exactly 5 minutes is acceptable (threshold is strictly less than 5)
1067        let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1068            expr: "*/5 * * * *".into(),
1069            tz: None,
1070        });
1071        assert!(!is_high_frequency_agent_job(&job));
1072    }
1073
1074    #[test]
1075    fn high_frequency_every_interval_below_threshold_is_flagged() {
1076        let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1077            every_ms: 4 * 60 * 1000, // 4 minutes
1078        });
1079        assert!(is_high_frequency_agent_job(&job));
1080    }
1081
1082    #[test]
1083    fn high_frequency_every_interval_at_threshold_is_not_flagged() {
1084        let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1085            every_ms: 5 * 60 * 1000, // exactly 5 minutes
1086        });
1087        assert!(!is_high_frequency_agent_job(&job));
1088    }
1089
1090    #[test]
1091    fn high_frequency_shell_job_is_never_flagged() {
1092        // Shell jobs are exempt regardless of frequency
1093        let job = CronJob {
1094            job_type: JobType::Shell,
1095            schedule: crate::cron::Schedule::Every {
1096                every_ms: 60 * 1000, // 1 minute
1097            },
1098            ..test_job("echo test")
1099        };
1100        assert!(!is_high_frequency_agent_job(&job));
1101    }
1102
1103    #[tokio::test]
1104    async fn run_job_command_success() {
1105        let tmp = TempDir::new().unwrap();
1106        let config = test_config(&tmp).await;
1107        let job = test_job("echo scheduler-ok");
1108        let security = test_security(&config);
1109
1110        let (success, output) = run_job_command(&config, &security, &job).await;
1111        assert!(success);
1112        assert!(output.contains("scheduler-ok"));
1113        assert!(output.contains("status=exit status: 0"));
1114    }
1115
1116    #[tokio::test]
1117    async fn run_job_command_failure() {
1118        let tmp = TempDir::new().unwrap();
1119        let config = test_config(&tmp).await;
1120        let job = test_job("ls definitely_missing_file_for_scheduler_test");
1121        let security = test_security(&config);
1122
1123        let (success, output) = run_job_command(&config, &security, &job).await;
1124        assert!(!success);
1125        assert!(output.contains("definitely_missing_file_for_scheduler_test"));
1126        assert!(output.contains("status=exit status:"));
1127    }
1128
1129    #[tokio::test]
1130    async fn run_job_command_times_out() {
1131        let tmp = TempDir::new().unwrap();
1132        let mut config = test_config(&tmp).await;
1133        config
1134            .risk_profiles
1135            .entry(TEST_AGENT.into())
1136            .or_default()
1137            .allowed_commands = vec!["sleep".into()];
1138        let job = test_job("sleep 1");
1139        let security = test_security(&config);
1140
1141        let (success, output) =
1142            run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await;
1143        assert!(!success);
1144        assert!(output.contains("job timed out after"));
1145    }
1146
1147    #[tokio::test]
1148    async fn run_job_command_blocks_disallowed_command() {
1149        let tmp = TempDir::new().unwrap();
1150        let mut config = test_config(&tmp).await;
1151        config
1152            .risk_profiles
1153            .entry(TEST_AGENT.into())
1154            .or_default()
1155            .allowed_commands = vec!["echo".into()];
1156        let job = test_job("curl https://evil.example");
1157        let security = test_security(&config);
1158
1159        let (success, output) = run_job_command(&config, &security, &job).await;
1160        assert!(!success);
1161        assert!(output.contains("blocked by security policy"));
1162        assert!(output.to_lowercase().contains("not allowed"));
1163    }
1164
1165    #[tokio::test]
1166    async fn run_job_command_blocks_forbidden_path_argument() {
1167        let tmp = TempDir::new().unwrap();
1168        let mut config = test_config(&tmp).await;
1169        config
1170            .risk_profiles
1171            .entry(TEST_AGENT.into())
1172            .or_default()
1173            .allowed_commands = vec!["cat".into()];
1174        let job = test_job("cat /etc/passwd");
1175        let security = test_security(&config);
1176
1177        let (success, output) = run_job_command(&config, &security, &job).await;
1178        assert!(!success);
1179        assert!(output.contains("blocked by security policy"));
1180        assert!(output.contains("forbidden path argument"));
1181        assert!(output.contains("/etc/passwd"));
1182    }
1183
1184    #[tokio::test]
1185    async fn run_job_command_blocks_forbidden_option_assignment_path_argument() {
1186        let tmp = TempDir::new().unwrap();
1187        let mut config = test_config(&tmp).await;
1188        config
1189            .risk_profiles
1190            .entry(TEST_AGENT.into())
1191            .or_default()
1192            .allowed_commands = vec!["grep".into()];
1193        let job = test_job("grep --file=/etc/passwd root ./src");
1194        let security = test_security(&config);
1195
1196        let (success, output) = run_job_command(&config, &security, &job).await;
1197        assert!(!success);
1198        assert!(output.contains("blocked by security policy"));
1199        assert!(output.contains("forbidden path argument"));
1200        assert!(output.contains("/etc/passwd"));
1201    }
1202
1203    #[tokio::test]
1204    async fn run_job_command_blocks_forbidden_short_option_attached_path_argument() {
1205        let tmp = TempDir::new().unwrap();
1206        let mut config = test_config(&tmp).await;
1207        config
1208            .risk_profiles
1209            .entry(TEST_AGENT.into())
1210            .or_default()
1211            .allowed_commands = vec!["grep".into()];
1212        let job = test_job("grep -f/etc/passwd root ./src");
1213        let security = test_security(&config);
1214
1215        let (success, output) = run_job_command(&config, &security, &job).await;
1216        assert!(!success);
1217        assert!(output.contains("blocked by security policy"));
1218        assert!(output.contains("forbidden path argument"));
1219        assert!(output.contains("/etc/passwd"));
1220    }
1221
1222    #[tokio::test]
1223    async fn run_job_command_blocks_tilde_user_path_argument() {
1224        let tmp = TempDir::new().unwrap();
1225        let mut config = test_config(&tmp).await;
1226        config
1227            .risk_profiles
1228            .entry(TEST_AGENT.into())
1229            .or_default()
1230            .allowed_commands = vec!["cat".into()];
1231        let job = test_job("cat ~root/.ssh/id_rsa");
1232        let security = test_security(&config);
1233
1234        let (success, output) = run_job_command(&config, &security, &job).await;
1235        assert!(!success);
1236        assert!(output.contains("blocked by security policy"));
1237        assert!(output.contains("forbidden path argument"));
1238        assert!(output.contains("~root/.ssh/id_rsa"));
1239    }
1240
1241    #[tokio::test]
1242    async fn run_job_command_blocks_input_redirection_path_bypass() {
1243        let tmp = TempDir::new().unwrap();
1244        let mut config = test_config(&tmp).await;
1245        config
1246            .risk_profiles
1247            .entry(TEST_AGENT.into())
1248            .or_default()
1249            .allowed_commands = vec!["cat".into()];
1250        let job = test_job("cat </etc/passwd");
1251        let security = test_security(&config);
1252
1253        let (success, output) = run_job_command(&config, &security, &job).await;
1254        assert!(!success);
1255        assert!(output.contains("blocked by security policy"));
1256        assert!(output.to_lowercase().contains("not allowed"));
1257    }
1258
1259    #[tokio::test]
1260    async fn run_job_command_blocks_readonly_mode() {
1261        let tmp = TempDir::new().unwrap();
1262        let mut config = test_config(&tmp).await;
1263        config
1264            .risk_profiles
1265            .entry(TEST_AGENT.into())
1266            .or_default()
1267            .level = crate::security::AutonomyLevel::ReadOnly;
1268        let job = test_job("echo should-not-run");
1269        let security = test_security(&config);
1270
1271        let (success, output) = run_job_command(&config, &security, &job).await;
1272        assert!(!success);
1273        assert!(output.contains("blocked by security policy"));
1274        assert!(output.contains("read-only"));
1275    }
1276
1277    #[tokio::test]
1278    async fn run_job_command_blocks_rate_limited() {
1279        let tmp = TempDir::new().unwrap();
1280        let mut config = test_config(&tmp).await;
1281        config
1282            .runtime_profiles
1283            .entry(TEST_AGENT.into())
1284            .or_default()
1285            .max_actions_per_hour = 0;
1286        let job = test_job("echo should-not-run");
1287        let security = test_security(&config);
1288
1289        let (success, output) = run_job_command(&config, &security, &job).await;
1290        assert!(!success);
1291        assert!(output.contains("blocked by security policy"));
1292        assert!(output.contains("rate limit exceeded"));
1293    }
1294
1295    #[tokio::test]
1296    async fn execute_job_with_retry_recovers_after_first_failure() {
1297        let tmp = TempDir::new().unwrap();
1298        let mut config = test_config(&tmp).await;
1299        config.reliability.scheduler_retries = 1;
1300        config.reliability.provider_backoff_ms = 1;
1301        config
1302            .risk_profiles
1303            .entry(TEST_AGENT.into())
1304            .or_default()
1305            .allowed_commands = vec!["sh".into()];
1306        let security = test_security(&config);
1307
1308        tokio::fs::write(
1309            config.data_dir.join("retry-once.sh"),
1310            "#!/bin/sh\nif [ -f retry-ok.flag ]; then\n  echo recovered\n  exit 0\nfi\ntouch retry-ok.flag\nexit 1\n",
1311        )
1312        .await
1313        .unwrap();
1314        let job = test_job("sh ./retry-once.sh");
1315
1316        let (success, output) = Box::pin(execute_job_with_retry(
1317            &config,
1318            &security,
1319            "test-agent",
1320            &job,
1321        ))
1322        .await;
1323        assert!(success);
1324        assert!(output.contains("recovered"));
1325    }
1326
1327    #[tokio::test]
1328    async fn execute_job_with_retry_exhausts_attempts() {
1329        let tmp = TempDir::new().unwrap();
1330        let mut config = test_config(&tmp).await;
1331        config.reliability.scheduler_retries = 1;
1332        config.reliability.provider_backoff_ms = 1;
1333        let security = test_security(&config);
1334
1335        let job = test_job("ls always_missing_for_retry_test");
1336
1337        let (success, output) = Box::pin(execute_job_with_retry(
1338            &config,
1339            &security,
1340            "test-agent",
1341            &job,
1342        ))
1343        .await;
1344        assert!(!success);
1345        assert!(output.contains("always_missing_for_retry_test"));
1346    }
1347
1348    #[tokio::test]
1349    async fn run_agent_job_returns_error_without_provider_key() {
1350        let tmp = TempDir::new().unwrap();
1351        let config = test_config(&tmp).await;
1352        let mut job = test_job("");
1353        job.job_type = JobType::Agent;
1354        job.prompt = Some("Say hello".into());
1355        let security = test_security(&config);
1356
1357        let (success, output) =
1358            Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1359        assert!(!success);
1360        assert!(output.contains("agent job failed:"));
1361    }
1362
1363    #[tokio::test]
1364    async fn run_agent_job_blocks_readonly_mode() {
1365        let tmp = TempDir::new().unwrap();
1366        let mut config = test_config(&tmp).await;
1367        config
1368            .risk_profiles
1369            .entry(TEST_AGENT.into())
1370            .or_default()
1371            .level = crate::security::AutonomyLevel::ReadOnly;
1372        let mut job = test_job("");
1373        job.job_type = JobType::Agent;
1374        job.prompt = Some("Say hello".into());
1375        let security = test_security(&config);
1376
1377        let (success, output) =
1378            Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1379        assert!(!success);
1380        assert!(output.contains("blocked by security policy"));
1381        assert!(output.contains("read-only"));
1382    }
1383
1384    #[tokio::test]
1385    async fn run_agent_job_blocks_rate_limited() {
1386        let tmp = TempDir::new().unwrap();
1387        let mut config = test_config(&tmp).await;
1388        config
1389            .runtime_profiles
1390            .entry(TEST_AGENT.into())
1391            .or_default()
1392            .max_actions_per_hour = 0;
1393        let mut job = test_job("");
1394        job.job_type = JobType::Agent;
1395        job.prompt = Some("Say hello".into());
1396        let security = test_security(&config);
1397
1398        let (success, output) =
1399            Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1400        assert!(!success);
1401        assert!(output.contains("blocked by security policy"));
1402        assert!(output.contains("rate limit exceeded"));
1403    }
1404
1405    #[tokio::test]
1406    async fn process_due_jobs_marks_component_ok_even_when_idle() {
1407        let tmp = TempDir::new().unwrap();
1408        let config = test_config(&tmp).await;
1409        let component = unique_component("scheduler-idle");
1410
1411        crate::health::mark_component_error(&component, "pre-existing error");
1412        process_due_jobs(&config, Vec::new(), &component, &None).await;
1413
1414        let snapshot = crate::health::snapshot_json();
1415        let entry = &snapshot["components"][component.as_str()];
1416        assert_eq!(entry["status"], "ok");
1417        assert!(entry["last_ok"].as_str().is_some());
1418        assert!(entry["last_error"].is_null());
1419    }
1420
1421    #[tokio::test]
1422    async fn process_due_jobs_failure_does_not_mark_component_unhealthy() {
1423        let tmp = TempDir::new().unwrap();
1424        let config = test_config(&tmp).await;
1425        let job = test_job("ls definitely_missing_file_for_scheduler_component_health_test");
1426        let component = unique_component("scheduler-fail");
1427
1428        crate::health::mark_component_ok(&component);
1429        process_due_jobs(&config, vec![job], &component, &None).await;
1430
1431        let snapshot = crate::health::snapshot_json();
1432        let entry = &snapshot["components"][component.as_str()];
1433        assert_eq!(entry["status"], "ok");
1434    }
1435
1436    #[tokio::test]
1437    async fn persist_job_result_records_run_and_reschedules_shell_job() {
1438        let tmp = TempDir::new().unwrap();
1439        let config = test_config(&tmp).await;
1440        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1441        let started = Utc::now();
1442        let finished = started + ChronoDuration::milliseconds(10);
1443
1444        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1445        assert!(success);
1446
1447        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1448        assert_eq!(runs.len(), 1);
1449        let updated = cron::get_job(&config, &job.id).unwrap();
1450        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1451    }
1452
1453    #[tokio::test]
1454    async fn persist_job_result_uses_one_write_connection_for_recurring_job() {
1455        let tmp = TempDir::new().unwrap();
1456        let config = test_config(&tmp).await;
1457        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1458        let started = Utc::now();
1459        let finished = started + ChronoDuration::milliseconds(10);
1460
1461        crate::cron::store::reset_write_connection_count_for_tests(&config);
1462        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1463
1464        assert!(success);
1465        assert_eq!(
1466            crate::cron::store::write_connection_count_for_tests(&config),
1467            1
1468        );
1469    }
1470
1471    #[tokio::test]
1472    async fn persist_job_result_prunes_run_history_and_updates_last_fields() {
1473        let tmp = TempDir::new().unwrap();
1474        let mut config = test_config(&tmp).await;
1475        config.scheduler.max_run_history = 2;
1476        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1477        let base = Utc::now();
1478
1479        for idx in 0..3 {
1480            let started = base + ChronoDuration::seconds(idx);
1481            let finished = started + ChronoDuration::milliseconds(10);
1482            let output = format!("run-{idx}");
1483
1484            let success = persist_job_result(&config, &job, true, &output, started, finished).await;
1485            assert!(success);
1486        }
1487
1488        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1489        assert_eq!(runs.len(), 2);
1490        assert_eq!(runs[0].output.as_deref(), Some("run-2"));
1491        assert_eq!(runs[1].output.as_deref(), Some("run-1"));
1492
1493        let updated = cron::get_job(&config, &job.id).unwrap();
1494        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1495        assert_eq!(updated.last_output.as_deref(), Some("run-2"));
1496        assert!(updated.last_run.is_some());
1497    }
1498
1499    #[tokio::test]
1500    async fn persist_job_result_rolls_back_run_history_when_job_state_update_fails() {
1501        let tmp = TempDir::new().unwrap();
1502        let config = test_config(&tmp).await;
1503        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1504        let original_next_run = job.next_run;
1505        let started = Utc::now();
1506        let finished = started + ChronoDuration::milliseconds(10);
1507
1508        let conn =
1509            rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1510        conn.execute_batch(
1511            "CREATE TRIGGER fail_cron_job_update
1512             BEFORE UPDATE ON cron_jobs
1513             BEGIN
1514                 SELECT RAISE(ABORT, 'blocked update');
1515             END;",
1516        )
1517        .unwrap();
1518        drop(conn);
1519
1520        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1521
1522        assert!(success);
1523        assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1524
1525        let stored = cron::get_job(&config, &job.id).unwrap();
1526        assert_eq!(stored.next_run, original_next_run);
1527        assert!(stored.last_run.is_none());
1528        assert!(stored.last_status.is_none());
1529        assert!(stored.last_output.is_none());
1530    }
1531
1532    #[tokio::test]
1533    async fn persist_job_result_success_deletes_one_shot() {
1534        let tmp = TempDir::new().unwrap();
1535        let config = test_config(&tmp).await;
1536        let at = Utc::now() + ChronoDuration::minutes(10);
1537        let job = cron::add_agent_job(
1538            &config,
1539            TEST_AGENT,
1540            Some("one-shot".into()),
1541            crate::cron::Schedule::At { at },
1542            "Hello",
1543            SessionTarget::Isolated,
1544            None,
1545            None,
1546            true,
1547            None,
1548        )
1549        .unwrap();
1550        let started = Utc::now();
1551        let finished = started + ChronoDuration::milliseconds(10);
1552
1553        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1554        assert!(success);
1555        let lookup = cron::get_job(&config, &job.id);
1556        assert!(lookup.is_err());
1557    }
1558
1559    #[tokio::test]
1560    async fn persist_job_result_failure_disables_one_shot() {
1561        let tmp = TempDir::new().unwrap();
1562        let config = test_config(&tmp).await;
1563        let at = Utc::now() + ChronoDuration::minutes(10);
1564        let job = cron::add_agent_job(
1565            &config,
1566            TEST_AGENT,
1567            Some("one-shot".into()),
1568            crate::cron::Schedule::At { at },
1569            "Hello",
1570            SessionTarget::Isolated,
1571            None,
1572            None,
1573            true,
1574            None,
1575        )
1576        .unwrap();
1577        let started = Utc::now();
1578        let finished = started + ChronoDuration::milliseconds(10);
1579
1580        let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1581        assert!(!success);
1582        let updated = cron::get_job(&config, &job.id).unwrap();
1583        assert!(!updated.enabled);
1584        assert_eq!(updated.last_status.as_deref(), Some("error"));
1585    }
1586
1587    #[tokio::test]
1588    async fn persist_job_result_uses_one_write_connection_for_failed_one_shot_disable() {
1589        let tmp = TempDir::new().unwrap();
1590        let config = test_config(&tmp).await;
1591        let at = Utc::now() + ChronoDuration::minutes(10);
1592        let job = cron::add_agent_job(
1593            &config,
1594            "test-agent",
1595            Some("one-shot".into()),
1596            crate::cron::Schedule::At { at },
1597            "Hello",
1598            SessionTarget::Isolated,
1599            None,
1600            None,
1601            true,
1602            None,
1603        )
1604        .unwrap();
1605        let started = Utc::now();
1606        let finished = started + ChronoDuration::milliseconds(10);
1607
1608        crate::cron::store::reset_write_connection_count_for_tests(&config);
1609        let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1610
1611        assert!(!success);
1612        assert_eq!(
1613            crate::cron::store::write_connection_count_for_tests(&config),
1614            1
1615        );
1616    }
1617
1618    #[tokio::test]
1619    async fn persist_job_result_falls_back_to_state_update_when_history_prune_fails() {
1620        let tmp = TempDir::new().unwrap();
1621        let mut config = test_config(&tmp).await;
1622        config.scheduler.max_run_history = 1;
1623        let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1624        let original_next_run = job.next_run;
1625        let seed_started = Utc::now() - ChronoDuration::minutes(20);
1626        let seed_finished = seed_started + ChronoDuration::milliseconds(10);
1627        let started = Utc::now();
1628        let finished = started + ChronoDuration::milliseconds(10);
1629
1630        let conn =
1631            rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1632        conn.execute(
1633            "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
1634             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1635            rusqlite::params![
1636                job.id,
1637                seed_started.to_rfc3339(),
1638                seed_finished.to_rfc3339(),
1639                "seed",
1640                "seed",
1641                10,
1642            ],
1643        )
1644        .unwrap();
1645        conn.execute_batch(
1646            "CREATE TRIGGER fail_cron_run_prune
1647             BEFORE DELETE ON cron_runs
1648             BEGIN
1649                 SELECT RAISE(ABORT, 'blocked prune');
1650             END;",
1651        )
1652        .unwrap();
1653        drop(conn);
1654
1655        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1656        assert!(success);
1657
1658        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1659        assert_eq!(runs.len(), 1);
1660        assert_eq!(runs[0].status, "seed");
1661
1662        let updated = cron::get_job(&config, &job.id).unwrap();
1663        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1664        assert_eq!(updated.last_output.as_deref(), Some("ok"));
1665        assert!(updated.last_run.is_some());
1666        assert!(updated.next_run >= original_next_run);
1667    }
1668
1669    #[tokio::test]
1670    async fn persist_job_result_falls_back_to_disable_when_auto_delete_history_insert_fails() {
1671        let tmp = TempDir::new().unwrap();
1672        let config = test_config(&tmp).await;
1673        let at = Utc::now() + ChronoDuration::minutes(10);
1674        let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1675        assert!(job.delete_after_run);
1676        let started = Utc::now();
1677        let finished = started + ChronoDuration::milliseconds(10);
1678
1679        let conn =
1680            rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1681        conn.execute_batch(
1682            "CREATE TRIGGER fail_cron_run_insert
1683             BEFORE INSERT ON cron_runs
1684             BEGIN
1685                 SELECT RAISE(ABORT, 'blocked insert');
1686             END;",
1687        )
1688        .unwrap();
1689        drop(conn);
1690
1691        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1692        assert!(success);
1693
1694        let updated = cron::get_job(&config, &job.id).unwrap();
1695        assert!(!updated.enabled);
1696        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1697        assert_eq!(updated.last_output.as_deref(), Some("ok"));
1698        assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1699    }
1700
1701    #[tokio::test]
1702    async fn persist_job_result_success_deletes_one_shot_shell_job() {
1703        let tmp = TempDir::new().unwrap();
1704        let config = test_config(&tmp).await;
1705        let at = Utc::now() + ChronoDuration::minutes(10);
1706        let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1707        assert!(job.delete_after_run);
1708        let started = Utc::now();
1709        let finished = started + ChronoDuration::milliseconds(10);
1710
1711        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1712        assert!(success);
1713        let lookup = cron::get_job(&config, &job.id);
1714        assert!(lookup.is_err());
1715    }
1716
1717    #[tokio::test]
1718    async fn persist_job_result_failure_disables_one_shot_shell_job() {
1719        let tmp = TempDir::new().unwrap();
1720        let config = test_config(&tmp).await;
1721        let at = Utc::now() + ChronoDuration::minutes(10);
1722        let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1723        assert!(job.delete_after_run);
1724        let started = Utc::now();
1725        let finished = started + ChronoDuration::milliseconds(10);
1726
1727        let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1728        assert!(!success);
1729        let updated = cron::get_job(&config, &job.id).unwrap();
1730        assert!(!updated.enabled);
1731        assert_eq!(updated.last_status.as_deref(), Some("error"));
1732    }
1733
1734    #[tokio::test]
1735    async fn persist_job_result_delivery_stubbed_succeeds() {
1736        // Delivery is stubbed (moved to zeroclaw-channels orchestrator).
1737        // This test verifies the stub returns Ok, so persist_job_result succeeds.
1738        let tmp = TempDir::new().unwrap();
1739        let config = test_config(&tmp).await;
1740        let job = cron::add_agent_job(
1741            &config,
1742            TEST_AGENT,
1743            Some("announce-job".into()),
1744            crate::cron::Schedule::Cron {
1745                expr: "*/5 * * * *".into(),
1746                tz: None,
1747            },
1748            "deliver this",
1749            SessionTarget::Isolated,
1750            None,
1751            Some(DeliveryConfig {
1752                mode: "announce".into(),
1753                channel: Some("telegram".into()),
1754                to: Some("123456".into()),
1755                thread_id: None,
1756                best_effort: false,
1757            }),
1758            false,
1759            None,
1760        )
1761        .unwrap();
1762        let started = Utc::now();
1763        let finished = started + ChronoDuration::milliseconds(10);
1764
1765        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1766        assert!(success);
1767
1768        let updated = cron::get_job(&config, &job.id).unwrap();
1769        assert!(updated.enabled);
1770        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1771
1772        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1773        assert_eq!(runs.len(), 1);
1774        assert_eq!(runs[0].status, "ok");
1775    }
1776
1777    #[tokio::test]
1778    async fn persist_job_result_delivery_failure_best_effort_marks_degraded() {
1779        let tmp = TempDir::new().unwrap();
1780        let config = test_config(&tmp).await;
1781        register_delivery_fn(Box::new(
1782            |_config, channel, _target, _thread_id, _output| {
1783                Box::pin(async move {
1784                    if channel == "fail-delivery" {
1785                        anyhow::bail!("synthetic delivery failure");
1786                    }
1787                    Ok(())
1788                })
1789            },
1790        ));
1791        let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1792        job.delivery = DeliveryConfig {
1793            mode: "announce".into(),
1794            channel: Some("fail-delivery".into()),
1795            to: Some("123456".into()),
1796            thread_id: None,
1797            best_effort: true,
1798        };
1799        let started = Utc::now();
1800        let finished = started + ChronoDuration::milliseconds(10);
1801
1802        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1803        assert!(success);
1804
1805        let updated = cron::get_job(&config, &job.id).unwrap();
1806        assert!(updated.enabled);
1807        assert_eq!(updated.last_status.as_deref(), Some("degraded"));
1808        assert!(
1809            updated
1810                .last_output
1811                .as_deref()
1812                .unwrap_or_default()
1813                .contains("delivery failed:")
1814        );
1815
1816        let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1817        assert_eq!(runs.len(), 1);
1818        assert_eq!(runs[0].status, "degraded");
1819    }
1820
1821    #[tokio::test]
1822    async fn delivery_failure_classification_preserves_empty_output_evidence() {
1823        let tmp = TempDir::new().unwrap();
1824        let config = test_config(&tmp).await;
1825        register_delivery_fn(Box::new(
1826            |_config, channel, _target, _thread_id, _output| {
1827                Box::pin(async move {
1828                    if channel == "fail-delivery" {
1829                        anyhow::bail!("synthetic delivery failure");
1830                    }
1831                    Ok(())
1832                })
1833            },
1834        ));
1835        let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1836        job.delivery = DeliveryConfig {
1837            mode: "announce".into(),
1838            channel: Some("fail-delivery".into()),
1839            to: Some("123456".into()),
1840            thread_id: None,
1841            best_effort: true,
1842        };
1843
1844        let outcome = deliver_and_classify_run_result(
1845            &config,
1846            &job,
1847            true,
1848            String::new(),
1849            CronDeliveryContext::Scheduled,
1850        )
1851        .await;
1852
1853        assert!(outcome.success);
1854        assert_eq!(outcome.status, "degraded");
1855        assert!(outcome.output.starts_with("delivery failed:"));
1856    }
1857
1858    #[tokio::test]
1859    async fn persist_job_result_at_schedule_without_delete_after_run_is_disabled() {
1860        let tmp = TempDir::new().unwrap();
1861        let config = test_config(&tmp).await;
1862        let at = Utc::now() + ChronoDuration::minutes(10);
1863        let job = cron::add_agent_job(
1864            &config,
1865            TEST_AGENT,
1866            Some("at-no-autodelete".into()),
1867            crate::cron::Schedule::At { at },
1868            "Hello",
1869            SessionTarget::Isolated,
1870            None,
1871            None,
1872            false,
1873            None,
1874        )
1875        .unwrap();
1876        assert!(!job.delete_after_run);
1877
1878        let started = Utc::now();
1879        let finished = started + ChronoDuration::milliseconds(10);
1880        let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1881        assert!(success);
1882
1883        // After reschedule_after_run, At schedule jobs should be disabled
1884        // to prevent re-execution with a past next_run timestamp.
1885        let updated = cron::get_job(&config, &job.id).unwrap();
1886        assert!(
1887            !updated.enabled,
1888            "At schedule job should be disabled after execution via reschedule"
1889        );
1890        assert_eq!(updated.last_status.as_deref(), Some("ok"));
1891    }
1892
1893    #[tokio::test]
1894    async fn deliver_if_configured_handles_none_mode() {
1895        let tmp = TempDir::new().unwrap();
1896        let config = test_config(&tmp).await;
1897        let job = test_job("echo ok");
1898
1899        // Default delivery mode is not "announce", so should be a no-op.
1900        assert!(deliver_if_configured(&config, &job, "x").await.is_ok());
1901    }
1902
1903    #[tokio::test]
1904    async fn deliver_announcement_returns_ok_when_no_handler_registered() {
1905        let tmp = TempDir::new().unwrap();
1906        let config = test_config(&tmp).await;
1907        // No registered handler is a runtime-level state, not a delivery
1908        // failure. The caller (persist_job_result) should record the job
1909        // execution as successful; the missing handler is logged via
1910        // tracing::warn for operator visibility.
1911        deliver_announcement(&config, "telegram", "chat-id", None, "payload")
1912            .await
1913            .expect("missing delivery handler should be Ok with a warn log");
1914    }
1915
1916    #[test]
1917    fn build_cron_shell_command_uses_sh_non_login() {
1918        let workspace = std::env::temp_dir();
1919        let cmd = build_cron_shell_command("echo cron-test", &workspace).unwrap();
1920        let debug = format!("{cmd:?}");
1921        assert!(debug.contains("echo cron-test"));
1922        assert!(debug.contains("\"sh\""), "should use sh: {debug}");
1923        // Must NOT use login shell (-l) — login shells load full profile
1924        // and are slow/unpredictable for cron jobs.
1925        assert!(
1926            !debug.contains("\"-lc\""),
1927            "must not use login shell: {debug}"
1928        );
1929    }
1930
1931    #[tokio::test]
1932    async fn build_cron_shell_command_executes_successfully() {
1933        let workspace = std::env::temp_dir();
1934        let mut cmd = build_cron_shell_command("echo cron-ok", &workspace).unwrap();
1935        let output = cmd.output().await.unwrap();
1936        assert!(output.status.success());
1937        let stdout = String::from_utf8_lossy(&output.stdout);
1938        assert!(stdout.contains("cron-ok"));
1939    }
1940
1941    #[tokio::test]
1942    async fn catch_up_queries_all_overdue_jobs_ignoring_max_tasks() {
1943        let tmp = TempDir::new().unwrap();
1944        let mut config = test_config(&tmp).await;
1945        config.scheduler.max_tasks = 1; // limit normal polling to 1
1946
1947        // Create 3 jobs with "every minute" schedule
1948        for i in 0..3 {
1949            let _ = cron::add_job(
1950                &config,
1951                "test-agent",
1952                "* * * * *",
1953                &format!("echo catchup-{i}"),
1954            )
1955            .unwrap();
1956        }
1957
1958        // Verify normal due_jobs is limited to max_tasks=1
1959        let far_future = Utc::now() + ChronoDuration::days(1);
1960        let due = cron::due_jobs(&config, far_future).unwrap();
1961        assert_eq!(due.len(), 1, "due_jobs must respect max_tasks");
1962
1963        // all_overdue_jobs ignores the limit
1964        let overdue = cron::all_overdue_jobs(&config, far_future).unwrap();
1965        assert_eq!(overdue.len(), 3, "all_overdue_jobs must return all");
1966    }
1967
1968    // scan_and_redact_output tests moved to zeroclaw-channels orchestrator
1969
1970    // ── Broadcast / EventBroadcast tests ─────────────────────────────
1971
1972    #[tokio::test]
1973    async fn broadcast_sends_cron_result_on_success() {
1974        let tmp = TempDir::new().unwrap();
1975        let mut config = test_config(&tmp).await;
1976        let job = test_job("echo broadcast-ok");
1977        // Bind the synthetic test job to test-agent so process_due_jobs's
1978        // owning-agent lookup succeeds (jobs without an owner are skipped).
1979        config
1980            .agents
1981            .get_mut("test-agent")
1982            .unwrap()
1983            .cron_jobs
1984            .push(job.id.clone());
1985        let component = unique_component("broadcast-ok");
1986
1987        let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
1988        let event_tx: EventBroadcast = Some(tx);
1989
1990        process_due_jobs(&config, vec![job], &component, &event_tx).await;
1991
1992        let event = rx.try_recv().expect("should receive a broadcast event");
1993        assert_eq!(event["type"], "cron_result");
1994        assert_eq!(event["job_id"], "test-job");
1995        assert_eq!(event["success"], true);
1996        assert!(event["output"].as_str().unwrap().contains("broadcast-ok"));
1997        assert!(event["timestamp"].as_str().is_some());
1998    }
1999
2000    #[tokio::test]
2001    async fn broadcast_sends_cron_result_on_failure() {
2002        let tmp = TempDir::new().unwrap();
2003        let mut config = test_config(&tmp).await;
2004        let job = test_job("ls definitely_missing_file_for_broadcast_fail_test");
2005        config
2006            .agents
2007            .get_mut("test-agent")
2008            .unwrap()
2009            .cron_jobs
2010            .push(job.id.clone());
2011        let component = unique_component("broadcast-fail");
2012
2013        let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2014        let event_tx: EventBroadcast = Some(tx);
2015
2016        process_due_jobs(&config, vec![job], &component, &event_tx).await;
2017
2018        let event = rx.try_recv().expect("should receive a broadcast event");
2019        assert_eq!(event["type"], "cron_result");
2020        assert_eq!(event["job_id"], "test-job");
2021        assert_eq!(event["success"], false);
2022        assert!(event["timestamp"].as_str().is_some());
2023    }
2024
2025    #[tokio::test]
2026    async fn broadcast_none_skips_without_error() {
2027        let tmp = TempDir::new().unwrap();
2028        let config = test_config(&tmp).await;
2029        let job = test_job("echo no-broadcast");
2030        let component = unique_component("broadcast-none");
2031
2032        // event_tx = None — should complete without panic.
2033        process_due_jobs(&config, vec![job], &component, &None).await;
2034    }
2035
2036    #[tokio::test]
2037    async fn broadcast_handles_no_subscribers() {
2038        let tmp = TempDir::new().unwrap();
2039        let config = test_config(&tmp).await;
2040        let job = test_job("echo no-subscribers");
2041        let component = unique_component("broadcast-no-sub");
2042
2043        let (tx, _) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2044        // Drop the only receiver immediately — `let _ = tx.send(...)` in
2045        // process_due_jobs must not panic when there are no subscribers.
2046        let event_tx: EventBroadcast = Some(tx);
2047
2048        process_due_jobs(&config, vec![job], &component, &event_tx).await;
2049        // If we got here without panic, the test passes.
2050    }
2051}