1use crate::security::SecurityPolicy;
2use anyhow::{Result, bail};
3use zeroclaw_config::schema::Config;
4
5mod schedule;
6mod store;
7mod types;
8
9pub mod scheduler;
10
11#[allow(unused_imports)]
12pub use schedule::{
13 next_run_for_schedule, normalize_expression, schedule_cron_expression, validate_schedule,
14};
15#[allow(unused_imports)]
16pub use store::{
17 add_agent_job, all_overdue_jobs, due_jobs, get_job, list_jobs, list_runs, record_last_run,
18 record_last_run_with_status, record_run, remove_job, reschedule_after_run,
19 reschedule_after_run_with_status, skip_missed_run, sync_declarative_jobs, update_job,
20};
21pub use types::{
22 CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
23 deserialize_maybe_stringified,
24};
25
26pub(crate) const CRON_DELIVERY_SCHEMA_CHANNELS: &[&str] = &[
29 "telegram",
30 "discord",
31 "slack",
32 "mattermost",
33 "matrix",
34 "qq",
35 "webhook",
36 "lark",
37 "feishu",
38 "dingtalk",
39];
40
41pub fn validate_shell_command(
46 config: &Config,
47 agent_alias: &str,
48 command: &str,
49 approved: bool,
50) -> Result<()> {
51 let security = SecurityPolicy::for_agent(config, agent_alias)?;
52 validate_shell_command_with_security(&security, command, approved)
53}
54
55pub fn validate_shell_command_with_security(
59 security: &SecurityPolicy,
60 command: &str,
61 approved: bool,
62) -> Result<()> {
63 security
64 .validate_command_execution(command, approved)
65 .map(|_| ())
66 .map_err(|reason| {
67 ::zeroclaw_log::record!(
68 WARN,
69 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
70 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
71 .with_attrs(::serde_json::json!({"reason": reason.to_string()})),
72 "cron shell command rejected by security policy"
73 );
74 anyhow::Error::msg(format!("blocked by security policy: {reason}"))
75 })
76}
77
78pub fn validate_delivery_config(delivery: Option<&DeliveryConfig>) -> Result<()> {
79 let Some(delivery) = delivery else {
80 return Ok(());
81 };
82
83 if delivery.mode.eq_ignore_ascii_case("none") {
84 return Ok(());
85 }
86 if !delivery.mode.eq_ignore_ascii_case("announce") {
87 bail!("unsupported delivery mode: {}", delivery.mode);
88 }
89
90 let channel = delivery.channel.as_deref().map(str::trim);
97 if channel.filter(|value| !value.is_empty()).is_none() {
98 bail!("delivery.channel is required for announce mode");
99 }
100
101 let has_target = delivery
102 .to
103 .as_deref()
104 .map(str::trim)
105 .is_some_and(|value| !value.is_empty());
106 if !has_target {
107 bail!("delivery.to is required for announce mode");
108 }
109
110 Ok(())
111}
112
113pub fn add_shell_job_with_approval(
120 config: &Config,
121 agent_alias: &str,
122 name: Option<String>,
123 schedule: Schedule,
124 command: &str,
125 delivery: Option<DeliveryConfig>,
126 approved: bool,
127) -> Result<CronJob> {
128 validate_shell_command(config, agent_alias, command, approved)?;
129 validate_delivery_config(delivery.as_ref())?;
130 store::add_shell_job(config, agent_alias, name, schedule, command, delivery)
131}
132
133pub fn update_shell_job_with_approval(
138 config: &Config,
139 agent_alias: &str,
140 job_id: &str,
141 patch: CronJobPatch,
142 approved: bool,
143) -> Result<CronJob> {
144 if let Some(command) = patch.command.as_deref() {
145 validate_shell_command(config, agent_alias, command, approved)?;
146 }
147 update_job(config, job_id, patch)
148}
149
150pub fn add_once_validated(
152 config: &Config,
153 agent_alias: &str,
154 delay: &str,
155 command: &str,
156 approved: bool,
157) -> Result<CronJob> {
158 let duration = parse_delay(delay)?;
159 let at = chrono::Utc::now() + duration;
160 add_once_at_validated(config, agent_alias, at, command, approved)
161}
162
163pub fn add_once_at_validated(
165 config: &Config,
166 agent_alias: &str,
167 at: chrono::DateTime<chrono::Utc>,
168 command: &str,
169 approved: bool,
170) -> Result<CronJob> {
171 let schedule = Schedule::At { at };
172 add_shell_job_with_approval(config, agent_alias, None, schedule, command, None, approved)
173}
174
175pub fn add_shell_job(
178 config: &Config,
179 agent_alias: &str,
180 name: Option<String>,
181 schedule: Schedule,
182 command: &str,
183) -> Result<CronJob> {
184 add_shell_job_with_approval(config, agent_alias, name, schedule, command, None, false)
185}
186
187pub fn add_job(
188 config: &Config,
189 agent_alias: &str,
190 expression: &str,
191 command: &str,
192) -> Result<CronJob> {
193 let schedule = Schedule::Cron {
194 expr: expression.to_string(),
195 tz: None,
196 };
197 add_shell_job(config, agent_alias, None, schedule, command)
198}
199
200#[allow(clippy::needless_pass_by_value)]
201pub fn add_once(config: &Config, agent_alias: &str, delay: &str, command: &str) -> Result<CronJob> {
202 add_once_validated(config, agent_alias, delay, command, false)
203}
204
205pub fn add_once_at(
206 config: &Config,
207 agent_alias: &str,
208 at: chrono::DateTime<chrono::Utc>,
209 command: &str,
210) -> Result<CronJob> {
211 add_once_at_validated(config, agent_alias, at, command, false)
212}
213
214pub fn pause_job(config: &Config, id: &str) -> Result<CronJob> {
215 update_job(
216 config,
217 id,
218 CronJobPatch {
219 enabled: Some(false),
220 ..CronJobPatch::default()
221 },
222 )
223}
224
225pub fn resume_job(config: &Config, id: &str) -> Result<CronJob> {
226 update_job(
227 config,
228 id,
229 CronJobPatch {
230 enabled: Some(true),
231 ..CronJobPatch::default()
232 },
233 )
234}
235
236pub fn parse_delay(input: &str) -> Result<chrono::Duration> {
237 let input = input.trim();
238 if input.is_empty() {
239 anyhow::bail!("delay must not be empty");
240 }
241 let split = input
242 .find(|c: char| !c.is_ascii_digit())
243 .unwrap_or(input.len());
244 let (num, unit) = input.split_at(split);
245 let amount: i64 = num.parse()?;
246 let unit = if unit.is_empty() { "m" } else { unit };
247 let duration = match unit {
248 "s" => chrono::Duration::seconds(amount),
249 "m" => chrono::Duration::minutes(amount),
250 "h" => chrono::Duration::hours(amount),
251 "d" => chrono::Duration::days(amount),
252 _ => anyhow::bail!("unsupported delay unit '{unit}', use s/m/h/d"),
253 };
254 Ok(duration)
255}
256
257#[cfg(all(test, zeroclaw_root_crate))] mod tests {
259 use super::*;
260 use tempfile::TempDir;
261
262 fn test_config(tmp: &TempDir) -> Config {
263 let config = Config {
264 data_dir: tmp.path().join("data"),
265 config_path: tmp.path().join("config.toml"),
266 ..Config::default()
267 };
268 std::fs::create_dir_all(&config.data_dir).unwrap();
269 config
270 }
271
272 fn make_job(config: &Config, expr: &str, tz: Option<&str>, cmd: &str) -> CronJob {
273 add_shell_job(
274 config,
275 None,
276 Schedule::Cron {
277 expr: expr.into(),
278 tz: tz.map(Into::into),
279 },
280 cmd,
281 )
282 .unwrap()
283 }
284
285 fn run_update(
286 config: &Config,
287 id: &str,
288 expression: Option<&str>,
289 tz: Option<&str>,
290 command: Option<&str>,
291 name: Option<&str>,
292 ) -> Result<()> {
293 handle_command(
294 crate::CronCommands::Update {
295 id: id.into(),
296 expression: expression.map(Into::into),
297 tz: tz.map(Into::into),
298 command: command.map(Into::into),
299 name: name.map(Into::into),
300 allowed_tools: vec![],
301 },
302 config,
303 )
304 }
305
306 #[test]
307 fn update_changes_command_via_handler() {
308 let tmp = TempDir::new().unwrap();
309 let config = test_config(&tmp);
310 let job = make_job(&config, "*/5 * * * *", None, "echo original");
311
312 run_update(&config, &job.id, None, None, Some("echo updated"), None).unwrap();
313
314 let updated = get_job(&config, &job.id).unwrap();
315 assert_eq!(updated.command, "echo updated");
316 assert_eq!(updated.id, job.id);
317 }
318
319 #[test]
320 fn update_changes_expression_via_handler() {
321 let tmp = TempDir::new().unwrap();
322 let config = test_config(&tmp);
323 let job = make_job(&config, "*/5 * * * *", None, "echo test");
324
325 run_update(&config, &job.id, Some("0 9 * * *"), None, None, None).unwrap();
326
327 let updated = get_job(&config, &job.id).unwrap();
328 assert_eq!(updated.expression, "0 9 * * *");
329 }
330
331 #[test]
332 fn update_changes_name_via_handler() {
333 let tmp = TempDir::new().unwrap();
334 let config = test_config(&tmp);
335 let job = make_job(&config, "*/5 * * * *", None, "echo test");
336
337 run_update(&config, &job.id, None, None, None, Some("new-name")).unwrap();
338
339 let updated = get_job(&config, &job.id).unwrap();
340 assert_eq!(updated.name.as_deref(), Some("new-name"));
341 }
342
343 #[test]
344 fn update_tz_alone_sets_timezone() {
345 let tmp = TempDir::new().unwrap();
346 let config = test_config(&tmp);
347 let job = make_job(&config, "*/5 * * * *", None, "echo test");
348
349 run_update(
350 &config,
351 &job.id,
352 None,
353 Some("America/Los_Angeles"),
354 None,
355 None,
356 )
357 .unwrap();
358
359 let updated = get_job(&config, &job.id).unwrap();
360 assert_eq!(
361 updated.schedule,
362 Schedule::Cron {
363 expr: "*/5 * * * *".into(),
364 tz: Some("America/Los_Angeles".into()),
365 }
366 );
367 }
368
369 #[test]
370 fn update_expression_preserves_existing_tz() {
371 let tmp = TempDir::new().unwrap();
372 let config = test_config(&tmp);
373 let job = make_job(
374 &config,
375 "*/5 * * * *",
376 Some("America/Los_Angeles"),
377 "echo test",
378 );
379
380 run_update(&config, &job.id, Some("0 9 * * *"), None, None, None).unwrap();
381
382 let updated = get_job(&config, &job.id).unwrap();
383 assert_eq!(
384 updated.schedule,
385 Schedule::Cron {
386 expr: "0 9 * * *".into(),
387 tz: Some("America/Los_Angeles".into()),
388 }
389 );
390 }
391
392 #[test]
393 fn update_preserves_unchanged_fields() {
394 let tmp = TempDir::new().unwrap();
395 let config = test_config(&tmp);
396 let job = add_shell_job(
397 &config,
398 Some("original-name".into()),
399 Schedule::Cron {
400 expr: "*/5 * * * *".into(),
401 tz: None,
402 },
403 "echo original",
404 )
405 .unwrap();
406
407 run_update(&config, &job.id, None, None, Some("echo changed"), None).unwrap();
408
409 let updated = get_job(&config, &job.id).unwrap();
410 assert_eq!(updated.command, "echo changed");
411 assert_eq!(updated.name.as_deref(), Some("original-name"));
412 assert_eq!(updated.expression, "*/5 * * * *");
413 }
414
415 #[test]
416 fn update_no_flags_fails() {
417 let tmp = TempDir::new().unwrap();
418 let config = test_config(&tmp);
419 let job = make_job(&config, "*/5 * * * *", None, "echo test");
420
421 let result = run_update(&config, &job.id, None, None, None, None);
422 assert!(result.is_err());
423 assert!(result.unwrap_err().to_string().contains("At least one of"));
424 }
425
426 #[test]
427 fn update_nonexistent_job_fails() {
428 let tmp = TempDir::new().unwrap();
429 let config = test_config(&tmp);
430
431 let result = run_update(
432 &config,
433 "nonexistent-id",
434 None,
435 None,
436 Some("echo test"),
437 None,
438 );
439 assert!(result.is_err());
440 }
441
442 #[test]
443 fn update_security_allows_safe_command() {
444 let tmp = TempDir::new().unwrap();
445 let config = test_config(&tmp);
446
447 let security = SecurityPolicy::from_risk_profile(
448 &zeroclaw_config::schema::RiskProfileConfig::default(),
449 &config.data_dir,
450 );
451 assert!(security.is_command_allowed("echo safe"));
452 }
453
454 #[test]
455 fn add_shell_job_requires_explicit_approval_for_medium_risk() {
456 let tmp = TempDir::new().unwrap();
457 let mut config = test_config(&tmp);
458 config
459 .risk_profiles
460 .entry("default".into())
461 .or_default()
462 .allowed_commands = vec!["echo".into(), "touch".into()];
463
464 let denied = add_shell_job(
465 &config,
466 None,
467 Schedule::Cron {
468 expr: "*/5 * * * *".into(),
469 tz: None,
470 },
471 "touch cron-medium-risk",
472 );
473 assert!(denied.is_err());
474 assert!(
475 denied
476 .unwrap_err()
477 .to_string()
478 .contains("explicit approval")
479 );
480
481 let approved = add_shell_job_with_approval(
482 &config,
483 None,
484 Schedule::Cron {
485 expr: "*/5 * * * *".into(),
486 tz: None,
487 },
488 "touch cron-medium-risk",
489 None,
490 true,
491 );
492 assert!(approved.is_ok(), "{approved:?}");
493 }
494
495 #[test]
496 fn update_requires_explicit_approval_for_medium_risk() {
497 let tmp = TempDir::new().unwrap();
498 let mut config = test_config(&tmp);
499 config
500 .risk_profiles
501 .entry("default".into())
502 .or_default()
503 .allowed_commands = vec!["echo".into(), "touch".into()];
504 let job = make_job(&config, "*/5 * * * *", None, "echo original");
505
506 let denied = update_shell_job_with_approval(
507 &config,
508 &job.id,
509 CronJobPatch {
510 command: Some("touch cron-medium-risk-update".into()),
511 ..CronJobPatch::default()
512 },
513 false,
514 );
515 assert!(denied.is_err());
516 assert!(
517 denied
518 .unwrap_err()
519 .to_string()
520 .contains("explicit approval")
521 );
522
523 let approved = update_shell_job_with_approval(
524 &config,
525 &job.id,
526 CronJobPatch {
527 command: Some("touch cron-medium-risk-update".into()),
528 ..CronJobPatch::default()
529 },
530 true,
531 )
532 .unwrap();
533 assert_eq!(approved.command, "touch cron-medium-risk-update");
534 }
535
536 #[test]
537 fn cli_update_requires_explicit_approval_for_medium_risk() {
538 let tmp = TempDir::new().unwrap();
539 let mut config = test_config(&tmp);
540 config
541 .risk_profiles
542 .entry("default".into())
543 .or_default()
544 .allowed_commands = vec!["echo".into(), "touch".into()];
545 let job = make_job(&config, "*/5 * * * *", None, "echo original");
546
547 let result = run_update(
548 &config,
549 &job.id,
550 None,
551 None,
552 Some("touch cron-cli-medium-risk"),
553 None,
554 );
555 assert!(result.is_err());
556 assert!(
557 result
558 .unwrap_err()
559 .to_string()
560 .contains("explicit approval")
561 );
562 }
563
564 #[test]
565 fn add_once_validated_creates_one_shot_job() {
566 let tmp = TempDir::new().unwrap();
567 let config = test_config(&tmp);
568
569 let job = add_once_validated(&config, "1h", "echo one-shot", false).unwrap();
570 assert_eq!(job.command, "echo one-shot");
571 assert!(matches!(job.schedule, Schedule::At { .. }));
572 }
573
574 #[test]
575 fn add_once_validated_blocks_disallowed_command() {
576 let tmp = TempDir::new().unwrap();
577 let mut config = test_config(&tmp);
578 config
579 .risk_profiles
580 .entry("default".into())
581 .or_default()
582 .allowed_commands = vec!["echo".into()];
583 config
584 .risk_profiles
585 .entry("default".into())
586 .or_default()
587 .level = crate::security::AutonomyLevel::Supervised;
588
589 let result = add_once_validated(&config, "1h", "curl https://example.com", false);
590 assert!(result.is_err());
591 assert!(
592 result
593 .unwrap_err()
594 .to_string()
595 .contains("blocked by security policy")
596 );
597 }
598
599 #[test]
600 fn add_once_at_validated_creates_one_shot_job() {
601 let tmp = TempDir::new().unwrap();
602 let config = test_config(&tmp);
603 let at = chrono::Utc::now() + chrono::Duration::hours(1);
604
605 let job = add_once_at_validated(&config, at, "echo at-shot", false).unwrap();
606 assert_eq!(job.command, "echo at-shot");
607 assert!(matches!(job.schedule, Schedule::At { .. }));
608 }
609
610 #[test]
611 fn add_once_at_validated_blocks_medium_risk_without_approval() {
612 let tmp = TempDir::new().unwrap();
613 let mut config = test_config(&tmp);
614 config
615 .risk_profiles
616 .entry("default".into())
617 .or_default()
618 .allowed_commands = vec!["echo".into(), "touch".into()];
619 let at = chrono::Utc::now() + chrono::Duration::hours(1);
620
621 let denied = add_once_at_validated(&config, at, "touch at-medium", false);
622 assert!(denied.is_err());
623 assert!(
624 denied
625 .unwrap_err()
626 .to_string()
627 .contains("explicit approval")
628 );
629
630 let approved = add_once_at_validated(&config, at, "touch at-medium", true);
631 assert!(approved.is_ok(), "{approved:?}");
632 }
633
634 #[test]
635 fn gateway_api_path_validates_shell_command() {
636 let tmp = TempDir::new().unwrap();
637 let mut config = test_config(&tmp);
638 config
639 .risk_profiles
640 .entry("default".into())
641 .or_default()
642 .allowed_commands = vec!["echo".into()];
643 config
644 .risk_profiles
645 .entry("default".into())
646 .or_default()
647 .level = crate::security::AutonomyLevel::Supervised;
648
649 let result = add_shell_job_with_approval(
651 &config,
652 None,
653 Schedule::Cron {
654 expr: "*/5 * * * *".into(),
655 tz: None,
656 },
657 "curl https://example.com",
658 None,
659 false,
660 );
661 assert!(result.is_err());
662 assert!(
663 result
664 .unwrap_err()
665 .to_string()
666 .contains("blocked by security policy")
667 );
668 }
669
670 #[test]
671 fn scheduler_path_validates_shell_command() {
672 let tmp = TempDir::new().unwrap();
673 let mut config = test_config(&tmp);
674 config
675 .risk_profiles
676 .entry("default".into())
677 .or_default()
678 .allowed_commands = vec!["echo".into()];
679 config
680 .risk_profiles
681 .entry("default".into())
682 .or_default()
683 .level = crate::security::AutonomyLevel::Supervised;
684
685 let security = SecurityPolicy::from_risk_profile(
686 &zeroclaw_config::schema::RiskProfileConfig::default(),
687 &config.data_dir,
688 );
689 let result =
691 validate_shell_command_with_security(&security, "curl https://example.com", false);
692 assert!(result.is_err());
693 assert!(
694 result
695 .unwrap_err()
696 .to_string()
697 .contains("blocked by security policy")
698 );
699 }
700
701 #[test]
702 fn cli_agent_flag_creates_agent_job() {
703 let tmp = TempDir::new().unwrap();
704 let config = test_config(&tmp);
705
706 handle_command(
707 crate::CronCommands::Add {
708 expression: "*/15 * * * *".into(),
709 tz: None,
710 agent: true,
711 allowed_tools: vec![],
712 command: "Check server health: disk space, memory, CPU load".into(),
713 },
714 &config,
715 )
716 .unwrap();
717
718 let jobs = list_jobs(&config).unwrap();
719 assert_eq!(jobs.len(), 1);
720 assert_eq!(jobs[0].job_type, JobType::Agent);
721 assert_eq!(
722 jobs[0].prompt.as_deref(),
723 Some("Check server health: disk space, memory, CPU load")
724 );
725 }
726
727 #[test]
728 fn cli_agent_flag_bypasses_shell_security_validation() {
729 let tmp = TempDir::new().unwrap();
730 let mut config = test_config(&tmp);
731 config
732 .risk_profiles
733 .entry("default".into())
734 .or_default()
735 .allowed_commands = vec!["echo".into()];
736 config
737 .risk_profiles
738 .entry("default".into())
739 .or_default()
740 .level = crate::security::AutonomyLevel::Supervised;
741
742 let result = handle_command(
746 crate::CronCommands::Add {
747 expression: "*/15 * * * *".into(),
748 tz: None,
749 agent: true,
750 allowed_tools: vec![],
751 command: "Check server health: disk space, memory, CPU load".into(),
752 },
753 &config,
754 );
755 assert!(result.is_ok());
756
757 let jobs = list_jobs(&config).unwrap();
758 assert_eq!(jobs.len(), 1);
759 assert_eq!(jobs[0].job_type, JobType::Agent);
760 }
761
762 #[test]
763 fn cli_agent_allowed_tools_persist() {
764 let tmp = TempDir::new().unwrap();
765 let config = test_config(&tmp);
766
767 handle_command(
768 crate::CronCommands::Add {
769 expression: "*/15 * * * *".into(),
770 tz: None,
771 agent: true,
772 allowed_tools: vec!["file_read".into(), "web_search".into()],
773 command: "Check server health".into(),
774 },
775 &config,
776 )
777 .unwrap();
778
779 let jobs = list_jobs(&config).unwrap();
780 assert_eq!(jobs.len(), 1);
781 assert_eq!(
782 jobs[0].allowed_tools,
783 Some(vec!["file_read".into(), "web_search".into()])
784 );
785 }
786
787 #[test]
788 fn cli_update_agent_allowed_tools_persist() {
789 let tmp = TempDir::new().unwrap();
790 let config = test_config(&tmp);
791 let job = add_agent_job(
792 &config,
793 Some("agent".into()),
794 Schedule::Cron {
795 expr: "*/5 * * * *".into(),
796 tz: None,
797 },
798 "original prompt",
799 SessionTarget::Isolated,
800 None,
801 None,
802 false,
803 None,
804 )
805 .unwrap();
806
807 handle_command(
808 crate::CronCommands::Update {
809 id: job.id.clone(),
810 expression: None,
811 tz: None,
812 command: None,
813 name: None,
814 allowed_tools: vec!["shell".into()],
815 },
816 &config,
817 )
818 .unwrap();
819
820 let updated = get_job(&config, &job.id).unwrap();
821 assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
822 }
823
824 #[test]
825 fn cli_without_agent_flag_defaults_to_shell_job() {
826 let tmp = TempDir::new().unwrap();
827 let config = test_config(&tmp);
828
829 handle_command(
830 crate::CronCommands::Add {
831 expression: "*/5 * * * *".into(),
832 tz: None,
833 agent: false,
834 allowed_tools: vec![],
835 command: "echo ok".into(),
836 },
837 &config,
838 )
839 .unwrap();
840
841 let jobs = list_jobs(&config).unwrap();
842 assert_eq!(jobs.len(), 1);
843 assert_eq!(jobs[0].job_type, JobType::Shell);
844 assert_eq!(jobs[0].command, "echo ok");
845 }
846}
847
848#[cfg(test)]
849mod validate_delivery_tests {
850 use super::*;
851 use crate::cron::types::DeliveryConfig;
852
853 #[test]
854 fn validate_delivery_accepts_webhook_with_thread_id() {
855 let delivery = DeliveryConfig {
856 mode: "announce".into(),
857 channel: Some("webhook".into()),
858 to: Some("user-42".into()),
859 thread_id: Some("conv-99".into()),
860 best_effort: true,
861 };
862 validate_delivery_config(Some(&delivery)).expect("webhook with thread_id must validate");
863 }
864
865 #[test]
866 fn validate_delivery_accepts_webhook_without_thread_id() {
867 let delivery = DeliveryConfig {
868 mode: "announce".into(),
869 channel: Some("webhook".into()),
870 to: Some("user-42".into()),
871 thread_id: None,
872 best_effort: true,
873 };
874 validate_delivery_config(Some(&delivery)).expect("webhook without thread_id must validate");
875 }
876}