1use crate::cron::store::{RunCompletionAction, persist_run_completion_state, persist_run_result};
2use crate::cron::{
3 CronJob, DeliveryConfig, JobType, Schedule, SessionTarget, all_overdue_jobs, due_jobs,
4 next_run_for_schedule, skip_missed_run, sync_declarative_jobs,
5};
6use crate::security::SecurityPolicy;
7use anyhow::Result;
8use chrono::{DateTime, Utc};
9use futures_util::{StreamExt, stream};
10use std::process::Stdio;
11use std::sync::Arc;
12use tokio::process::Command;
13use tokio::time::{self, Duration};
14use zeroclaw_config::schema::Config;
15use zeroclaw_config::schema::{CronJobDecl, CronScheduleDecl};
16use zeroclaw_log::Instrument;
17use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
18
19const MIN_POLL_SECONDS: u64 = 5;
20const SHELL_JOB_TIMEOUT_SECS: u64 = 120;
21const SCHEDULER_COMPONENT: &str = "scheduler";
22const CRON_AGENT_DEFAULT_EXCLUDED_TOOLS: &[&str] = &[
23 "cron_add",
24 "cron_update",
25 "cron_remove",
26 "cron_run",
27 "schedule",
28];
29
30pub type EventBroadcast = Option<tokio::sync::broadcast::Sender<serde_json::Value>>;
33
34#[derive(Clone, Copy)]
35pub enum CronDeliveryContext {
36 Scheduled,
37 ToolManual,
38 GatewayManual,
39}
40
41impl CronDeliveryContext {
42 fn failure_message(self, best_effort: bool) -> &'static str {
43 match (self, best_effort) {
44 (Self::Scheduled, true) => "Cron delivery failed (best_effort)",
45 (Self::Scheduled, false) => "Cron delivery failed",
46 (Self::ToolManual, true) => "cron_run delivery failed (best_effort)",
47 (Self::ToolManual, false) => "cron_run delivery failed",
48 (Self::GatewayManual, true) => "manual cron trigger delivery failed (best_effort)",
49 (Self::GatewayManual, false) => "manual cron trigger delivery failed",
50 }
51 }
52}
53
54pub struct CronDeliveryOutcome {
55 pub success: bool,
56 pub status: String,
57 pub output: String,
58}
59
60pub async fn deliver_and_classify_run_result(
61 config: &Config,
62 job: &CronJob,
63 mut success: bool,
64 mut output: String,
65 context: CronDeliveryContext,
66) -> CronDeliveryOutcome {
67 let mut status = if success { "ok" } else { "error" }.to_string();
68
69 if let Err(e) = deliver_if_configured(config, job, &output).await {
70 let channel = job.delivery.channel.as_deref().unwrap_or("");
75 let target = job.delivery.to.as_deref().unwrap_or("");
76 let delivery_error = e.to_string();
77
78 if job.delivery.best_effort {
79 ::zeroclaw_log::record!(
80 WARN,
81 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
82 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
83 .with_attrs(::serde_json::json!({
84 "job_id": job.id,
85 "agent_alias": job.agent_alias,
86 "channel": channel,
87 "target": target,
88 "error": delivery_error
89 })),
90 context.failure_message(true)
91 );
92 if success {
93 status = "degraded".to_string();
94 }
95 } else {
96 success = false;
97 ::zeroclaw_log::record!(
98 WARN,
99 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
100 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
101 .with_attrs(::serde_json::json!({
102 "job_id": job.id,
103 "agent_alias": job.agent_alias,
104 "channel": channel,
105 "target": target,
106 "error": delivery_error
107 })),
108 context.failure_message(false)
109 );
110 status = "error".to_string();
111 }
112
113 if output.trim().is_empty() {
114 output = format!("delivery failed: {delivery_error}");
115 } else {
116 output.push_str("\n\ndelivery failed: ");
117 output.push_str(&delivery_error);
118 }
119 }
120
121 CronDeliveryOutcome {
122 success,
123 status,
124 output,
125 }
126}
127
128pub async fn run(config: Config, event_tx: EventBroadcast) -> Result<()> {
129 let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS);
130 let mut interval = time::interval(Duration::from_secs(poll_secs));
131 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
132
133 crate::health::mark_component_ok(SCHEDULER_COMPONENT);
134
135 let mut jobs_with_builtin = config.cron.clone();
137 if let Some(ref schedule_cron) = config.backup.schedule_cron {
138 let backup_job = CronJobDecl {
139 name: Some("Scheduled backup".to_string()),
140 job_type: "shell".to_string(),
141 schedule: CronScheduleDecl::Cron {
142 expr: schedule_cron.clone(),
143 tz: config.backup.schedule_timezone.clone(),
144 },
145 command: Some("backup create".to_string()),
146 prompt: None,
147 enabled: true,
148 model: None,
149 allowed_tools: None,
150 uses_memory: true,
151 session_target: None,
152 delivery: None,
153 };
154 ::zeroclaw_log::record!(
155 DEBUG,
156 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
157 .with_attrs(::serde_json::json!({"schedule": schedule_cron})),
158 "Synthesizing builtin backup cron job from config.backup.schedule_cron"
159 );
160 jobs_with_builtin.insert("__builtin_backup".to_string(), backup_job);
161 }
162
163 match sync_declarative_jobs(&config, &jobs_with_builtin) {
164 Ok(()) => {
165 if !jobs_with_builtin.is_empty() {
166 ::zeroclaw_log::record!(
167 INFO,
168 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
169 .with_attrs(::serde_json::json!({"count": jobs_with_builtin.len()})),
170 "Synced declarative cron jobs from config"
171 );
172 }
173 }
174 Err(e) => ::zeroclaw_log::record!(
175 WARN,
176 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
177 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
178 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
179 "Failed to sync declarative cron jobs"
180 ),
181 }
182
183 if config.scheduler.catch_up_on_startup {
190 catch_up_overdue_jobs(&config, &event_tx).await;
191 } else {
192 ::zeroclaw_log::record!(
193 INFO,
194 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
195 "Scheduler startup: catch-up disabled by config"
196 );
197 skip_missed_jobs_on_startup(&config).await;
198 }
199
200 loop {
201 interval.tick().await;
202 crate::health::mark_component_ok(SCHEDULER_COMPONENT);
204
205 let jobs = match due_jobs(&config, Utc::now()) {
206 Ok(jobs) => jobs,
207 Err(e) => {
208 crate::health::mark_component_error(SCHEDULER_COMPONENT, e.to_string());
209 ::zeroclaw_log::record!(
210 WARN,
211 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
212 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
213 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
214 "Scheduler query failed"
215 );
216 continue;
217 }
218 };
219
220 process_due_jobs(&config, jobs, SCHEDULER_COMPONENT, &event_tx).await;
221 }
222}
223
224fn resolve_owning_agent<'a>(config: &'a Config, job: &CronJob) -> Option<&'a str> {
235 if !job.agent_alias.is_empty()
236 && let Some((alias, _)) = config
237 .agents
238 .iter()
239 .find(|(alias, _)| alias.as_str() == job.agent_alias)
240 {
241 return Some(alias.as_str());
242 }
243 config.agent_for_cron_job(&job.id)
244}
245
246async fn catch_up_overdue_jobs(config: &Config, event_tx: &EventBroadcast) {
251 let now = Utc::now();
252 let jobs = match all_overdue_jobs(config, now) {
253 Ok(jobs) => jobs,
254 Err(e) => {
255 ::zeroclaw_log::record!(
256 WARN,
257 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
258 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
259 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
260 "Startup catch-up query failed"
261 );
262 return;
263 }
264 };
265
266 if jobs.is_empty() {
267 ::zeroclaw_log::record!(
268 INFO,
269 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
270 "Scheduler startup: no overdue jobs to catch up"
271 );
272 return;
273 }
274
275 ::zeroclaw_log::record!(
276 INFO,
277 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
278 .with_attrs(::serde_json::json!({"count": jobs.len()})),
279 "Scheduler startup: catching up overdue jobs"
280 );
281
282 process_due_jobs(config, jobs, SCHEDULER_COMPONENT, event_tx).await;
283
284 ::zeroclaw_log::record!(
285 INFO,
286 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
287 "Scheduler startup: catch-up complete"
288 );
289}
290
291async fn skip_missed_jobs_on_startup(config: &Config) {
300 let now = Utc::now();
301 let jobs = match all_overdue_jobs(config, now) {
302 Ok(jobs) => jobs,
303 Err(e) => {
304 ::zeroclaw_log::record!(
305 WARN,
306 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
307 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
308 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
309 "Scheduler startup skip: query failed",
310 );
311 return;
312 }
313 };
314
315 if jobs.is_empty() {
316 ::zeroclaw_log::record!(
317 INFO,
318 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
319 "Scheduler startup skip: no overdue jobs to advance",
320 );
321 return;
322 }
323
324 let mut skipped_recurring: u64 = 0;
325 let mut skipped_oneshot: u64 = 0;
326
327 for job in &jobs {
328 let is_oneshot = matches!(job.schedule, Schedule::At { .. });
329 match skip_missed_run(config, job, now) {
330 Ok(()) => {
331 if is_oneshot {
332 skipped_oneshot += 1;
333 } else {
334 skipped_recurring += 1;
335 }
336 }
337 Err(e) => {
338 ::zeroclaw_log::record!(
339 WARN,
340 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
341 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
342 .with_attrs(::serde_json::json!({
343 "job_id": job.id,
344 "error": format!("{}", e),
345 })),
346 "Scheduler startup skip: failed to advance job",
347 );
348 }
349 }
350 }
351
352 ::zeroclaw_log::record!(
353 INFO,
354 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
355 ::serde_json::json!({
356 "total": jobs.len(),
357 "skipped_recurring": skipped_recurring,
358 "skipped_oneshot": skipped_oneshot,
359 })
360 ),
361 "Scheduler startup skip: advanced overdue jobs without executing",
362 );
363}
364
365pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
366 use zeroclaw_log::Instrument;
367 let Some(agent_alias) = resolve_owning_agent(config, job) else {
368 return (
369 false,
370 format!(
371 "cron job {id:?} has no owning agent; add the alias to an [agents.<x>].cron_jobs list",
372 id = job.id
373 ),
374 );
375 };
376 let agent_alias = agent_alias.to_string();
377 let security = match SecurityPolicy::for_agent(config, &agent_alias) {
378 Ok(s) => s,
379 Err(e) => return (false, format!("agent {agent_alias} risk profile: {e}")),
380 };
381 let span = zeroclaw_log::attribution_span!(job);
382 Box::pin(execute_job_with_retry(config, &security, &agent_alias, job))
383 .instrument(span)
384 .await
385}
386
387fn cron_agent_run_security_policy(base: &SecurityPolicy, job: &CronJob) -> SecurityPolicy {
388 let mut policy = base.clone();
389 if !matches!(job.job_type, JobType::Agent) || job.allowed_tools.is_some() {
390 return policy;
391 }
392
393 let excluded = policy.excluded_tools.get_or_insert_with(Vec::new);
394 for tool in CRON_AGENT_DEFAULT_EXCLUDED_TOOLS {
395 if !excluded.iter().any(|existing| existing == tool) {
396 excluded.push((*tool).to_string());
397 }
398 }
399 policy
400}
401
402async fn execute_job_with_retry(
403 config: &Config,
404 security: &SecurityPolicy,
405 agent_alias: &str,
406 job: &CronJob,
407) -> (bool, String) {
408 let mut last_output = String::new();
409 let retries = config.reliability.scheduler_retries;
410 let mut backoff_ms = config.reliability.provider_backoff_ms.max(200);
411
412 for attempt in 0..=retries {
413 let (success, output) = match job.job_type {
414 JobType::Shell => run_job_command(config, security, job).await,
415 JobType::Agent => Box::pin(run_agent_job(config, security, agent_alias, job)).await,
416 };
417 last_output = output;
418
419 if success {
420 return (true, last_output);
421 }
422
423 if last_output.starts_with("blocked by security policy:") {
424 return (false, last_output);
426 }
427
428 if attempt < retries {
429 let jitter_ms = u64::from(Utc::now().timestamp_subsec_millis() % 250);
430 time::sleep(Duration::from_millis(backoff_ms + jitter_ms)).await;
431 backoff_ms = (backoff_ms.saturating_mul(2)).min(30_000);
432 }
433 }
434
435 (false, last_output)
436}
437
438async fn process_due_jobs(
439 config: &Config,
440 jobs: Vec<CronJob>,
441 component: &str,
442 event_tx: &EventBroadcast,
443) {
444 crate::health::mark_component_ok(component);
446
447 let max_concurrent = config.scheduler.max_concurrent.max(1);
448 let mut in_flight = stream::iter(jobs.into_iter().filter_map(|job| {
449 let Some(agent_alias) = resolve_owning_agent(config, &job) else {
452 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id})), "Cron job has no owning agent; add the alias to an [agents.<x>].cron_jobs list");
453 return None;
454 };
455 let agent_alias = agent_alias.to_owned();
456 let security = match SecurityPolicy::for_agent(config, &agent_alias) {
457 Ok(s) => Arc::new(s),
458 Err(e) => {
459 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"job_id": job.id, "agent": agent_alias, "error": format!("{}", e)})), "Cron job: failed to build SecurityPolicy for owning agent");
460 return None;
461 }
462 };
463 let config = config.clone();
464 let component = component.to_owned();
465 Some(async move {
466 Box::pin(execute_and_persist_job(
467 &config,
468 security.as_ref(),
469 &agent_alias,
470 &job,
471 &component,
472 ))
473 .await
474 })
475 }))
476 .buffer_unordered(max_concurrent);
477
478 while let Some((job_id, success, output)) = in_flight.next().await {
479 if !success {
480 ::zeroclaw_log::record!(
481 WARN,
482 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
483 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
484 .with_attrs(::serde_json::json!({"job_id": job_id, "output": output})),
485 "Scheduler job '' failed: "
486 );
487 }
488 if let Some(tx) = event_tx {
490 let _ = tx.send(serde_json::json!({
491 "type": "cron_result",
492 "job_id": job_id,
493 "success": success,
494 "output": output,
495 "timestamp": chrono::Utc::now().to_rfc3339(),
496 }));
497 }
498 }
499}
500
501async fn execute_and_persist_job(
502 config: &Config,
503 security: &SecurityPolicy,
504 agent_alias: &str,
505 job: &CronJob,
506 component: &str,
507) -> (String, bool, String) {
508 crate::health::mark_component_ok(component);
509 warn_if_high_frequency_agent_job(job);
510
511 let started_at = Utc::now();
512 let span = zeroclaw_log::attribution_span!(job);
513 let (success, output) = Box::pin(execute_job_with_retry(config, security, agent_alias, job))
514 .instrument(span)
515 .await;
516 let finished_at = Utc::now();
517 let success = Box::pin(persist_job_result(
518 config,
519 job,
520 success,
521 &output,
522 started_at,
523 finished_at,
524 ))
525 .await;
526
527 (job.id.clone(), success, output)
528}
529
530async fn run_agent_job(
531 config: &Config,
532 security: &SecurityPolicy,
533 agent_alias: &str,
534 job: &CronJob,
535) -> (bool, String) {
536 let subagent_ctx = match crate::subagent::SubAgentSpawn::for_agent(config, agent_alias)
542 .and_then(|spawn| spawn.build(crate::subagent::SubAgentOverrides::default()))
543 {
544 Ok(ctx) => ctx,
545 Err(e) => return (false, format!("subagent spawn failed: {e:#}")),
546 };
547
548 if !security.can_act() {
549 return (
550 false,
551 "blocked by security policy: autonomy is read-only".to_string(),
552 );
553 }
554
555 if security.is_rate_limited() {
556 return (
557 false,
558 "blocked by security policy: rate limit exceeded".to_string(),
559 );
560 }
561
562 if !security.record_action() {
563 return (
564 false,
565 "blocked by security policy: action budget exhausted".to_string(),
566 );
567 }
568 let name = job.name.clone().unwrap_or_else(|| "cron-job".to_string());
569 let prompt = job.prompt.clone().unwrap_or_default();
570
571 let memory_context = if !job.uses_memory {
578 String::new()
579 } else {
580 match zeroclaw_memory::create_memory_for_agent(
581 config,
582 agent_alias,
583 config
584 .model_provider_for_agent(agent_alias)
585 .and_then(|e| e.api_key.as_deref()),
586 )
587 .await
588 {
589 Ok(mem) => match mem.recall(&prompt, 5, None, None, None).await {
590 Ok(entries) if !entries.is_empty() => {
591 let ctx: String = entries
592 .iter()
593 .filter(|e| {
594 !matches!(
595 e.category,
596 zeroclaw_memory::traits::MemoryCategory::Conversation
597 )
598 })
599 .map(|e| format!("- {}: {}", e.key, e.content))
600 .collect::<Vec<_>>()
601 .join("\n");
602 if ctx.is_empty() {
603 String::new()
604 } else {
605 format!("{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n")
606 }
607 }
608 _ => String::new(),
609 },
610 Err(_) => String::new(),
611 }
612 };
613
614 let prefixed_prompt = format!("{memory_context}[cron:{} {name}] {prompt}", job.id);
615 let model_override = job.model.clone();
616
617 let mut cron_config = config.clone();
618 cron_config.memory.auto_save = false;
619
620 let run_session_id = uuid::Uuid::new_v4().to_string();
625 let session_path = std::path::PathBuf::from(format!("cron-{run_session_id}"));
626
627 let subagent_span = zeroclaw_log::info_span!(
628 "subagent",
629 category = "cron",
630 agent_alias = %agent_alias,
631 cron_job_id = %job.id,
632 run_id = %run_session_id,
633 spawn_site = "cron",
634 );
635
636 let run_security = cron_agent_run_security_policy(subagent_ctx.policy.as_ref(), job);
647 let run_overrides = crate::agent::loop_::AgentRunOverrides {
648 security: Some(Arc::new(run_security)),
649 memory: None,
650 is_subagent: false,
651 };
652 let run_result = match job.session_target {
653 SessionTarget::Main | SessionTarget::Isolated => {
654 Box::pin(
655 crate::agent::run(
656 cron_config,
657 agent_alias,
658 Some(prefixed_prompt),
659 None,
660 model_override,
661 config
662 .model_provider_for_agent(agent_alias)
663 .and_then(|e| e.temperature),
664 vec![],
665 false,
666 Some(session_path.clone()),
667 job.allowed_tools.clone(),
668 run_overrides,
669 )
670 .instrument(subagent_span),
671 )
672 .await
673 }
674 };
675
676 match run_result {
677 Ok(response) => (
678 true,
679 if response.trim().is_empty() {
680 "agent job executed".to_string()
681 } else {
682 response
683 },
684 ),
685 Err(e) => {
686 let mem_session_key = zeroclaw_api::session_keys::sanitize_session_key(&format!(
693 "cli:{}",
694 session_path.display()
695 ));
696 if let Ok(mem) = zeroclaw_memory::create_memory_for_agent(
697 config,
698 agent_alias,
699 config
700 .model_provider_for_agent(agent_alias)
701 .and_then(|e| e.api_key.as_deref()),
702 )
703 .await
704 {
705 let _ = mem.purge_session(&mem_session_key).await;
706 }
707 (false, format!("agent job failed: {e}"))
708 }
709 }
710}
711
712async fn persist_job_result(
713 config: &Config,
714 job: &CronJob,
715 success: bool,
716 output: &str,
717 started_at: DateTime<Utc>,
718 finished_at: DateTime<Utc>,
719) -> bool {
720 let duration_ms = (finished_at - started_at).num_milliseconds();
721 let outcome = deliver_and_classify_run_result(
722 config,
723 job,
724 success,
725 output.to_string(),
726 CronDeliveryContext::Scheduled,
727 )
728 .await;
729
730 let action = if is_one_shot_auto_delete(job) && outcome.success {
731 RunCompletionAction::Delete
732 } else if matches!(job.schedule, Schedule::At { .. }) {
733 RunCompletionAction::Disable
734 } else {
735 RunCompletionAction::Reschedule
736 };
737
738 let job_state_at = Utc::now();
739 if let Err(e) = persist_run_result(
740 config,
741 job,
742 started_at,
743 finished_at,
744 job_state_at,
745 &outcome.status,
746 Some(&outcome.output),
747 duration_ms,
748 action,
749 ) {
750 ::zeroclaw_log::record!(
751 WARN,
752 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
753 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
754 .with_attrs(::serde_json::json!({"e": e.to_string()})),
755 "Failed to persist scheduler run result: "
756 );
757
758 if action == RunCompletionAction::Delete {
759 if let Err(disable_err) = persist_run_completion_state(
764 config,
765 job,
766 job_state_at,
767 &outcome.status,
768 Some(&outcome.output),
769 RunCompletionAction::Disable,
770 ) {
771 ::zeroclaw_log::record!(
772 WARN,
773 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
774 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
775 .with_attrs(::serde_json::json!({"disable_err": disable_err.to_string()})),
776 "Failed to disable one-shot cron job after history persistence failure: "
777 );
778 }
779 } else {
780 if let Err(state_err) = persist_run_completion_state(
783 config,
784 job,
785 job_state_at,
786 &outcome.status,
787 Some(&outcome.output),
788 action,
789 ) {
790 ::zeroclaw_log::record!(
791 WARN,
792 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
793 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
794 .with_attrs(::serde_json::json!({"state_err": state_err.to_string()})),
795 "Failed to update cron job state after history persistence failure: "
796 );
797 }
798 }
799 }
800
801 outcome.success
802}
803
804fn is_one_shot_auto_delete(job: &CronJob) -> bool {
805 job.delete_after_run && matches!(job.schedule, Schedule::At { .. })
806}
807
808fn is_high_frequency_agent_job(job: &CronJob) -> bool {
809 if !matches!(job.job_type, JobType::Agent) {
810 return false;
811 }
812 match &job.schedule {
813 Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000,
814 Schedule::Cron { .. } => {
815 let now = Utc::now();
816 next_run_for_schedule(&job.schedule, now)
817 .and_then(|a| next_run_for_schedule(&job.schedule, a).map(|b| (a, b)))
818 .map(|(a, b)| (b - a).num_minutes() < 5)
819 .unwrap_or(false)
820 }
821 Schedule::At { .. } => false,
822 }
823}
824
825fn warn_if_high_frequency_agent_job(job: &CronJob) {
826 if is_high_frequency_agent_job(job) {
827 ::zeroclaw_log::record!(
828 WARN,
829 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
830 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
831 &format!(
832 "Cron agent job '{}' is scheduled more frequently than every 5 minutes",
833 job.id
834 )
835 );
836 }
837}
838
839async fn deliver_if_configured(config: &Config, job: &CronJob, output: &str) -> Result<()> {
840 let delivery: &DeliveryConfig = &job.delivery;
841 if !delivery.mode.eq_ignore_ascii_case("announce") {
842 return Ok(());
843 }
844
845 let channel = delivery.channel.as_deref().ok_or_else(|| {
846 ::zeroclaw_log::record!(
847 WARN,
848 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
849 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
850 .with_attrs(::serde_json::json!({"field": "channel"})),
851 "cron delivery announce refused: required field missing"
852 );
853 anyhow::Error::msg("delivery.channel is required for announce mode")
854 })?;
855 let target = delivery.to.as_deref().ok_or_else(|| {
856 ::zeroclaw_log::record!(
857 WARN,
858 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
859 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
860 .with_attrs(::serde_json::json!({"field": "to"})),
861 "cron delivery announce refused: required field missing"
862 );
863 anyhow::Error::msg("delivery.to is required for announce mode")
864 })?;
865
866 deliver_announcement(
867 config,
868 channel,
869 target,
870 delivery.thread_id.as_deref(),
871 output,
872 )
873 .await
874}
875
876pub type DeliveryFn = Box<
880 dyn Fn(
881 Config,
882 String,
883 String,
884 Option<String>,
885 String,
886 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
887 + Send
888 + Sync,
889>;
890
891static DELIVERY_FN: std::sync::OnceLock<DeliveryFn> = std::sync::OnceLock::new();
893
894pub fn register_delivery_fn(f: DeliveryFn) {
896 let _ = DELIVERY_FN.set(f);
897}
898
899pub async fn deliver_announcement(
900 config: &Config,
901 channel: &str,
902 target: &str,
903 thread_id: Option<&str>,
904 output: &str,
905) -> Result<()> {
906 if let Some(f) = DELIVERY_FN.get() {
907 f(
908 config.clone(),
909 channel.to_string(),
910 target.to_string(),
911 thread_id.map(str::to_string),
912 output.to_string(),
913 )
914 .await
915 } else {
916 ::zeroclaw_log::record!(
929 WARN,
930 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
931 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
932 .with_attrs(::serde_json::json!({"channel": channel, "target": target})),
933 "Cron delivery skipped: no delivery handler registered \
934 (register_delivery_fn was not called by the binary)"
935 );
936 Ok(())
937 }
938}
939
940async fn run_job_command(
941 config: &Config,
942 security: &SecurityPolicy,
943 job: &CronJob,
944) -> (bool, String) {
945 run_job_command_with_timeout(
946 config,
947 security,
948 job,
949 Duration::from_secs(SHELL_JOB_TIMEOUT_SECS),
950 )
951 .await
952}
953
954async fn run_job_command_with_timeout(
955 config: &Config,
956 security: &SecurityPolicy,
957 job: &CronJob,
958 timeout: Duration,
959) -> (bool, String) {
960 if !security.can_act() {
961 return (
962 false,
963 "blocked by security policy: autonomy is read-only".to_string(),
964 );
965 }
966
967 if security.is_rate_limited() {
968 return (
969 false,
970 "blocked by security policy: rate limit exceeded".to_string(),
971 );
972 }
973
974 let approved = false; if let Err(error) =
980 crate::cron::validate_shell_command_with_security(security, &job.command, approved)
981 {
982 return (false, error.to_string());
983 }
984
985 if let Some(path) = security.forbidden_path_argument(&job.command) {
986 return (
987 false,
988 format!("blocked by security policy: forbidden path argument: {path}"),
989 );
990 }
991
992 if !security.record_action() {
993 return (
994 false,
995 "blocked by security policy: action budget exhausted".to_string(),
996 );
997 }
998
999 let child = match build_cron_shell_command(&job.command, &config.data_dir) {
1000 Ok(mut cmd) => match cmd.spawn() {
1001 Ok(child) => child,
1002 Err(e) => return (false, format!("spawn error: {e}")),
1003 },
1004 Err(e) => return (false, format!("shell setup error: {e}")),
1005 };
1006
1007 match time::timeout(timeout, child.wait_with_output()).await {
1008 Ok(Ok(output)) => {
1009 let stdout = String::from_utf8_lossy(&output.stdout);
1010 let stderr = String::from_utf8_lossy(&output.stderr);
1011 let combined = format!(
1012 "status={}\nstdout:\n{}\nstderr:\n{}",
1013 output.status,
1014 stdout.trim(),
1015 stderr.trim()
1016 );
1017 (output.status.success(), combined)
1018 }
1019 Ok(Err(e)) => (false, format!("spawn error: {e}")),
1020 Err(_) => (
1021 false,
1022 format!("job timed out after {}s", timeout.as_secs_f64()),
1023 ),
1024 }
1025}
1026
1027fn build_cron_shell_command(
1041 command: &str,
1042 workspace_dir: &std::path::Path,
1043) -> anyhow::Result<Command> {
1044 let mut cmd = Command::new("sh");
1045 cmd.arg("-c")
1046 .arg(command)
1047 .current_dir(workspace_dir)
1048 .stdin(Stdio::null())
1049 .stdout(Stdio::piped())
1050 .stderr(Stdio::piped())
1051 .kill_on_drop(true);
1052
1053 Ok(cmd)
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use super::*;
1059 use crate::cron::{self, DeliveryConfig};
1060 use crate::security::SecurityPolicy;
1061 use chrono::{Duration as ChronoDuration, Utc};
1062 use tempfile::TempDir;
1063 use zeroclaw_config::schema::Config;
1064
1065 const TEST_AGENT: &str = "test-agent";
1066
1067 async fn test_config(tmp: &TempDir) -> Config {
1068 let mut config = Config {
1069 data_dir: tmp.path().join("data"),
1070 config_path: tmp.path().join("config.toml"),
1071 ..Config::default()
1072 };
1073 config.risk_profiles.insert(
1074 TEST_AGENT.to_string(),
1075 zeroclaw_config::schema::RiskProfileConfig::default(),
1076 );
1077 config.runtime_profiles.insert(
1078 TEST_AGENT.to_string(),
1079 zeroclaw_config::schema::RuntimeProfileConfig::default(),
1080 );
1081 config.providers.models.openrouter.insert(
1082 TEST_AGENT.to_string(),
1083 zeroclaw_config::schema::OpenRouterModelProviderConfig::default(),
1084 );
1085 config.agents.insert(
1086 TEST_AGENT.to_string(),
1087 zeroclaw_config::schema::AliasedAgentConfig {
1088 model_provider: format!("openrouter.{TEST_AGENT}").into(),
1089 risk_profile: TEST_AGENT.to_string(),
1090 runtime_profile: TEST_AGENT.to_string(),
1091 ..Default::default()
1092 },
1093 );
1094 tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
1095 config
1096 }
1097
1098 fn test_security(config: &Config) -> SecurityPolicy {
1099 SecurityPolicy::for_agent(config, TEST_AGENT).expect("test-agent has resolvable profiles")
1100 }
1101
1102 fn test_job(command: &str) -> CronJob {
1103 CronJob {
1104 id: "test-job".into(),
1105 expression: "* * * * *".into(),
1106 schedule: crate::cron::Schedule::Cron {
1107 expr: "* * * * *".into(),
1108 tz: None,
1109 },
1110 command: command.into(),
1111 prompt: None,
1112 name: None,
1113 job_type: JobType::Shell,
1114 session_target: SessionTarget::Isolated,
1115 model: None,
1116 agent_alias: TEST_AGENT.into(),
1117 enabled: true,
1118 delivery: DeliveryConfig::default(),
1119 delete_after_run: false,
1120 allowed_tools: None,
1121 uses_memory: true,
1122 source: "imperative".into(),
1123 created_at: Utc::now(),
1124 next_run: Utc::now(),
1125 last_run: None,
1126 last_status: None,
1127 last_output: None,
1128 }
1129 }
1130
1131 fn unique_component(prefix: &str) -> String {
1132 format!("{prefix}-{}", uuid::Uuid::new_v4())
1133 }
1134
1135 fn agent_job_with_schedule(schedule: crate::cron::Schedule) -> CronJob {
1136 CronJob {
1137 job_type: JobType::Agent,
1138 schedule,
1139 ..test_job("echo test")
1140 }
1141 }
1142
1143 #[test]
1144 fn high_frequency_daily_cron_is_not_flagged() {
1145 let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1147 expr: "0 6 * * *".into(),
1148 tz: Some("America/Chicago".into()),
1149 });
1150 assert!(!is_high_frequency_agent_job(&job));
1151 }
1152
1153 #[test]
1154 fn high_frequency_every_4min_cron_is_flagged() {
1155 let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1156 expr: "*/4 * * * *".into(),
1157 tz: None,
1158 });
1159 assert!(is_high_frequency_agent_job(&job));
1160 }
1161
1162 #[test]
1163 fn high_frequency_every_5min_cron_is_not_flagged() {
1164 let job = agent_job_with_schedule(crate::cron::Schedule::Cron {
1166 expr: "*/5 * * * *".into(),
1167 tz: None,
1168 });
1169 assert!(!is_high_frequency_agent_job(&job));
1170 }
1171
1172 #[test]
1173 fn high_frequency_every_interval_below_threshold_is_flagged() {
1174 let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1175 every_ms: 4 * 60 * 1000, });
1177 assert!(is_high_frequency_agent_job(&job));
1178 }
1179
1180 #[test]
1181 fn high_frequency_every_interval_at_threshold_is_not_flagged() {
1182 let job = agent_job_with_schedule(crate::cron::Schedule::Every {
1183 every_ms: 5 * 60 * 1000, });
1185 assert!(!is_high_frequency_agent_job(&job));
1186 }
1187
1188 #[test]
1189 fn high_frequency_shell_job_is_never_flagged() {
1190 let job = CronJob {
1192 job_type: JobType::Shell,
1193 schedule: crate::cron::Schedule::Every {
1194 every_ms: 60 * 1000, },
1196 ..test_job("echo test")
1197 };
1198 assert!(!is_high_frequency_agent_job(&job));
1199 }
1200
1201 #[test]
1202 fn cron_agent_run_security_policy_excludes_scheduler_mutation_tools_by_default() {
1203 let security = SecurityPolicy::default();
1204 let mut job = test_job("");
1205 job.job_type = JobType::Agent;
1206 job.allowed_tools = None;
1207
1208 let policy = cron_agent_run_security_policy(&security, &job);
1209
1210 for tool in [
1211 "cron_add",
1212 "cron_update",
1213 "cron_remove",
1214 "cron_run",
1215 "schedule",
1216 ] {
1217 assert!(
1218 !policy.is_tool_allowed(tool),
1219 "{tool} must be excluded from default cron agent runs"
1220 );
1221 }
1222 assert!(
1223 policy.is_tool_allowed("http_request"),
1224 "non-scheduler tools remain available when the base policy is unrestricted"
1225 );
1226 }
1227
1228 #[test]
1229 fn cron_agent_run_security_policy_respects_explicit_allowed_tools() {
1230 let security = SecurityPolicy::default();
1231 let mut job = test_job("");
1232 job.job_type = JobType::Agent;
1233 job.allowed_tools = Some(vec!["cron_add".into()]);
1234
1235 let policy = cron_agent_run_security_policy(&security, &job);
1236
1237 assert!(
1238 policy.is_tool_allowed("cron_add"),
1239 "explicit cron job allowed_tools should remain the override for intentional scheduler automation"
1240 );
1241 }
1242
1243 #[tokio::test]
1244 async fn run_job_command_success() {
1245 let tmp = TempDir::new().unwrap();
1246 let config = test_config(&tmp).await;
1247 let job = test_job("echo scheduler-ok");
1248 let security = test_security(&config);
1249
1250 let (success, output) = run_job_command(&config, &security, &job).await;
1251 assert!(success);
1252 assert!(output.contains("scheduler-ok"));
1253 assert!(output.contains("status=exit status: 0"));
1254 }
1255
1256 #[tokio::test]
1257 async fn run_job_command_failure() {
1258 let tmp = TempDir::new().unwrap();
1259 let config = test_config(&tmp).await;
1260 let job = test_job("ls definitely_missing_file_for_scheduler_test");
1261 let security = test_security(&config);
1262
1263 let (success, output) = run_job_command(&config, &security, &job).await;
1264 assert!(!success);
1265 assert!(output.contains("definitely_missing_file_for_scheduler_test"));
1266 assert!(output.contains("status=exit status:"));
1267 }
1268
1269 #[tokio::test]
1270 async fn run_job_command_times_out() {
1271 let tmp = TempDir::new().unwrap();
1272 let mut config = test_config(&tmp).await;
1273 config
1274 .risk_profiles
1275 .entry(TEST_AGENT.into())
1276 .or_default()
1277 .allowed_commands = vec!["sleep".into()];
1278 let job = test_job("sleep 1");
1279 let security = test_security(&config);
1280
1281 let (success, output) =
1282 run_job_command_with_timeout(&config, &security, &job, Duration::from_millis(50)).await;
1283 assert!(!success);
1284 assert!(output.contains("job timed out after"));
1285 }
1286
1287 #[tokio::test]
1288 async fn run_job_command_blocks_disallowed_command() {
1289 let tmp = TempDir::new().unwrap();
1290 let mut config = test_config(&tmp).await;
1291 config
1292 .risk_profiles
1293 .entry(TEST_AGENT.into())
1294 .or_default()
1295 .allowed_commands = vec!["echo".into()];
1296 let job = test_job("curl https://evil.example");
1297 let security = test_security(&config);
1298
1299 let (success, output) = run_job_command(&config, &security, &job).await;
1300 assert!(!success);
1301 assert!(output.contains("blocked by security policy"));
1302 assert!(output.to_lowercase().contains("not allowed"));
1303 }
1304
1305 #[tokio::test]
1306 async fn run_job_command_blocks_forbidden_path_argument() {
1307 let tmp = TempDir::new().unwrap();
1308 let mut config = test_config(&tmp).await;
1309 config
1310 .risk_profiles
1311 .entry(TEST_AGENT.into())
1312 .or_default()
1313 .allowed_commands = vec!["cat".into()];
1314 let job = test_job("cat /etc/passwd");
1315 let security = test_security(&config);
1316
1317 let (success, output) = run_job_command(&config, &security, &job).await;
1318 assert!(!success);
1319 assert!(output.contains("blocked by security policy"));
1320 assert!(output.contains("forbidden path argument"));
1321 assert!(output.contains("/etc/passwd"));
1322 }
1323
1324 #[tokio::test]
1325 async fn run_job_command_blocks_forbidden_option_assignment_path_argument() {
1326 let tmp = TempDir::new().unwrap();
1327 let mut config = test_config(&tmp).await;
1328 config
1329 .risk_profiles
1330 .entry(TEST_AGENT.into())
1331 .or_default()
1332 .allowed_commands = vec!["grep".into()];
1333 let job = test_job("grep --file=/etc/passwd root ./src");
1334 let security = test_security(&config);
1335
1336 let (success, output) = run_job_command(&config, &security, &job).await;
1337 assert!(!success);
1338 assert!(output.contains("blocked by security policy"));
1339 assert!(output.contains("forbidden path argument"));
1340 assert!(output.contains("/etc/passwd"));
1341 }
1342
1343 #[tokio::test]
1344 async fn run_job_command_blocks_forbidden_short_option_attached_path_argument() {
1345 let tmp = TempDir::new().unwrap();
1346 let mut config = test_config(&tmp).await;
1347 config
1348 .risk_profiles
1349 .entry(TEST_AGENT.into())
1350 .or_default()
1351 .allowed_commands = vec!["grep".into()];
1352 let job = test_job("grep -f/etc/passwd root ./src");
1353 let security = test_security(&config);
1354
1355 let (success, output) = run_job_command(&config, &security, &job).await;
1356 assert!(!success);
1357 assert!(output.contains("blocked by security policy"));
1358 assert!(output.contains("forbidden path argument"));
1359 assert!(output.contains("/etc/passwd"));
1360 }
1361
1362 #[tokio::test]
1363 async fn run_job_command_blocks_tilde_user_path_argument() {
1364 let tmp = TempDir::new().unwrap();
1365 let mut config = test_config(&tmp).await;
1366 config
1367 .risk_profiles
1368 .entry(TEST_AGENT.into())
1369 .or_default()
1370 .allowed_commands = vec!["cat".into()];
1371 let job = test_job("cat ~root/.ssh/id_rsa");
1372 let security = test_security(&config);
1373
1374 let (success, output) = run_job_command(&config, &security, &job).await;
1375 assert!(!success);
1376 assert!(output.contains("blocked by security policy"));
1377 assert!(output.contains("forbidden path argument"));
1378 assert!(output.contains("~root/.ssh/id_rsa"));
1379 }
1380
1381 #[tokio::test]
1382 async fn run_job_command_blocks_input_redirection_path_bypass() {
1383 let tmp = TempDir::new().unwrap();
1384 let mut config = test_config(&tmp).await;
1385 config
1386 .risk_profiles
1387 .entry(TEST_AGENT.into())
1388 .or_default()
1389 .allowed_commands = vec!["cat".into()];
1390 let job = test_job("cat </etc/passwd");
1391 let security = test_security(&config);
1392
1393 let (success, output) = run_job_command(&config, &security, &job).await;
1394 assert!(!success);
1395 assert!(output.contains("blocked by security policy"));
1396 assert!(output.to_lowercase().contains("not allowed"));
1397 }
1398
1399 #[tokio::test]
1400 async fn run_job_command_blocks_readonly_mode() {
1401 let tmp = TempDir::new().unwrap();
1402 let mut config = test_config(&tmp).await;
1403 config
1404 .risk_profiles
1405 .entry(TEST_AGENT.into())
1406 .or_default()
1407 .level = crate::security::AutonomyLevel::ReadOnly;
1408 let job = test_job("echo should-not-run");
1409 let security = test_security(&config);
1410
1411 let (success, output) = run_job_command(&config, &security, &job).await;
1412 assert!(!success);
1413 assert!(output.contains("blocked by security policy"));
1414 assert!(output.contains("read-only"));
1415 }
1416
1417 #[tokio::test]
1418 async fn run_job_command_blocks_rate_limited() {
1419 let tmp = TempDir::new().unwrap();
1420 let mut config = test_config(&tmp).await;
1421 config
1422 .runtime_profiles
1423 .entry(TEST_AGENT.into())
1424 .or_default()
1425 .max_actions_per_hour = 0;
1426 let job = test_job("echo should-not-run");
1427 let security = test_security(&config);
1428
1429 let (success, output) = run_job_command(&config, &security, &job).await;
1430 assert!(!success);
1431 assert!(output.contains("blocked by security policy"));
1432 assert!(output.contains("rate limit exceeded"));
1433 }
1434
1435 #[tokio::test]
1436 async fn execute_job_with_retry_recovers_after_first_failure() {
1437 let tmp = TempDir::new().unwrap();
1438 let mut config = test_config(&tmp).await;
1439 config.reliability.scheduler_retries = 1;
1440 config.reliability.provider_backoff_ms = 1;
1441 config
1442 .risk_profiles
1443 .entry(TEST_AGENT.into())
1444 .or_default()
1445 .allowed_commands = vec!["sh".into()];
1446 let security = test_security(&config);
1447
1448 tokio::fs::write(
1449 config.data_dir.join("retry-once.sh"),
1450 "#!/bin/sh\nif [ -f retry-ok.flag ]; then\n echo recovered\n exit 0\nfi\ntouch retry-ok.flag\nexit 1\n",
1451 )
1452 .await
1453 .unwrap();
1454 let job = test_job("sh ./retry-once.sh");
1455
1456 let (success, output) = Box::pin(execute_job_with_retry(
1457 &config,
1458 &security,
1459 "test-agent",
1460 &job,
1461 ))
1462 .await;
1463 assert!(success);
1464 assert!(output.contains("recovered"));
1465 }
1466
1467 #[tokio::test]
1468 async fn execute_job_with_retry_exhausts_attempts() {
1469 let tmp = TempDir::new().unwrap();
1470 let mut config = test_config(&tmp).await;
1471 config.reliability.scheduler_retries = 1;
1472 config.reliability.provider_backoff_ms = 1;
1473 let security = test_security(&config);
1474
1475 let job = test_job("ls always_missing_for_retry_test");
1476
1477 let (success, output) = Box::pin(execute_job_with_retry(
1478 &config,
1479 &security,
1480 "test-agent",
1481 &job,
1482 ))
1483 .await;
1484 assert!(!success);
1485 assert!(output.contains("always_missing_for_retry_test"));
1486 }
1487
1488 #[tokio::test]
1489 async fn run_agent_job_returns_error_without_provider_key() {
1490 let tmp = TempDir::new().unwrap();
1491 let config = test_config(&tmp).await;
1492 let mut job = test_job("");
1493 job.job_type = JobType::Agent;
1494 job.prompt = Some("Say hello".into());
1495 let security = test_security(&config);
1496
1497 let (success, output) =
1498 Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1499 assert!(!success);
1500 assert!(output.contains("agent job failed:"));
1501 }
1502
1503 #[tokio::test]
1504 async fn run_agent_job_blocks_readonly_mode() {
1505 let tmp = TempDir::new().unwrap();
1506 let mut config = test_config(&tmp).await;
1507 config
1508 .risk_profiles
1509 .entry(TEST_AGENT.into())
1510 .or_default()
1511 .level = crate::security::AutonomyLevel::ReadOnly;
1512 let mut job = test_job("");
1513 job.job_type = JobType::Agent;
1514 job.prompt = Some("Say hello".into());
1515 let security = test_security(&config);
1516
1517 let (success, output) =
1518 Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1519 assert!(!success);
1520 assert!(output.contains("blocked by security policy"));
1521 assert!(output.contains("read-only"));
1522 }
1523
1524 #[tokio::test]
1525 async fn run_agent_job_blocks_rate_limited() {
1526 let tmp = TempDir::new().unwrap();
1527 let mut config = test_config(&tmp).await;
1528 config
1529 .runtime_profiles
1530 .entry(TEST_AGENT.into())
1531 .or_default()
1532 .max_actions_per_hour = 0;
1533 let mut job = test_job("");
1534 job.job_type = JobType::Agent;
1535 job.prompt = Some("Say hello".into());
1536 let security = test_security(&config);
1537
1538 let (success, output) =
1539 Box::pin(run_agent_job(&config, &security, "test-agent", &job)).await;
1540 assert!(!success);
1541 assert!(output.contains("blocked by security policy"));
1542 assert!(output.contains("rate limit exceeded"));
1543 }
1544
1545 #[tokio::test]
1546 async fn process_due_jobs_marks_component_ok_even_when_idle() {
1547 let tmp = TempDir::new().unwrap();
1548 let config = test_config(&tmp).await;
1549 let component = unique_component("scheduler-idle");
1550
1551 crate::health::mark_component_error(&component, "pre-existing error");
1552 process_due_jobs(&config, Vec::new(), &component, &None).await;
1553
1554 let snapshot = crate::health::snapshot_json();
1555 let entry = &snapshot["components"][component.as_str()];
1556 assert_eq!(entry["status"], "ok");
1557 assert!(entry["last_ok"].as_str().is_some());
1558 assert!(entry["last_error"].is_null());
1559 }
1560
1561 #[tokio::test]
1562 async fn process_due_jobs_failure_does_not_mark_component_unhealthy() {
1563 let tmp = TempDir::new().unwrap();
1564 let config = test_config(&tmp).await;
1565 let job = test_job("ls definitely_missing_file_for_scheduler_component_health_test");
1566 let component = unique_component("scheduler-fail");
1567
1568 crate::health::mark_component_ok(&component);
1569 process_due_jobs(&config, vec![job], &component, &None).await;
1570
1571 let snapshot = crate::health::snapshot_json();
1572 let entry = &snapshot["components"][component.as_str()];
1573 assert_eq!(entry["status"], "ok");
1574 }
1575
1576 #[tokio::test]
1577 async fn persist_job_result_records_run_and_reschedules_shell_job() {
1578 let tmp = TempDir::new().unwrap();
1579 let config = test_config(&tmp).await;
1580 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1581 let started = Utc::now();
1582 let finished = started + ChronoDuration::milliseconds(10);
1583
1584 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1585 assert!(success);
1586
1587 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1588 assert_eq!(runs.len(), 1);
1589 let updated = cron::get_job(&config, &job.id).unwrap();
1590 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1591 }
1592
1593 #[tokio::test]
1594 async fn persist_job_result_uses_one_write_connection_for_recurring_job() {
1595 let tmp = TempDir::new().unwrap();
1596 let config = test_config(&tmp).await;
1597 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1598 let started = Utc::now();
1599 let finished = started + ChronoDuration::milliseconds(10);
1600
1601 crate::cron::store::reset_write_connection_count_for_tests(&config);
1602 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1603
1604 assert!(success);
1605 assert_eq!(
1606 crate::cron::store::write_connection_count_for_tests(&config),
1607 1
1608 );
1609 }
1610
1611 #[tokio::test]
1612 async fn persist_job_result_prunes_run_history_and_updates_last_fields() {
1613 let tmp = TempDir::new().unwrap();
1614 let mut config = test_config(&tmp).await;
1615 config.scheduler.max_run_history = 2;
1616 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1617 let base = Utc::now();
1618
1619 for idx in 0..3 {
1620 let started = base + ChronoDuration::seconds(idx);
1621 let finished = started + ChronoDuration::milliseconds(10);
1622 let output = format!("run-{idx}");
1623
1624 let success = persist_job_result(&config, &job, true, &output, started, finished).await;
1625 assert!(success);
1626 }
1627
1628 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1629 assert_eq!(runs.len(), 2);
1630 assert_eq!(runs[0].output.as_deref(), Some("run-2"));
1631 assert_eq!(runs[1].output.as_deref(), Some("run-1"));
1632
1633 let updated = cron::get_job(&config, &job.id).unwrap();
1634 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1635 assert_eq!(updated.last_output.as_deref(), Some("run-2"));
1636 assert!(updated.last_run.is_some());
1637 }
1638
1639 #[tokio::test]
1640 async fn persist_job_result_rolls_back_run_history_when_job_state_update_fails() {
1641 let tmp = TempDir::new().unwrap();
1642 let config = test_config(&tmp).await;
1643 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1644 let original_next_run = job.next_run;
1645 let started = Utc::now();
1646 let finished = started + ChronoDuration::milliseconds(10);
1647
1648 let conn =
1649 rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1650 conn.execute_batch(
1651 "CREATE TRIGGER fail_cron_job_update
1652 BEFORE UPDATE ON cron_jobs
1653 BEGIN
1654 SELECT RAISE(ABORT, 'blocked update');
1655 END;",
1656 )
1657 .unwrap();
1658 drop(conn);
1659
1660 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1661
1662 assert!(success);
1663 assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1664
1665 let stored = cron::get_job(&config, &job.id).unwrap();
1666 assert_eq!(stored.next_run, original_next_run);
1667 assert!(stored.last_run.is_none());
1668 assert!(stored.last_status.is_none());
1669 assert!(stored.last_output.is_none());
1670 }
1671
1672 #[tokio::test]
1673 async fn persist_job_result_success_deletes_one_shot() {
1674 let tmp = TempDir::new().unwrap();
1675 let config = test_config(&tmp).await;
1676 let at = Utc::now() + ChronoDuration::minutes(10);
1677 let job = cron::add_agent_job(
1678 &config,
1679 TEST_AGENT,
1680 Some("one-shot".into()),
1681 crate::cron::Schedule::At { at },
1682 "Hello",
1683 SessionTarget::Isolated,
1684 None,
1685 None,
1686 true,
1687 None,
1688 )
1689 .unwrap();
1690 let started = Utc::now();
1691 let finished = started + ChronoDuration::milliseconds(10);
1692
1693 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1694 assert!(success);
1695 let lookup = cron::get_job(&config, &job.id);
1696 assert!(lookup.is_err());
1697 }
1698
1699 #[tokio::test]
1700 async fn persist_job_result_failure_disables_one_shot() {
1701 let tmp = TempDir::new().unwrap();
1702 let config = test_config(&tmp).await;
1703 let at = Utc::now() + ChronoDuration::minutes(10);
1704 let job = cron::add_agent_job(
1705 &config,
1706 TEST_AGENT,
1707 Some("one-shot".into()),
1708 crate::cron::Schedule::At { at },
1709 "Hello",
1710 SessionTarget::Isolated,
1711 None,
1712 None,
1713 true,
1714 None,
1715 )
1716 .unwrap();
1717 let started = Utc::now();
1718 let finished = started + ChronoDuration::milliseconds(10);
1719
1720 let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1721 assert!(!success);
1722 let updated = cron::get_job(&config, &job.id).unwrap();
1723 assert!(!updated.enabled);
1724 assert_eq!(updated.last_status.as_deref(), Some("error"));
1725 }
1726
1727 #[tokio::test]
1728 async fn persist_job_result_uses_one_write_connection_for_failed_one_shot_disable() {
1729 let tmp = TempDir::new().unwrap();
1730 let config = test_config(&tmp).await;
1731 let at = Utc::now() + ChronoDuration::minutes(10);
1732 let job = cron::add_agent_job(
1733 &config,
1734 "test-agent",
1735 Some("one-shot".into()),
1736 crate::cron::Schedule::At { at },
1737 "Hello",
1738 SessionTarget::Isolated,
1739 None,
1740 None,
1741 true,
1742 None,
1743 )
1744 .unwrap();
1745 let started = Utc::now();
1746 let finished = started + ChronoDuration::milliseconds(10);
1747
1748 crate::cron::store::reset_write_connection_count_for_tests(&config);
1749 let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1750
1751 assert!(!success);
1752 assert_eq!(
1753 crate::cron::store::write_connection_count_for_tests(&config),
1754 1
1755 );
1756 }
1757
1758 #[tokio::test]
1759 async fn persist_job_result_falls_back_to_state_update_when_history_prune_fails() {
1760 let tmp = TempDir::new().unwrap();
1761 let mut config = test_config(&tmp).await;
1762 config.scheduler.max_run_history = 1;
1763 let job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1764 let original_next_run = job.next_run;
1765 let seed_started = Utc::now() - ChronoDuration::minutes(20);
1766 let seed_finished = seed_started + ChronoDuration::milliseconds(10);
1767 let started = Utc::now();
1768 let finished = started + ChronoDuration::milliseconds(10);
1769
1770 let conn =
1771 rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1772 conn.execute(
1773 "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
1774 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1775 rusqlite::params![
1776 job.id,
1777 seed_started.to_rfc3339(),
1778 seed_finished.to_rfc3339(),
1779 "seed",
1780 "seed",
1781 10,
1782 ],
1783 )
1784 .unwrap();
1785 conn.execute_batch(
1786 "CREATE TRIGGER fail_cron_run_prune
1787 BEFORE DELETE ON cron_runs
1788 BEGIN
1789 SELECT RAISE(ABORT, 'blocked prune');
1790 END;",
1791 )
1792 .unwrap();
1793 drop(conn);
1794
1795 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1796 assert!(success);
1797
1798 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1799 assert_eq!(runs.len(), 1);
1800 assert_eq!(runs[0].status, "seed");
1801
1802 let updated = cron::get_job(&config, &job.id).unwrap();
1803 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1804 assert_eq!(updated.last_output.as_deref(), Some("ok"));
1805 assert!(updated.last_run.is_some());
1806 assert!(updated.next_run >= original_next_run);
1807 }
1808
1809 #[tokio::test]
1810 async fn persist_job_result_falls_back_to_disable_when_auto_delete_history_insert_fails() {
1811 let tmp = TempDir::new().unwrap();
1812 let config = test_config(&tmp).await;
1813 let at = Utc::now() + ChronoDuration::minutes(10);
1814 let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1815 assert!(job.delete_after_run);
1816 let started = Utc::now();
1817 let finished = started + ChronoDuration::milliseconds(10);
1818
1819 let conn =
1820 rusqlite::Connection::open(config.data_dir.join("cron").join("jobs.db")).unwrap();
1821 conn.execute_batch(
1822 "CREATE TRIGGER fail_cron_run_insert
1823 BEFORE INSERT ON cron_runs
1824 BEGIN
1825 SELECT RAISE(ABORT, 'blocked insert');
1826 END;",
1827 )
1828 .unwrap();
1829 drop(conn);
1830
1831 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1832 assert!(success);
1833
1834 let updated = cron::get_job(&config, &job.id).unwrap();
1835 assert!(!updated.enabled);
1836 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1837 assert_eq!(updated.last_output.as_deref(), Some("ok"));
1838 assert!(cron::list_runs(&config, &job.id, 10).unwrap().is_empty());
1839 }
1840
1841 #[tokio::test]
1842 async fn persist_job_result_success_deletes_one_shot_shell_job() {
1843 let tmp = TempDir::new().unwrap();
1844 let config = test_config(&tmp).await;
1845 let at = Utc::now() + ChronoDuration::minutes(10);
1846 let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1847 assert!(job.delete_after_run);
1848 let started = Utc::now();
1849 let finished = started + ChronoDuration::milliseconds(10);
1850
1851 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1852 assert!(success);
1853 let lookup = cron::get_job(&config, &job.id);
1854 assert!(lookup.is_err());
1855 }
1856
1857 #[tokio::test]
1858 async fn persist_job_result_failure_disables_one_shot_shell_job() {
1859 let tmp = TempDir::new().unwrap();
1860 let config = test_config(&tmp).await;
1861 let at = Utc::now() + ChronoDuration::minutes(10);
1862 let job = cron::add_once_at(&config, "test-agent", at, "echo one-shot-shell").unwrap();
1863 assert!(job.delete_after_run);
1864 let started = Utc::now();
1865 let finished = started + ChronoDuration::milliseconds(10);
1866
1867 let success = persist_job_result(&config, &job, false, "boom", started, finished).await;
1868 assert!(!success);
1869 let updated = cron::get_job(&config, &job.id).unwrap();
1870 assert!(!updated.enabled);
1871 assert_eq!(updated.last_status.as_deref(), Some("error"));
1872 }
1873
1874 #[tokio::test]
1875 async fn persist_job_result_delivery_stubbed_succeeds() {
1876 let tmp = TempDir::new().unwrap();
1879 let config = test_config(&tmp).await;
1880 let job = cron::add_agent_job(
1881 &config,
1882 TEST_AGENT,
1883 Some("announce-job".into()),
1884 crate::cron::Schedule::Cron {
1885 expr: "*/5 * * * *".into(),
1886 tz: None,
1887 },
1888 "deliver this",
1889 SessionTarget::Isolated,
1890 None,
1891 Some(DeliveryConfig {
1892 mode: "announce".into(),
1893 channel: Some("telegram".into()),
1894 to: Some("123456".into()),
1895 thread_id: None,
1896 best_effort: false,
1897 }),
1898 false,
1899 None,
1900 )
1901 .unwrap();
1902 let started = Utc::now();
1903 let finished = started + ChronoDuration::milliseconds(10);
1904
1905 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1906 assert!(success);
1907
1908 let updated = cron::get_job(&config, &job.id).unwrap();
1909 assert!(updated.enabled);
1910 assert_eq!(updated.last_status.as_deref(), Some("ok"));
1911
1912 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1913 assert_eq!(runs.len(), 1);
1914 assert_eq!(runs[0].status, "ok");
1915 }
1916
1917 #[tokio::test]
1918 async fn persist_job_result_delivery_failure_best_effort_marks_degraded() {
1919 let tmp = TempDir::new().unwrap();
1920 let config = test_config(&tmp).await;
1921 register_delivery_fn(Box::new(
1922 |_config, channel, _target, _thread_id, _output| {
1923 Box::pin(async move {
1924 if channel == "fail-delivery" {
1925 anyhow::bail!("synthetic delivery failure");
1926 }
1927 Ok(())
1928 })
1929 },
1930 ));
1931 let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1932 job.delivery = DeliveryConfig {
1933 mode: "announce".into(),
1934 channel: Some("fail-delivery".into()),
1935 to: Some("123456".into()),
1936 thread_id: None,
1937 best_effort: true,
1938 };
1939 let started = Utc::now();
1940 let finished = started + ChronoDuration::milliseconds(10);
1941
1942 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
1943 assert!(success);
1944
1945 let updated = cron::get_job(&config, &job.id).unwrap();
1946 assert!(updated.enabled);
1947 assert_eq!(updated.last_status.as_deref(), Some("degraded"));
1948 assert!(
1949 updated
1950 .last_output
1951 .as_deref()
1952 .unwrap_or_default()
1953 .contains("delivery failed:")
1954 );
1955
1956 let runs = cron::list_runs(&config, &job.id, 10).unwrap();
1957 assert_eq!(runs.len(), 1);
1958 assert_eq!(runs[0].status, "degraded");
1959 }
1960
1961 #[tokio::test]
1962 async fn delivery_failure_classification_preserves_empty_output_evidence() {
1963 let tmp = TempDir::new().unwrap();
1964 let config = test_config(&tmp).await;
1965 register_delivery_fn(Box::new(
1966 |_config, channel, _target, _thread_id, _output| {
1967 Box::pin(async move {
1968 if channel == "fail-delivery" {
1969 anyhow::bail!("synthetic delivery failure");
1970 }
1971 Ok(())
1972 })
1973 },
1974 ));
1975 let mut job = cron::add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1976 job.delivery = DeliveryConfig {
1977 mode: "announce".into(),
1978 channel: Some("fail-delivery".into()),
1979 to: Some("123456".into()),
1980 thread_id: None,
1981 best_effort: true,
1982 };
1983
1984 let outcome = deliver_and_classify_run_result(
1985 &config,
1986 &job,
1987 true,
1988 String::new(),
1989 CronDeliveryContext::Scheduled,
1990 )
1991 .await;
1992
1993 assert!(outcome.success);
1994 assert_eq!(outcome.status, "degraded");
1995 assert!(outcome.output.starts_with("delivery failed:"));
1996 }
1997
1998 #[tokio::test]
1999 async fn persist_job_result_at_schedule_without_delete_after_run_is_disabled() {
2000 let tmp = TempDir::new().unwrap();
2001 let config = test_config(&tmp).await;
2002 let at = Utc::now() + ChronoDuration::minutes(10);
2003 let job = cron::add_agent_job(
2004 &config,
2005 TEST_AGENT,
2006 Some("at-no-autodelete".into()),
2007 crate::cron::Schedule::At { at },
2008 "Hello",
2009 SessionTarget::Isolated,
2010 None,
2011 None,
2012 false,
2013 None,
2014 )
2015 .unwrap();
2016 assert!(!job.delete_after_run);
2017
2018 let started = Utc::now();
2019 let finished = started + ChronoDuration::milliseconds(10);
2020 let success = persist_job_result(&config, &job, true, "ok", started, finished).await;
2021 assert!(success);
2022
2023 let updated = cron::get_job(&config, &job.id).unwrap();
2026 assert!(
2027 !updated.enabled,
2028 "At schedule job should be disabled after execution via reschedule"
2029 );
2030 assert_eq!(updated.last_status.as_deref(), Some("ok"));
2031 }
2032
2033 #[tokio::test]
2034 async fn deliver_if_configured_handles_none_mode() {
2035 let tmp = TempDir::new().unwrap();
2036 let config = test_config(&tmp).await;
2037 let job = test_job("echo ok");
2038
2039 assert!(deliver_if_configured(&config, &job, "x").await.is_ok());
2041 }
2042
2043 #[tokio::test]
2044 async fn deliver_announcement_returns_ok_when_no_handler_registered() {
2045 let tmp = TempDir::new().unwrap();
2046 let config = test_config(&tmp).await;
2047 deliver_announcement(&config, "telegram", "chat-id", None, "payload")
2052 .await
2053 .expect("missing delivery handler should be Ok with a warn log");
2054 }
2055
2056 #[test]
2057 fn build_cron_shell_command_uses_sh_non_login() {
2058 let workspace = std::env::temp_dir();
2059 let cmd = build_cron_shell_command("echo cron-test", &workspace).unwrap();
2060 let debug = format!("{cmd:?}");
2061 assert!(debug.contains("echo cron-test"));
2062 assert!(debug.contains("\"sh\""), "should use sh: {debug}");
2063 assert!(
2066 !debug.contains("\"-lc\""),
2067 "must not use login shell: {debug}"
2068 );
2069 }
2070
2071 #[tokio::test]
2072 async fn build_cron_shell_command_executes_successfully() {
2073 let workspace = std::env::temp_dir();
2074 let mut cmd = build_cron_shell_command("echo cron-ok", &workspace).unwrap();
2075 let output = cmd.output().await.unwrap();
2076 assert!(output.status.success());
2077 let stdout = String::from_utf8_lossy(&output.stdout);
2078 assert!(stdout.contains("cron-ok"));
2079 }
2080
2081 #[tokio::test]
2082 async fn catch_up_queries_all_overdue_jobs_ignoring_max_tasks() {
2083 let tmp = TempDir::new().unwrap();
2084 let mut config = test_config(&tmp).await;
2085 config.scheduler.max_tasks = 1; for i in 0..3 {
2089 let _ = cron::add_job(
2090 &config,
2091 "test-agent",
2092 "* * * * *",
2093 &format!("echo catchup-{i}"),
2094 )
2095 .unwrap();
2096 }
2097
2098 let far_future = Utc::now() + ChronoDuration::days(1);
2100 let due = cron::due_jobs(&config, far_future).unwrap();
2101 assert_eq!(due.len(), 1, "due_jobs must respect max_tasks");
2102
2103 let overdue = cron::all_overdue_jobs(&config, far_future).unwrap();
2105 assert_eq!(overdue.len(), 3, "all_overdue_jobs must return all");
2106 }
2107
2108 #[tokio::test]
2113 async fn broadcast_sends_cron_result_on_success() {
2114 let tmp = TempDir::new().unwrap();
2115 let mut config = test_config(&tmp).await;
2116 let job = test_job("echo broadcast-ok");
2117 config
2120 .agents
2121 .get_mut("test-agent")
2122 .unwrap()
2123 .cron_jobs
2124 .push(job.id.clone());
2125 let component = unique_component("broadcast-ok");
2126
2127 let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2128 let event_tx: EventBroadcast = Some(tx);
2129
2130 process_due_jobs(&config, vec![job], &component, &event_tx).await;
2131
2132 let event = rx.try_recv().expect("should receive a broadcast event");
2133 assert_eq!(event["type"], "cron_result");
2134 assert_eq!(event["job_id"], "test-job");
2135 assert_eq!(event["success"], true);
2136 assert!(event["output"].as_str().unwrap().contains("broadcast-ok"));
2137 assert!(event["timestamp"].as_str().is_some());
2138 }
2139
2140 #[tokio::test]
2141 async fn broadcast_sends_cron_result_on_failure() {
2142 let tmp = TempDir::new().unwrap();
2143 let mut config = test_config(&tmp).await;
2144 let job = test_job("ls definitely_missing_file_for_broadcast_fail_test");
2145 config
2146 .agents
2147 .get_mut("test-agent")
2148 .unwrap()
2149 .cron_jobs
2150 .push(job.id.clone());
2151 let component = unique_component("broadcast-fail");
2152
2153 let (tx, mut rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2154 let event_tx: EventBroadcast = Some(tx);
2155
2156 process_due_jobs(&config, vec![job], &component, &event_tx).await;
2157
2158 let event = rx.try_recv().expect("should receive a broadcast event");
2159 assert_eq!(event["type"], "cron_result");
2160 assert_eq!(event["job_id"], "test-job");
2161 assert_eq!(event["success"], false);
2162 assert!(event["timestamp"].as_str().is_some());
2163 }
2164
2165 #[tokio::test]
2166 async fn broadcast_none_skips_without_error() {
2167 let tmp = TempDir::new().unwrap();
2168 let config = test_config(&tmp).await;
2169 let job = test_job("echo no-broadcast");
2170 let component = unique_component("broadcast-none");
2171
2172 process_due_jobs(&config, vec![job], &component, &None).await;
2174 }
2175
2176 #[tokio::test]
2177 async fn broadcast_handles_no_subscribers() {
2178 let tmp = TempDir::new().unwrap();
2179 let config = test_config(&tmp).await;
2180 let job = test_job("echo no-subscribers");
2181 let component = unique_component("broadcast-no-sub");
2182
2183 let (tx, _) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
2184 let event_tx: EventBroadcast = Some(tx);
2187
2188 process_due_jobs(&config, vec![job], &component, &event_tx).await;
2189 }
2191}