1use crate::security::SecurityPolicy;
2use anyhow::{Result, bail};
3use zeroclaw_config::schema::Config;
4
5mod schedule;
6mod store;
7mod types;
8
9pub mod scheduler;
10
11#[allow(unused_imports)]
12pub use schedule::{
13 next_run_for_schedule, normalize_expression, schedule_cron_expression, validate_schedule,
14};
15#[allow(unused_imports)]
16pub use store::{
17 add_agent_job, all_overdue_jobs, due_jobs, get_job, list_jobs, list_runs, record_last_run,
18 record_last_run_with_status, record_run, remove_job, reschedule_after_run,
19 reschedule_after_run_with_status, sync_declarative_jobs, update_job,
20};
21pub use types::{
22 CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
23 deserialize_maybe_stringified,
24};
25
26pub fn validate_shell_command(
31 config: &Config,
32 agent_alias: &str,
33 command: &str,
34 approved: bool,
35) -> Result<()> {
36 let security = SecurityPolicy::for_agent(config, agent_alias)?;
37 validate_shell_command_with_security(&security, command, approved)
38}
39
40pub fn validate_shell_command_with_security(
44 security: &SecurityPolicy,
45 command: &str,
46 approved: bool,
47) -> Result<()> {
48 security
49 .validate_command_execution(command, approved)
50 .map(|_| ())
51 .map_err(|reason| {
52 ::zeroclaw_log::record!(
53 WARN,
54 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
55 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
56 .with_attrs(::serde_json::json!({"reason": reason.to_string()})),
57 "cron shell command rejected by security policy"
58 );
59 anyhow::Error::msg(format!("blocked by security policy: {reason}"))
60 })
61}
62
63pub fn validate_delivery_config(delivery: Option<&DeliveryConfig>) -> Result<()> {
64 let Some(delivery) = delivery else {
65 return Ok(());
66 };
67
68 if delivery.mode.eq_ignore_ascii_case("none") {
69 return Ok(());
70 }
71 if !delivery.mode.eq_ignore_ascii_case("announce") {
72 bail!("unsupported delivery mode: {}", delivery.mode);
73 }
74
75 let channel = delivery.channel.as_deref().map(str::trim);
82 if channel.filter(|value| !value.is_empty()).is_none() {
83 bail!("delivery.channel is required for announce mode");
84 }
85
86 let has_target = delivery
87 .to
88 .as_deref()
89 .map(str::trim)
90 .is_some_and(|value| !value.is_empty());
91 if !has_target {
92 bail!("delivery.to is required for announce mode");
93 }
94
95 Ok(())
96}
97
98pub fn add_shell_job_with_approval(
105 config: &Config,
106 agent_alias: &str,
107 name: Option<String>,
108 schedule: Schedule,
109 command: &str,
110 delivery: Option<DeliveryConfig>,
111 approved: bool,
112) -> Result<CronJob> {
113 validate_shell_command(config, agent_alias, command, approved)?;
114 validate_delivery_config(delivery.as_ref())?;
115 store::add_shell_job(config, agent_alias, name, schedule, command, delivery)
116}
117
118pub fn update_shell_job_with_approval(
123 config: &Config,
124 agent_alias: &str,
125 job_id: &str,
126 patch: CronJobPatch,
127 approved: bool,
128) -> Result<CronJob> {
129 if let Some(command) = patch.command.as_deref() {
130 validate_shell_command(config, agent_alias, command, approved)?;
131 }
132 update_job(config, job_id, patch)
133}
134
135pub fn add_once_validated(
137 config: &Config,
138 agent_alias: &str,
139 delay: &str,
140 command: &str,
141 approved: bool,
142) -> Result<CronJob> {
143 let duration = parse_delay(delay)?;
144 let at = chrono::Utc::now() + duration;
145 add_once_at_validated(config, agent_alias, at, command, approved)
146}
147
148pub fn add_once_at_validated(
150 config: &Config,
151 agent_alias: &str,
152 at: chrono::DateTime<chrono::Utc>,
153 command: &str,
154 approved: bool,
155) -> Result<CronJob> {
156 let schedule = Schedule::At { at };
157 add_shell_job_with_approval(config, agent_alias, None, schedule, command, None, approved)
158}
159
160pub fn add_shell_job(
163 config: &Config,
164 agent_alias: &str,
165 name: Option<String>,
166 schedule: Schedule,
167 command: &str,
168) -> Result<CronJob> {
169 add_shell_job_with_approval(config, agent_alias, name, schedule, command, None, false)
170}
171
172pub fn add_job(
173 config: &Config,
174 agent_alias: &str,
175 expression: &str,
176 command: &str,
177) -> Result<CronJob> {
178 let schedule = Schedule::Cron {
179 expr: expression.to_string(),
180 tz: None,
181 };
182 add_shell_job(config, agent_alias, None, schedule, command)
183}
184
185#[allow(clippy::needless_pass_by_value)]
186pub fn add_once(config: &Config, agent_alias: &str, delay: &str, command: &str) -> Result<CronJob> {
187 add_once_validated(config, agent_alias, delay, command, false)
188}
189
190pub fn add_once_at(
191 config: &Config,
192 agent_alias: &str,
193 at: chrono::DateTime<chrono::Utc>,
194 command: &str,
195) -> Result<CronJob> {
196 add_once_at_validated(config, agent_alias, at, command, false)
197}
198
199pub fn pause_job(config: &Config, id: &str) -> Result<CronJob> {
200 update_job(
201 config,
202 id,
203 CronJobPatch {
204 enabled: Some(false),
205 ..CronJobPatch::default()
206 },
207 )
208}
209
210pub fn resume_job(config: &Config, id: &str) -> Result<CronJob> {
211 update_job(
212 config,
213 id,
214 CronJobPatch {
215 enabled: Some(true),
216 ..CronJobPatch::default()
217 },
218 )
219}
220
221pub fn parse_delay(input: &str) -> Result<chrono::Duration> {
222 let input = input.trim();
223 if input.is_empty() {
224 anyhow::bail!("delay must not be empty");
225 }
226 let split = input
227 .find(|c: char| !c.is_ascii_digit())
228 .unwrap_or(input.len());
229 let (num, unit) = input.split_at(split);
230 let amount: i64 = num.parse()?;
231 let unit = if unit.is_empty() { "m" } else { unit };
232 let duration = match unit {
233 "s" => chrono::Duration::seconds(amount),
234 "m" => chrono::Duration::minutes(amount),
235 "h" => chrono::Duration::hours(amount),
236 "d" => chrono::Duration::days(amount),
237 _ => anyhow::bail!("unsupported delay unit '{unit}', use s/m/h/d"),
238 };
239 Ok(duration)
240}
241
// Handler-level and security-policy tests for the cron module.
//
// This module is only compiled when both `test` and the custom
// `zeroclaw_root_crate` cfg are set.
//
// NOTE(review): several calls below do not match the signatures visible in this
// file — e.g. `add_shell_job(config, None, schedule, cmd)` passes four arguments
// while `add_shell_job` here takes five (including `agent_alias`), and
// `handle_command` is not defined in this file. These tests look stale relative
// to the current API; confirm before enabling the cfg.
#[cfg(all(test, zeroclaw_root_crate))]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a default `Config` whose data dir and config path live under
    /// `tmp`, creating the data dir on disk.
    fn test_config(tmp: &TempDir) -> Config {
        let config = Config {
            data_dir: tmp.path().join("data"),
            config_path: tmp.path().join("config.toml"),
            ..Config::default()
        };
        std::fs::create_dir_all(&config.data_dir).unwrap();
        config
    }

    /// Creates an unnamed shell job with the given cron expression, optional
    /// timezone and command, panicking on failure.
    fn make_job(config: &Config, expr: &str, tz: Option<&str>, cmd: &str) -> CronJob {
        add_shell_job(
            config,
            None,
            Schedule::Cron {
                expr: expr.into(),
                tz: tz.map(Into::into),
            },
            cmd,
        )
        .unwrap()
    }

    /// Runs the `Update` command through `handle_command` with the given
    /// optional fields and an empty `allowed_tools` list.
    fn run_update(
        config: &Config,
        id: &str,
        expression: Option<&str>,
        tz: Option<&str>,
        command: Option<&str>,
        name: Option<&str>,
    ) -> Result<()> {
        handle_command(
            crate::CronCommands::Update {
                id: id.into(),
                expression: expression.map(Into::into),
                tz: tz.map(Into::into),
                command: command.map(Into::into),
                name: name.map(Into::into),
                allowed_tools: vec![],
            },
            config,
        )
    }

    // Updating only the command replaces it and keeps the job id.
    #[test]
    fn update_changes_command_via_handler() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = make_job(&config, "*/5 * * * *", None, "echo original");

        run_update(&config, &job.id, None, None, Some("echo updated"), None).unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(updated.command, "echo updated");
        assert_eq!(updated.id, job.id);
    }

    // Updating only the expression is reflected in the stored job.
    #[test]
    fn update_changes_expression_via_handler() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = make_job(&config, "*/5 * * * *", None, "echo test");

        run_update(&config, &job.id, Some("0 9 * * *"), None, None, None).unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(updated.expression, "0 9 * * *");
    }

    // Updating only the name is reflected in the stored job.
    #[test]
    fn update_changes_name_via_handler() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = make_job(&config, "*/5 * * * *", None, "echo test");

        run_update(&config, &job.id, None, None, None, Some("new-name")).unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(updated.name.as_deref(), Some("new-name"));
    }

    // Supplying only a timezone keeps the expression and sets the timezone.
    #[test]
    fn update_tz_alone_sets_timezone() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = make_job(&config, "*/5 * * * *", None, "echo test");

        run_update(
            &config,
            &job.id,
            None,
            Some("America/Los_Angeles"),
            None,
            None,
        )
        .unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(
            updated.schedule,
            Schedule::Cron {
                expr: "*/5 * * * *".into(),
                tz: Some("America/Los_Angeles".into()),
            }
        );
    }

    // Changing only the expression must not drop a previously set timezone.
    #[test]
    fn update_expression_preserves_existing_tz() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = make_job(
            &config,
            "*/5 * * * *",
            Some("America/Los_Angeles"),
            "echo test",
        );

        run_update(&config, &job.id, Some("0 9 * * *"), None, None, None).unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(
            updated.schedule,
            Schedule::Cron {
                expr: "0 9 * * *".into(),
                tz: Some("America/Los_Angeles".into()),
            }
        );
    }

    // Changing the command leaves the name and expression untouched.
    #[test]
    fn update_preserves_unchanged_fields() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = add_shell_job(
            &config,
            Some("original-name".into()),
            Schedule::Cron {
                expr: "*/5 * * * *".into(),
                tz: None,
            },
            "echo original",
        )
        .unwrap();

        run_update(&config, &job.id, None, None, Some("echo changed"), None).unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(updated.command, "echo changed");
        assert_eq!(updated.name.as_deref(), Some("original-name"));
        assert_eq!(updated.expression, "*/5 * * * *");
    }

    // An update with no fields set is expected to fail with a message
    // containing "At least one of".
    #[test]
    fn update_no_flags_fails() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = make_job(&config, "*/5 * * * *", None, "echo test");

        let result = run_update(&config, &job.id, None, None, None, None);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("At least one of"));
    }

    // Updating an id that was never created is expected to fail.
    #[test]
    fn update_nonexistent_job_fails() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        let result = run_update(
            &config,
            "nonexistent-id",
            None,
            None,
            Some("echo test"),
            None,
        );
        assert!(result.is_err());
    }

    // Despite its name this test performs no update: it only asserts that a
    // policy built from the default risk profile allows `echo safe`.
    #[test]
    fn update_security_allows_safe_command() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        let security = SecurityPolicy::from_risk_profile(
            &zeroclaw_config::schema::RiskProfileConfig::default(),
            &config.data_dir,
        );
        assert!(security.is_command_allowed("echo safe"));
    }

    // `touch` is allow-listed but expected to need explicit approval: the
    // unapproved add must fail mentioning "explicit approval", the approved
    // add must succeed.
    #[test]
    fn add_shell_job_requires_explicit_approval_for_medium_risk() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into(), "touch".into()];

        let denied = add_shell_job(
            &config,
            None,
            Schedule::Cron {
                expr: "*/5 * * * *".into(),
                tz: None,
            },
            "touch cron-medium-risk",
        );
        assert!(denied.is_err());
        assert!(
            denied
                .unwrap_err()
                .to_string()
                .contains("explicit approval")
        );

        let approved = add_shell_job_with_approval(
            &config,
            None,
            Schedule::Cron {
                expr: "*/5 * * * *".into(),
                tz: None,
            },
            "touch cron-medium-risk",
            None,
            true,
        );
        assert!(approved.is_ok(), "{approved:?}");
    }

    // Same approval expectation as above, exercised through
    // `update_shell_job_with_approval`.
    #[test]
    fn update_requires_explicit_approval_for_medium_risk() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into(), "touch".into()];
        let job = make_job(&config, "*/5 * * * *", None, "echo original");

        let denied = update_shell_job_with_approval(
            &config,
            &job.id,
            CronJobPatch {
                command: Some("touch cron-medium-risk-update".into()),
                ..CronJobPatch::default()
            },
            false,
        );
        assert!(denied.is_err());
        assert!(
            denied
                .unwrap_err()
                .to_string()
                .contains("explicit approval")
        );

        let approved = update_shell_job_with_approval(
            &config,
            &job.id,
            CronJobPatch {
                command: Some("touch cron-medium-risk-update".into()),
                ..CronJobPatch::default()
            },
            true,
        )
        .unwrap();
        assert_eq!(approved.command, "touch cron-medium-risk-update");
    }

    // The CLI update path is expected to reject a `touch` command with an
    // "explicit approval" error.
    #[test]
    fn cli_update_requires_explicit_approval_for_medium_risk() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into(), "touch".into()];
        let job = make_job(&config, "*/5 * * * *", None, "echo original");

        let result = run_update(
            &config,
            &job.id,
            None,
            None,
            Some("touch cron-cli-medium-risk"),
            None,
        );
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("explicit approval")
        );
    }

    // A delay-based one-shot job stores the command and uses `Schedule::At`.
    #[test]
    fn add_once_validated_creates_one_shot_job() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        let job = add_once_validated(&config, "1h", "echo one-shot", false).unwrap();
        assert_eq!(job.command, "echo one-shot");
        assert!(matches!(job.schedule, Schedule::At { .. }));
    }

    // With only `echo` allow-listed at the Supervised level, `curl` must be
    // rejected with a "blocked by security policy" error.
    #[test]
    fn add_once_validated_blocks_disallowed_command() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into()];
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .level = crate::security::AutonomyLevel::Supervised;

        let result = add_once_validated(&config, "1h", "curl https://example.com", false);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("blocked by security policy")
        );
    }

    // An absolute-time one-shot job stores the command and uses `Schedule::At`.
    #[test]
    fn add_once_at_validated_creates_one_shot_job() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let at = chrono::Utc::now() + chrono::Duration::hours(1);

        let job = add_once_at_validated(&config, at, "echo at-shot", false).unwrap();
        assert_eq!(job.command, "echo at-shot");
        assert!(matches!(job.schedule, Schedule::At { .. }));
    }

    // `touch` without approval must fail with "explicit approval"; with
    // approval it must succeed.
    #[test]
    fn add_once_at_validated_blocks_medium_risk_without_approval() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into(), "touch".into()];
        let at = chrono::Utc::now() + chrono::Duration::hours(1);

        let denied = add_once_at_validated(&config, at, "touch at-medium", false);
        assert!(denied.is_err());
        assert!(
            denied
                .unwrap_err()
                .to_string()
                .contains("explicit approval")
        );

        let approved = add_once_at_validated(&config, at, "touch at-medium", true);
        assert!(approved.is_ok(), "{approved:?}");
    }

    // `add_shell_job_with_approval` must reject a command that is not
    // allow-listed.
    #[test]
    fn gateway_api_path_validates_shell_command() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into()];
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .level = crate::security::AutonomyLevel::Supervised;

        let result = add_shell_job_with_approval(
            &config,
            None,
            Schedule::Cron {
                expr: "*/5 * * * *".into(),
                tz: None,
            },
            "curl https://example.com",
            None,
            false,
        );
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("blocked by security policy")
        );
    }

    // `validate_shell_command_with_security` must reject `curl`.
    //
    // NOTE(review): the policy under test is built from
    // `RiskProfileConfig::default()`, not from the profile edited on `config`
    // above; only `config.data_dir` is used — confirm that is intended.
    #[test]
    fn scheduler_path_validates_shell_command() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into()];
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .level = crate::security::AutonomyLevel::Supervised;

        let security = SecurityPolicy::from_risk_profile(
            &zeroclaw_config::schema::RiskProfileConfig::default(),
            &config.data_dir,
        );
        let result =
            validate_shell_command_with_security(&security, "curl https://example.com", false);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("blocked by security policy")
        );
    }

    // `Add` with `agent: true` stores an agent job whose prompt is the
    // supplied command text.
    #[test]
    fn cli_agent_flag_creates_agent_job() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        handle_command(
            crate::CronCommands::Add {
                expression: "*/15 * * * *".into(),
                tz: None,
                agent: true,
                allowed_tools: vec![],
                command: "Check server health: disk space, memory, CPU load".into(),
            },
            &config,
        )
        .unwrap();

        let jobs = list_jobs(&config).unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].job_type, JobType::Agent);
        assert_eq!(
            jobs[0].prompt.as_deref(),
            Some("Check server health: disk space, memory, CPU load")
        );
    }

    // With only `echo` allow-listed, an agent job whose text is not a shell
    // command must still be accepted.
    #[test]
    fn cli_agent_flag_bypasses_shell_security_validation() {
        let tmp = TempDir::new().unwrap();
        let mut config = test_config(&tmp);
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .allowed_commands = vec!["echo".into()];
        config
            .risk_profiles
            .entry("default".into())
            .or_default()
            .level = crate::security::AutonomyLevel::Supervised;

        let result = handle_command(
            crate::CronCommands::Add {
                expression: "*/15 * * * *".into(),
                tz: None,
                agent: true,
                allowed_tools: vec![],
                command: "Check server health: disk space, memory, CPU load".into(),
            },
            &config,
        );
        assert!(result.is_ok());

        let jobs = list_jobs(&config).unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].job_type, JobType::Agent);
    }

    // `allowed_tools` passed to `Add` are stored on the created agent job.
    #[test]
    fn cli_agent_allowed_tools_persist() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        handle_command(
            crate::CronCommands::Add {
                expression: "*/15 * * * *".into(),
                tz: None,
                agent: true,
                allowed_tools: vec!["file_read".into(), "web_search".into()],
                command: "Check server health".into(),
            },
            &config,
        )
        .unwrap();

        let jobs = list_jobs(&config).unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(
            jobs[0].allowed_tools,
            Some(vec!["file_read".into(), "web_search".into()])
        );
    }

    // `allowed_tools` passed to `Update` are stored on an existing agent job.
    #[test]
    fn cli_update_agent_allowed_tools_persist() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);
        let job = add_agent_job(
            &config,
            Some("agent".into()),
            Schedule::Cron {
                expr: "*/5 * * * *".into(),
                tz: None,
            },
            "original prompt",
            SessionTarget::Isolated,
            None,
            None,
            false,
            None,
        )
        .unwrap();

        handle_command(
            crate::CronCommands::Update {
                id: job.id.clone(),
                expression: None,
                tz: None,
                command: None,
                name: None,
                allowed_tools: vec!["shell".into()],
            },
            &config,
        )
        .unwrap();

        let updated = get_job(&config, &job.id).unwrap();
        assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
    }

    // `Add` with `agent: false` stores a shell job with the given command.
    #[test]
    fn cli_without_agent_flag_defaults_to_shell_job() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        handle_command(
            crate::CronCommands::Add {
                expression: "*/5 * * * *".into(),
                tz: None,
                agent: false,
                allowed_tools: vec![],
                command: "echo ok".into(),
            },
            &config,
        )
        .unwrap();

        let jobs = list_jobs(&config).unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].job_type, JobType::Shell);
        assert_eq!(jobs[0].command, "echo ok");
    }
}
832
#[cfg(test)]
mod validate_delivery_tests {
    use super::*;
    use crate::cron::types::DeliveryConfig;

    /// Builds a best-effort, announce-mode webhook delivery addressed to
    /// `user-42`, with the given optional thread id.
    fn webhook_delivery(thread_id: Option<&str>) -> DeliveryConfig {
        DeliveryConfig {
            mode: "announce".into(),
            channel: Some("webhook".into()),
            to: Some("user-42".into()),
            thread_id: thread_id.map(Into::into),
            best_effort: true,
        }
    }

    // A webhook delivery carrying a thread id passes validation.
    #[test]
    fn validate_delivery_accepts_webhook_with_thread_id() {
        let with_thread = webhook_delivery(Some("conv-99"));
        validate_delivery_config(Some(&with_thread)).expect("webhook with thread_id must validate");
    }

    // The thread id is optional: a webhook delivery without one also passes.
    #[test]
    fn validate_delivery_accepts_webhook_without_thread_id() {
        let without_thread = webhook_delivery(None);
        validate_delivery_config(Some(&without_thread))
            .expect("webhook without thread_id must validate");
    }
}
861}