1use crate::cron::{
2 CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
3 next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use rusqlite::types::{FromSqlResult, ValueRef};
8use rusqlite::{Connection, OpenFlags, params};
9use uuid::Uuid;
10use zeroclaw_config::schema::Config;
11
12const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024;
13const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
14
15#[derive(Clone, Copy, Debug, Eq, PartialEq)]
16pub(crate) enum RunCompletionAction {
17 Reschedule,
18 Disable,
19 Delete,
20}
21
22#[cfg(test)]
23static WRITE_CONNECTION_COUNTS_FOR_TESTS: std::sync::LazyLock<
24 std::sync::Mutex<std::collections::HashMap<std::path::PathBuf, usize>>,
25> = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::HashMap::new()));
26
27#[cfg(test)]
28pub(crate) fn reset_write_connection_count_for_tests(config: &Config) {
29 let mut counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
30 .lock()
31 .unwrap_or_else(|e| e.into_inner());
32 counts.insert(cron_db_path(config), 0);
33}
34
35#[cfg(test)]
36pub(crate) fn write_connection_count_for_tests(config: &Config) -> usize {
37 let counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
38 .lock()
39 .unwrap_or_else(|e| e.into_inner());
40 counts.get(&cron_db_path(config)).copied().unwrap_or(0)
41}
42
43impl rusqlite::types::FromSql for JobType {
44 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
45 let text = value.as_str()?;
46 JobType::try_from(text).map_err(|e| rusqlite::types::FromSqlError::Other(e.into()))
47 }
48}
49
50#[cfg(test)]
51pub fn add_job(
52 config: &Config,
53 agent_alias: &str,
54 expression: &str,
55 command: &str,
56) -> Result<CronJob> {
57 let schedule = Schedule::Cron {
58 expr: expression.to_string(),
59 tz: None,
60 };
61 add_shell_job(config, agent_alias, None, schedule, command, None)
62}
63
64pub fn add_shell_job(
65 config: &Config,
66 agent_alias: &str,
67 name: Option<String>,
68 schedule: Schedule,
69 command: &str,
70 delivery: Option<DeliveryConfig>,
71) -> Result<CronJob> {
72 let now = Utc::now();
73 validate_schedule(&schedule, now)?;
74 validate_delivery_config(delivery.as_ref())?;
75 let next_run = next_run_for_schedule(&schedule, now)?;
76 let id = Uuid::new_v4().to_string();
77 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
78 let schedule_json = serde_json::to_string(&schedule)?;
79 let delivery = delivery.unwrap_or_default();
80
81 let delete_after_run = matches!(schedule, Schedule::At { .. });
82 let agent_alias = agent_alias.trim();
83 if agent_alias.is_empty() {
84 anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
85 }
86
87 with_initialized_connection(config, |conn| {
88 conn.execute(
89 "INSERT INTO cron_jobs (
90 id, expression, command, schedule, job_type, prompt, name, session_target, model,
91 enabled, delivery, delete_after_run, agent_alias, created_at, next_run
92 ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, ?7, ?8, ?9, ?10)",
93 params![
94 id,
95 expression,
96 command,
97 schedule_json,
98 name,
99 serde_json::to_string(&delivery)?,
100 if delete_after_run { 1 } else { 0 },
101 agent_alias,
102 now.to_rfc3339(),
103 next_run.to_rfc3339(),
104 ],
105 )
106 .context("Failed to insert cron shell job")?;
107 Ok(())
108 })?;
109
110 get_job(config, &id)
111}
112
113#[allow(clippy::too_many_arguments)]
114pub fn add_agent_job(
115 config: &Config,
116 agent_alias: &str,
117 name: Option<String>,
118 schedule: Schedule,
119 prompt: &str,
120 session_target: SessionTarget,
121 model: Option<String>,
122 delivery: Option<DeliveryConfig>,
123 delete_after_run: bool,
124 allowed_tools: Option<Vec<String>>,
125) -> Result<CronJob> {
126 let now = Utc::now();
127 validate_schedule(&schedule, now)?;
128 validate_delivery_config(delivery.as_ref())?;
129 let next_run = next_run_for_schedule(&schedule, now)?;
130 let id = Uuid::new_v4().to_string();
131 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
132 let schedule_json = serde_json::to_string(&schedule)?;
133 let delivery = delivery.unwrap_or_default();
134 let agent_alias = agent_alias.trim();
135 if agent_alias.is_empty() {
136 anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
137 }
138
139 with_initialized_connection(config, |conn| {
140 conn.execute(
141 "INSERT INTO cron_jobs (
142 id, expression, command, schedule, job_type, prompt, name, session_target, model,
143 enabled, delivery, delete_after_run, allowed_tools, agent_alias, created_at, next_run
144 ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12, ?13)",
145 params![
146 id,
147 expression,
148 schedule_json,
149 prompt,
150 name,
151 session_target.as_str(),
152 model,
153 serde_json::to_string(&delivery)?,
154 if delete_after_run { 1 } else { 0 },
155 encode_allowed_tools(allowed_tools.as_ref())?,
156 agent_alias,
157 now.to_rfc3339(),
158 next_run.to_rfc3339(),
159 ],
160 )
161 .context("Failed to insert cron agent job")?;
162 Ok(())
163 })?;
164
165 get_job(config, &id)
166}
167
168pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
169 let Some(jobs) = with_read_connection(config, |conn| {
170 let mut stmt = conn.prepare(
171 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
172 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
173 allowed_tools, source, uses_memory, agent_alias
174 FROM cron_jobs ORDER BY next_run ASC",
175 )?;
176
177 let rows = stmt.query_map([], map_cron_job_row)?;
178
179 let mut jobs = Vec::new();
180 for row in rows {
181 jobs.push(row?);
182 }
183 Ok(jobs)
184 })?
185 else {
186 return Ok(Vec::new());
187 };
188
189 Ok(jobs)
190}
191
192pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
193 let Some(job) = with_read_connection(config, |conn| {
194 let mut stmt = conn.prepare(
195 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
196 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
197 allowed_tools, source, uses_memory, agent_alias
198 FROM cron_jobs WHERE id = ?1",
199 )?;
200
201 let mut rows = stmt.query(params![job_id])?;
202 if let Some(row) = rows.next()? {
203 map_cron_job_row(row).map_err(Into::into)
204 } else {
205 anyhow::bail!("Cron job '{job_id}' not found")
206 }
207 })?
208 else {
209 anyhow::bail!("Cron job '{job_id}' not found")
210 };
211
212 Ok(job)
213}
214
215pub fn remove_job(config: &Config, id: &str) -> Result<()> {
216 let changed = with_initialized_connection(config, |conn| {
217 conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
218 .context("Failed to delete cron job")
219 })?;
220
221 if changed == 0 {
222 anyhow::bail!("Cron job '{id}' not found");
223 }
224
225 println!("✅ Removed cron job {id}");
226 Ok(())
227}
228
229pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
230 let lim = i64::try_from(config.scheduler.max_tasks.max(1))
231 .context("Scheduler max_tasks overflows i64")?;
232 let Some(jobs) = with_read_connection(config, |conn| {
233 let mut stmt = conn.prepare(
234 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
235 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
236 allowed_tools, source, uses_memory, agent_alias
237 FROM cron_jobs
238 WHERE enabled = 1 AND next_run <= ?1
239 ORDER BY next_run ASC
240 LIMIT ?2",
241 )?;
242
243 let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
244
245 let mut jobs = Vec::new();
246 for row in rows {
247 match row {
248 Ok(job) => jobs.push(job),
249 Err(e) => ::zeroclaw_log::record!(
250 WARN,
251 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
252 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
253 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
254 "Skipping cron job with unparseable row data"
255 ),
256 }
257 }
258 Ok(jobs)
259 })?
260 else {
261 return Ok(Vec::new());
262 };
263
264 Ok(jobs)
265}
266
267pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
273 let Some(jobs) = with_read_connection(config, |conn| {
274 let mut stmt = conn.prepare(
275 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
276 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
277 allowed_tools, source, uses_memory, agent_alias
278 FROM cron_jobs
279 WHERE enabled = 1 AND next_run <= ?1
280 ORDER BY next_run ASC",
281 )?;
282
283 let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
284
285 let mut jobs = Vec::new();
286 for row in rows {
287 match row {
288 Ok(job) => jobs.push(job),
289 Err(e) => ::zeroclaw_log::record!(
290 WARN,
291 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
292 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
293 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
294 "Skipping cron job with unparseable row data"
295 ),
296 }
297 }
298 Ok(jobs)
299 })?
300 else {
301 return Ok(Vec::new());
302 };
303
304 Ok(jobs)
305}
306
307pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
308 let mut job = get_job(config, job_id)?;
309 let mut schedule_changed = false;
310
311 if let Some(schedule) = patch.schedule {
312 validate_schedule(&schedule, Utc::now())?;
313 job.schedule = schedule;
314 job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default();
315 schedule_changed = true;
316 }
317 if let Some(command) = patch.command {
318 job.command = command;
319 }
320 if let Some(prompt) = patch.prompt {
321 job.prompt = Some(prompt);
322 }
323 if let Some(name) = patch.name {
324 job.name = Some(name);
325 }
326 if let Some(enabled) = patch.enabled {
327 job.enabled = enabled;
328 }
329 if let Some(delivery) = patch.delivery {
330 job.delivery = delivery;
331 }
332 if let Some(model) = patch.model {
333 job.model = Some(model);
334 }
335 if let Some(target) = patch.session_target {
336 job.session_target = target;
337 }
338 if let Some(delete_after_run) = patch.delete_after_run {
339 job.delete_after_run = delete_after_run;
340 }
341 if let Some(allowed_tools) = patch.allowed_tools {
342 if allowed_tools.is_empty() {
345 job.allowed_tools = None;
346 } else {
347 job.allowed_tools = Some(allowed_tools);
348 }
349 }
350 if let Some(uses_memory) = patch.uses_memory {
351 job.uses_memory = uses_memory;
352 }
353
354 if schedule_changed {
355 job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
356 }
357
358 with_initialized_connection(config, |conn| {
359 conn.execute(
360 "UPDATE cron_jobs
361 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
362 session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
363 allowed_tools = ?12, next_run = ?13, uses_memory = ?14
364 WHERE id = ?15",
365 params![
366 job.expression,
367 job.command,
368 serde_json::to_string(&job.schedule)?,
369 <JobType as Into<&str>>::into(job.job_type).to_string(),
370 job.prompt,
371 job.name,
372 job.session_target.as_str(),
373 job.model,
374 if job.enabled { 1 } else { 0 },
375 serde_json::to_string(&job.delivery)?,
376 if job.delete_after_run { 1 } else { 0 },
377 encode_allowed_tools(job.allowed_tools.as_ref())?,
378 job.next_run.to_rfc3339(),
379 if job.uses_memory { 1 } else { 0 },
380 job.id,
381 ],
382 )
383 .context("Failed to update cron job")?;
384 Ok(())
385 })?;
386
387 get_job(config, job_id)
388}
389
390pub fn record_last_run(
391 config: &Config,
392 job_id: &str,
393 finished_at: DateTime<Utc>,
394 success: bool,
395 output: &str,
396) -> Result<()> {
397 let status = if success { "ok" } else { "error" };
398 record_last_run_with_status(config, job_id, finished_at, status, output)
399}
400
401pub fn record_last_run_with_status(
402 config: &Config,
403 job_id: &str,
404 finished_at: DateTime<Utc>,
405 status: &str,
406 output: &str,
407) -> Result<()> {
408 let bounded_output = truncate_cron_output(output);
409 with_initialized_connection(config, |conn| {
410 conn.execute(
411 "UPDATE cron_jobs
412 SET last_run = ?1, last_status = ?2, last_output = ?3
413 WHERE id = ?4",
414 params![finished_at.to_rfc3339(), status, bounded_output, job_id],
415 )
416 .context("Failed to update cron last run fields")?;
417 Ok(())
418 })
419}
420
421pub fn reschedule_after_run(
422 config: &Config,
423 job: &CronJob,
424 success: bool,
425 output: &str,
426) -> Result<()> {
427 let status = if success { "ok" } else { "error" };
428 reschedule_after_run_with_status(config, job, status, output)
429}
430
431pub fn reschedule_after_run_with_status(
432 config: &Config,
433 job: &CronJob,
434 status: &str,
435 output: &str,
436) -> Result<()> {
437 let now = Utc::now();
438 let bounded_output = truncate_cron_output(output);
439
440 if matches!(job.schedule, Schedule::At { .. }) {
443 with_initialized_connection(config, |conn| {
444 conn.execute(
445 "UPDATE cron_jobs
446 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
447 WHERE id = ?4",
448 params![now.to_rfc3339(), status, bounded_output, job.id],
449 )
450 .context("Failed to disable completed one-shot cron job")?;
451 Ok(())
452 })
453 } else {
454 let next_run = next_run_for_schedule(&job.schedule, now)?;
455 with_initialized_connection(config, |conn| {
456 conn.execute(
457 "UPDATE cron_jobs
458 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
459 WHERE id = ?5",
460 params![
461 next_run.to_rfc3339(),
462 now.to_rfc3339(),
463 status,
464 bounded_output,
465 job.id
466 ],
467 )
468 .context("Failed to update cron job run state")?;
469 Ok(())
470 })
471 }
472}
473
474pub fn record_run(
475 config: &Config,
476 job_id: &str,
477 started_at: DateTime<Utc>,
478 finished_at: DateTime<Utc>,
479 status: &str,
480 output: Option<&str>,
481 duration_ms: i64,
482) -> Result<()> {
483 let bounded_output = output.map(truncate_cron_output);
484 with_initialized_connection(config, |conn| {
485 let tx = conn.unchecked_transaction()?;
489
490 insert_run_and_prune(
491 &tx,
492 config,
493 job_id,
494 started_at,
495 finished_at,
496 status,
497 bounded_output.as_deref(),
498 duration_ms,
499 )?;
500
501 tx.commit()
502 .context("Failed to commit cron run transaction")?;
503 Ok(())
504 })
505}
506
507#[allow(clippy::too_many_arguments)]
508pub(crate) fn persist_run_result(
509 config: &Config,
510 job: &CronJob,
511 started_at: DateTime<Utc>,
512 finished_at: DateTime<Utc>,
513 job_state_at: DateTime<Utc>,
514 status: &str,
515 output: Option<&str>,
516 duration_ms: i64,
517 action: RunCompletionAction,
518) -> Result<()> {
519 let bounded_output = output.map(truncate_cron_output);
520
521 with_initialized_connection(config, |conn| {
522 let tx = conn.unchecked_transaction()?;
523
524 insert_run_and_prune(
525 &tx,
526 config,
527 &job.id,
528 started_at,
529 finished_at,
530 status,
531 bounded_output.as_deref(),
532 duration_ms,
533 )?;
534
535 apply_run_completion_state(
536 &tx,
537 job,
538 job_state_at,
539 status,
540 bounded_output.as_deref(),
541 action,
542 )?;
543
544 tx.commit()
545 .context("Failed to commit cron run result transaction")?;
546 Ok(())
547 })
548}
549
550pub(crate) fn persist_run_completion_state(
556 config: &Config,
557 job: &CronJob,
558 job_state_at: DateTime<Utc>,
559 status: &str,
560 output: Option<&str>,
561 action: RunCompletionAction,
562) -> Result<()> {
563 with_initialized_connection(config, |conn| {
564 apply_run_completion_state(conn, job, job_state_at, status, output, action)
565 })
566}
567
568#[allow(clippy::too_many_arguments)]
569fn insert_run_and_prune(
570 conn: &Connection,
571 config: &Config,
572 job_id: &str,
573 started_at: DateTime<Utc>,
574 finished_at: DateTime<Utc>,
575 status: &str,
576 output: Option<&str>,
577 duration_ms: i64,
578) -> Result<()> {
579 conn.execute(
580 "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
581 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
582 params![
583 job_id,
584 started_at.to_rfc3339(),
585 finished_at.to_rfc3339(),
586 status,
587 output,
588 duration_ms,
589 ],
590 )
591 .context("Failed to insert cron run")?;
592
593 let keep = i64::from(config.scheduler.max_run_history.max(1));
594 conn.execute(
595 "DELETE FROM cron_runs
596 WHERE job_id = ?1
597 AND id NOT IN (
598 SELECT id FROM cron_runs
599 WHERE job_id = ?1
600 ORDER BY started_at DESC, id DESC
601 LIMIT ?2
602 )",
603 params![job_id, keep],
604 )
605 .context("Failed to prune cron run history")?;
606
607 Ok(())
608}
609
610fn truncate_cron_output(output: &str) -> String {
611 if output.len() <= MAX_CRON_OUTPUT_BYTES {
612 return output.to_string();
613 }
614
615 if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
616 return TRUNCATED_OUTPUT_MARKER.to_string();
617 }
618
619 let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
620 while cutoff > 0 && !output.is_char_boundary(cutoff) {
621 cutoff -= 1;
622 }
623
624 let mut truncated = output[..cutoff].to_string();
625 truncated.push_str(TRUNCATED_OUTPUT_MARKER);
626 truncated
627}
628
629pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
630 let Some(runs) = with_read_connection(config, |conn| {
631 let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
632 let mut stmt = conn.prepare(
633 "SELECT id, job_id, started_at, finished_at, status, output, duration_ms
634 FROM cron_runs
635 WHERE job_id = ?1
636 ORDER BY started_at DESC, id DESC
637 LIMIT ?2",
638 )?;
639
640 let rows = stmt.query_map(params![job_id, lim], |row| {
641 Ok(CronRun {
642 id: row.get(0)?,
643 job_id: row.get(1)?,
644 started_at: parse_rfc3339(&row.get::<_, String>(2)?)
645 .map_err(sql_conversion_error)?,
646 finished_at: parse_rfc3339(&row.get::<_, String>(3)?)
647 .map_err(sql_conversion_error)?,
648 status: row.get(4)?,
649 output: row.get(5)?,
650 duration_ms: row.get(6)?,
651 })
652 })?;
653
654 let mut runs = Vec::new();
655 for row in rows {
656 runs.push(row?);
657 }
658 Ok(runs)
659 })?
660 else {
661 return Ok(Vec::new());
662 };
663
664 Ok(runs)
665}
666
667fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
668 let parsed = DateTime::parse_from_rfc3339(raw)
669 .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
670 Ok(parsed.with_timezone(&Utc))
671}
672
673fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error {
674 rusqlite::Error::ToSqlConversionFailure(err.into())
675}
676
677fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
678 let expression: String = row.get(1)?;
679 let schedule_raw: Option<String> = row.get(3)?;
680 let schedule =
681 decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?;
682
683 let delivery_raw: Option<String> = row.get(10)?;
684 let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?;
685
686 let next_run_raw: String = row.get(13)?;
687 let last_run_raw: Option<String> = row.get(14)?;
688 let created_at_raw: String = row.get(12)?;
689 let allowed_tools_raw: Option<String> = row.get(17)?;
690 let source: Option<String> = row.get(18)?;
691 let uses_memory: Option<i64> = row.get(19)?;
692 let agent_alias: Option<String> = row.get(20)?;
693
694 Ok(CronJob {
695 id: row.get(0)?,
696 expression,
697 schedule,
698 command: row.get(2)?,
699 job_type: row.get(4)?,
700 prompt: row.get(5)?,
701 name: row.get(6)?,
702 session_target: SessionTarget::parse(&row.get::<_, String>(7)?),
703 model: row.get(8)?,
704 agent_alias: agent_alias
705 .map(|s| s.trim().to_string())
706 .unwrap_or_default(),
707 enabled: row.get::<_, i64>(9)? != 0,
708 delivery,
709 delete_after_run: row.get::<_, i64>(11)? != 0,
710 source: source.unwrap_or_else(|| "imperative".to_string()),
711 uses_memory: uses_memory != Some(0),
712 created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?,
713 next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?,
714 last_run: match last_run_raw {
715 Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?),
716 None => None,
717 },
718 last_status: row.get(15)?,
719 last_output: row.get(16)?,
720 allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
721 .map_err(sql_conversion_error)?,
722 })
723}
724
725fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result<Schedule> {
726 if let Some(raw) = schedule_raw {
727 let trimmed = raw.trim();
728 if !trimmed.is_empty() {
729 return serde_json::from_str(trimmed)
730 .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}"));
731 }
732 }
733
734 if expression.trim().is_empty() {
735 anyhow::bail!("Missing schedule and legacy expression for cron job")
736 }
737
738 Ok(Schedule::Cron {
739 expr: expression.to_string(),
740 tz: None,
741 })
742}
743
744fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
745 if let Some(raw) = delivery_raw {
746 let trimmed = raw.trim();
747 if !trimmed.is_empty() {
748 return serde_json::from_str(trimmed)
749 .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}"));
750 }
751 }
752 Ok(DeliveryConfig::default())
753}
754
755fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
756 allowed_tools
757 .map(serde_json::to_string)
758 .transpose()
759 .context("Failed to serialize cron allowed_tools")
760}
761
762fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
763 if let Some(raw) = raw {
764 let trimmed = raw.trim();
765 if !trimmed.is_empty() {
766 return serde_json::from_str(trimmed)
767 .map(Some)
768 .with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
769 }
770 }
771 Ok(None)
772}
773
774pub fn sync_declarative_jobs(
783 config: &Config,
784 decls: &std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
785) -> Result<()> {
786 use zeroclaw_config::schema::CronScheduleDecl;
787
788 if decls.is_empty() {
789 let _ = with_existing_initialized_connection(config, |conn| {
793 let deleted = conn
794 .execute("DELETE FROM cron_jobs WHERE source = 'declarative'", [])
795 .context("Failed to remove stale declarative cron jobs")?;
796 if deleted > 0 {
797 ::zeroclaw_log::record!(
798 INFO,
799 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
800 .with_attrs(::serde_json::json!({"count": deleted})),
801 "Removed declarative cron jobs no longer in config"
802 );
803 }
804 Ok(())
805 })?;
806 return Ok(());
807 }
808
809 for (id, decl) in decls {
811 validate_decl(id, decl)?;
812 }
813
814 let now = Utc::now();
815
816 with_initialized_connection(config, |conn| {
817 let config_ids: std::collections::HashSet<&str> =
819 decls.keys().map(String::as_str).collect();
820
821 {
823 let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'declarative'")?;
824 let db_ids: Vec<String> = stmt
825 .query_map([], |row| row.get(0))?
826 .filter_map(|r| r.ok())
827 .collect();
828
829 for db_id in &db_ids {
830 if !config_ids.contains(db_id.as_str()) {
831 conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![db_id])
832 .with_context(|| {
833 format!("Failed to remove stale declarative cron job '{db_id}'")
834 })?;
835 ::zeroclaw_log::record!(
836 INFO,
837 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
838 .with_attrs(::serde_json::json!({"job_id": db_id})),
839 "Removed declarative cron job no longer in config"
840 );
841 }
842 }
843 }
844
845 for (id, decl) in decls {
846 let schedule = convert_schedule_decl(&decl.schedule)?;
847 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
848 let schedule_json = serde_json::to_string(&schedule)?;
849 let job_type = &decl.job_type;
850 let session_target = decl.session_target.as_deref().unwrap_or("isolated");
851 let delivery = match &decl.delivery {
852 Some(d) => convert_delivery_decl(d),
853 None => DeliveryConfig::default(),
854 };
855 let delivery_json = serde_json::to_string(&delivery)?;
856 let allowed_tools_json = encode_allowed_tools(decl.allowed_tools.as_ref())?;
857 let command = decl.command.as_deref().unwrap_or("");
858 let delete_after_run = matches!(decl.schedule, CronScheduleDecl::At { .. });
859
860 let exists: bool = conn
862 .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
863 .query_row(params![id], |row| row.get::<_, i64>(0))
864 .map(|c| c > 0)
865 .unwrap_or(false);
866
867 if exists {
868 let current_schedule_raw: Option<String> = conn
872 .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
873 .query_row(params![id], |row| row.get(0))
874 .ok();
875
876 let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
877
878 if schedule_changed {
879 let next_run = next_run_for_schedule(&schedule, now)?;
880 conn.execute(
881 "UPDATE cron_jobs
882 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
883 prompt = ?5, name = ?6, session_target = ?7, model = ?8,
884 enabled = ?9, delivery = ?10, delete_after_run = ?11,
885 allowed_tools = ?12, source = 'declarative', next_run = ?13,
886 uses_memory = ?14
887 WHERE id = ?15",
888 params![
889 expression,
890 command,
891 schedule_json,
892 job_type,
893 decl.prompt,
894 decl.name,
895 session_target,
896 decl.model,
897 i32::from(decl.enabled),
898 delivery_json,
899 i32::from(delete_after_run),
900 allowed_tools_json,
901 next_run.to_rfc3339(),
902 i32::from(decl.uses_memory),
903 id,
904 ],
905 )
906 .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
907 } else {
908 conn.execute(
909 "UPDATE cron_jobs
910 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
911 prompt = ?5, name = ?6, session_target = ?7, model = ?8,
912 enabled = ?9, delivery = ?10, delete_after_run = ?11,
913 allowed_tools = ?12, source = 'declarative',
914 uses_memory = ?13
915 WHERE id = ?14",
916 params![
917 expression,
918 command,
919 schedule_json,
920 job_type,
921 decl.prompt,
922 decl.name,
923 session_target,
924 decl.model,
925 i32::from(decl.enabled),
926 delivery_json,
927 i32::from(delete_after_run),
928 allowed_tools_json,
929 i32::from(decl.uses_memory),
930 id,
931 ],
932 )
933 .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
934 }
935
936 ::zeroclaw_log::record!(
937 DEBUG,
938 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
939 .with_attrs(::serde_json::json!({"job_id": id})),
940 "Updated declarative cron job"
941 );
942 } else {
943 let Some(agent_alias) = config.agent_for_cron_job(id) else {
948 ::zeroclaw_log::record!(
949 WARN,
950 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
951 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
952 .with_attrs(::serde_json::json!({"job_id": id})),
953 "Skipping declarative cron job: no [agents.<x>].cron_jobs entry claims this id"
954 );
955 continue;
956 };
957 let next_run = next_run_for_schedule(&schedule, now)?;
958 conn.execute(
959 "INSERT INTO cron_jobs (
960 id, expression, command, schedule, job_type, prompt, name,
961 session_target, model, enabled, delivery, delete_after_run,
962 allowed_tools, source, uses_memory, agent_alias, created_at, next_run
963 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, 'declarative', ?14, ?15, ?16, ?17)",
964 params![
965 id,
966 expression,
967 command,
968 schedule_json,
969 job_type,
970 decl.prompt,
971 decl.name,
972 session_target,
973 decl.model,
974 i32::from(decl.enabled),
975 delivery_json,
976 i32::from(delete_after_run),
977 allowed_tools_json,
978 i32::from(decl.uses_memory),
979 agent_alias,
980 now.to_rfc3339(),
981 next_run.to_rfc3339(),
982 ],
983 )
984 .with_context(|| {
985 format!("Failed to insert declarative cron job '{id}'")
986 })?;
987
988 ::zeroclaw_log::record!(
989 INFO,
990 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
991 .with_attrs(::serde_json::json!({"job_id": id})),
992 "Inserted declarative cron job from config"
993 );
994 }
995 }
996
997 Ok(())
998 })
999}
1000
1001fn validate_decl(id: &str, decl: &zeroclaw_config::schema::CronJobDecl) -> Result<()> {
1003 if id.trim().is_empty() {
1004 anyhow::bail!("Declarative cron job has empty id");
1005 }
1006
1007 match decl.job_type.to_lowercase().as_str() {
1008 "shell" => {
1009 if decl.command.as_deref().is_none_or(|c| c.trim().is_empty()) {
1010 anyhow::bail!(
1011 "Declarative cron job '{id}': shell job requires a non-empty 'command'"
1012 );
1013 }
1014 }
1015 "agent" => {
1016 if decl.prompt.as_deref().is_none_or(|p| p.trim().is_empty()) {
1017 anyhow::bail!(
1018 "Declarative cron job '{id}': agent job requires a non-empty 'prompt'"
1019 );
1020 }
1021 }
1022 other => {
1023 anyhow::bail!(
1024 "Declarative cron job '{id}': invalid job_type '{other}', expected 'shell' or 'agent'"
1025 );
1026 }
1027 }
1028
1029 Ok(())
1030}
1031
1032fn convert_schedule_decl(decl: &zeroclaw_config::schema::CronScheduleDecl) -> Result<Schedule> {
1034 use zeroclaw_config::schema::CronScheduleDecl;
1035 match decl {
1036 CronScheduleDecl::Cron { expr, tz } => Ok(Schedule::Cron {
1037 expr: expr.clone(),
1038 tz: tz.clone(),
1039 }),
1040 CronScheduleDecl::Every { every_ms } => Ok(Schedule::Every {
1041 every_ms: *every_ms,
1042 }),
1043 CronScheduleDecl::At { at } => {
1044 let parsed = DateTime::parse_from_rfc3339(at)
1045 .with_context(|| {
1046 format!("Invalid RFC3339 timestamp in declarative cron 'at': {at}")
1047 })?
1048 .with_timezone(&Utc);
1049 Ok(Schedule::At { at: parsed })
1050 }
1051 }
1052}
1053
1054fn convert_delivery_decl(decl: &zeroclaw_config::schema::DeliveryConfigDecl) -> DeliveryConfig {
1056 DeliveryConfig {
1057 mode: decl.mode.clone(),
1058 channel: decl.channel.clone(),
1059 to: decl.to.clone(),
1060 thread_id: decl.thread_id.clone(),
1061 best_effort: decl.best_effort,
1062 }
1063}
1064
1065fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
1066 let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
1067 let mut rows = stmt.query([])?;
1068 while let Some(row) = rows.next()? {
1069 let col_name: String = row.get(1)?;
1070 if col_name == name {
1071 return Ok(());
1072 }
1073 }
1074 drop(rows);
1076 drop(stmt);
1077
1078 match conn.execute(
1081 &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"),
1082 [],
1083 ) {
1084 Ok(_) => Ok(()),
1085 Err(rusqlite::Error::SqliteFailure(err, Some(ref msg)))
1086 if msg.contains("duplicate column name") =>
1087 {
1088 ::zeroclaw_log::record!(
1089 DEBUG,
1090 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1091 .with_attrs(::serde_json::json!({"error": format!("{}", err), "name": name})),
1092 "Column cron_jobs. already exists (concurrent migration)"
1093 );
1094 Ok(())
1095 }
1096 Err(e) => Err(e).with_context(|| format!("Failed to add cron_jobs.{name}")),
1097 }
1098}
1099
1100fn cron_db_path(config: &Config) -> std::path::PathBuf {
1101 config.data_dir.join("cron").join("jobs.db")
1102}
1103
1104fn with_read_connection<T>(
1108 config: &Config,
1109 f: impl FnOnce(&Connection) -> Result<T>,
1110) -> Result<Option<T>> {
1111 with_existing_initialized_connection(config, f)
1112}
1113
1114fn with_existing_initialized_connection<T>(
1115 config: &Config,
1116 f: impl FnOnce(&Connection) -> Result<T>,
1117) -> Result<Option<T>> {
1118 let db_path = cron_db_path(config);
1119 if !db_path.exists() {
1120 return Ok(None);
1121 }
1122
1123 let conn = Connection::open_with_flags(
1124 &db_path,
1125 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
1126 )
1127 .with_context(|| {
1128 format!(
1129 "Failed to open existing cron DB: {}",
1130 db_path.display().to_string()
1131 )
1132 })?;
1133
1134 initialize_schema(&conn)?;
1135
1136 f(&conn).map(Some)
1137}
1138
1139fn with_initialized_connection<T>(
1140 config: &Config,
1141 f: impl FnOnce(&Connection) -> Result<T>,
1142) -> Result<T> {
1143 let db_path = cron_db_path(config);
1144 #[cfg(test)]
1145 {
1146 let mut counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
1147 .lock()
1148 .unwrap_or_else(|e| e.into_inner());
1149 if let Some(count) = counts.get_mut(&db_path) {
1150 *count += 1;
1151 }
1152 }
1153
1154 if let Some(parent) = db_path.parent() {
1155 std::fs::create_dir_all(parent).with_context(|| {
1156 format!(
1157 "Failed to create cron directory: {}",
1158 parent.display().to_string()
1159 )
1160 })?;
1161 }
1162
1163 let conn = Connection::open(&db_path)
1164 .with_context(|| format!("Failed to open cron DB: {}", db_path.display().to_string()))?;
1165
1166 initialize_schema(&conn)?;
1167
1168 f(&conn)
1169}
1170
1171fn apply_run_completion_state(
1177 conn: &Connection,
1178 job: &CronJob,
1179 job_state_at: DateTime<Utc>,
1180 status: &str,
1181 output: Option<&str>,
1182 action: RunCompletionAction,
1183) -> Result<()> {
1184 let bounded_output = output.map(truncate_cron_output);
1185
1186 match action {
1187 RunCompletionAction::Reschedule => {
1188 let next_run = next_run_for_schedule(&job.schedule, job_state_at)?;
1189 let changed = conn
1190 .execute(
1191 "UPDATE cron_jobs
1192 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
1193 WHERE id = ?5",
1194 params![
1195 next_run.to_rfc3339(),
1196 job_state_at.to_rfc3339(),
1197 status,
1198 bounded_output.as_deref(),
1199 job.id,
1200 ],
1201 )
1202 .context("Failed to update cron job run state")?;
1203 if changed == 0 {
1204 anyhow::bail!("Cron job '{}' not found", job.id);
1205 }
1206 }
1207 RunCompletionAction::Disable => {
1208 let changed = conn
1209 .execute(
1210 "UPDATE cron_jobs
1211 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
1212 WHERE id = ?4",
1213 params![
1214 job_state_at.to_rfc3339(),
1215 status,
1216 bounded_output.as_deref(),
1217 job.id,
1218 ],
1219 )
1220 .context("Failed to disable completed one-shot cron job")?;
1221 if changed == 0 {
1222 anyhow::bail!("Cron job '{}' not found", job.id);
1223 }
1224 }
1225 RunCompletionAction::Delete => {
1226 let changed = conn
1227 .execute("DELETE FROM cron_jobs WHERE id = ?1", params![job.id])
1228 .context("Failed to delete completed one-shot cron job")?;
1229 if changed == 0 {
1230 anyhow::bail!("Cron job '{}' not found", job.id);
1231 }
1232 }
1233 }
1234
1235 Ok(())
1236}
1237
1238fn initialize_schema(conn: &Connection) -> Result<()> {
1239 conn.execute_batch(
1240 "PRAGMA foreign_keys = ON;
1241 CREATE TABLE IF NOT EXISTS cron_jobs (
1242 id TEXT PRIMARY KEY,
1243 expression TEXT NOT NULL,
1244 command TEXT NOT NULL,
1245 schedule TEXT,
1246 job_type TEXT NOT NULL DEFAULT 'shell',
1247 prompt TEXT,
1248 name TEXT,
1249 session_target TEXT NOT NULL DEFAULT 'isolated',
1250 model TEXT,
1251 enabled INTEGER NOT NULL DEFAULT 1,
1252 delivery TEXT,
1253 delete_after_run INTEGER NOT NULL DEFAULT 0,
1254 allowed_tools TEXT,
1255 created_at TEXT NOT NULL,
1256 next_run TEXT NOT NULL,
1257 last_run TEXT,
1258 last_status TEXT,
1259 last_output TEXT
1260 );
1261 CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);
1262
1263 CREATE TABLE IF NOT EXISTS cron_runs (
1264 id INTEGER PRIMARY KEY AUTOINCREMENT,
1265 job_id TEXT NOT NULL,
1266 started_at TEXT NOT NULL,
1267 finished_at TEXT NOT NULL,
1268 status TEXT NOT NULL,
1269 output TEXT,
1270 duration_ms INTEGER,
1271 FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE
1272 );
1273 CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id);
1274 CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);
1275 CREATE INDEX IF NOT EXISTS idx_cron_runs_job_started ON cron_runs(job_id, started_at);",
1276 )
1277 .context("Failed to initialize cron schema")?;
1278
1279 add_column_if_missing(conn, "schedule", "TEXT")?;
1280 add_column_if_missing(conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?;
1281 add_column_if_missing(conn, "prompt", "TEXT")?;
1282 add_column_if_missing(conn, "name", "TEXT")?;
1283 add_column_if_missing(conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?;
1284 add_column_if_missing(conn, "model", "TEXT")?;
1285 add_column_if_missing(conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
1286 add_column_if_missing(conn, "delivery", "TEXT")?;
1287 add_column_if_missing(conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
1288 add_column_if_missing(conn, "allowed_tools", "TEXT")?;
1289 add_column_if_missing(conn, "source", "TEXT DEFAULT 'imperative'")?;
1290 add_column_if_missing(conn, "uses_memory", "INTEGER NOT NULL DEFAULT 1")?;
1291 add_column_if_missing(conn, "agent_alias", "TEXT NOT NULL DEFAULT ''")?;
1295
1296 Ok(())
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301 use super::*;
1302 use chrono::Duration as ChronoDuration;
1303 use tempfile::TempDir;
1304 use zeroclaw_config::schema::Config;
1305
1306 fn test_config(tmp: &TempDir) -> Config {
1307 let config = Config {
1308 data_dir: tmp.path().join("data"),
1309 config_path: tmp.path().join("config.toml"),
1310 ..Config::default()
1311 };
1312 std::fs::create_dir_all(&config.data_dir).unwrap();
1313 config
1314 }
1315
1316 fn cron_dir(config: &Config) -> std::path::PathBuf {
1317 config.data_dir.join("cron")
1318 }
1319
1320 fn cron_db(config: &Config) -> std::path::PathBuf {
1321 cron_dir(config).join("jobs.db")
1322 }
1323
1324 #[test]
1325 fn read_only_queries_on_empty_workspace_do_not_initialize_cron_db() {
1326 let tmp = TempDir::new().unwrap();
1327 let config = test_config(&tmp);
1328
1329 assert!(list_jobs(&config).unwrap().is_empty());
1330 assert!(due_jobs(&config, Utc::now()).unwrap().is_empty());
1331 assert!(all_overdue_jobs(&config, Utc::now()).unwrap().is_empty());
1332 assert!(list_runs(&config, "missing", 10).unwrap().is_empty());
1333
1334 let err = get_job(&config, "missing").unwrap_err();
1335 assert!(err.to_string().contains("not found"));
1336
1337 assert!(
1338 !cron_dir(&config).exists(),
1339 "read-only queries should not create the cron directory"
1340 );
1341 assert!(
1342 !cron_db(&config).exists(),
1343 "read-only queries should not create jobs.db"
1344 );
1345 }
1346
1347 #[test]
1348 fn first_write_initializes_schema_and_follow_up_reads_work() {
1349 let tmp = TempDir::new().unwrap();
1350 let config = test_config(&tmp);
1351
1352 let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1353
1354 assert!(cron_db(&config).exists());
1355 assert_eq!(get_job(&config, &job.id).unwrap().id, job.id);
1356 assert_eq!(list_jobs(&config).unwrap().len(), 1);
1357 }
1358
1359 #[test]
1360 fn empty_declarative_sync_on_empty_workspace_does_not_initialize_cron_db() {
1361 let tmp = TempDir::new().unwrap();
1362 let config = test_config(&tmp);
1363
1364 sync_declarative_jobs(&config, &std::collections::HashMap::new()).unwrap();
1365
1366 assert!(
1367 !cron_dir(&config).exists(),
1368 "empty declarative sync should not create the cron directory"
1369 );
1370 assert!(
1371 !cron_db(&config).exists(),
1372 "empty declarative sync should not create jobs.db"
1373 );
1374 }
1375
1376 #[test]
1377 fn read_existing_old_schema_db_migrates_before_querying_new_columns() {
1378 let tmp = TempDir::new().unwrap();
1379 let config = test_config(&tmp);
1380 let cron_dir = cron_dir(&config);
1381 std::fs::create_dir_all(&cron_dir).unwrap();
1382 let db_path = cron_db(&config);
1383 let conn = Connection::open(&db_path).unwrap();
1384 conn.execute_batch(
1385 "CREATE TABLE cron_jobs (
1386 id TEXT PRIMARY KEY,
1387 expression TEXT NOT NULL,
1388 command TEXT NOT NULL,
1389 schedule TEXT,
1390 job_type TEXT NOT NULL DEFAULT 'shell',
1391 prompt TEXT,
1392 name TEXT,
1393 session_target TEXT NOT NULL DEFAULT 'isolated',
1394 model TEXT,
1395 enabled INTEGER NOT NULL DEFAULT 1,
1396 delivery TEXT,
1397 delete_after_run INTEGER NOT NULL DEFAULT 0,
1398 allowed_tools TEXT,
1399 created_at TEXT NOT NULL,
1400 next_run TEXT NOT NULL,
1401 last_run TEXT,
1402 last_status TEXT,
1403 last_output TEXT
1404 );",
1405 )
1406 .unwrap();
1407 conn.execute(
1408 "INSERT INTO cron_jobs (
1409 id, expression, command, schedule, job_type, session_target,
1410 enabled, delete_after_run, created_at, next_run
1411 ) VALUES (?1, ?2, ?3, ?4, 'shell', 'isolated', 1, 0, ?5, ?6)",
1412 params![
1413 "legacy-schema",
1414 "*/5 * * * *",
1415 "echo legacy",
1416 Option::<String>::None,
1417 Utc::now().to_rfc3339(),
1418 (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1419 ],
1420 )
1421 .unwrap();
1422 drop(conn);
1423
1424 let jobs = list_jobs(&config).unwrap();
1425
1426 assert_eq!(jobs.len(), 1);
1427 assert_eq!(jobs[0].id, "legacy-schema");
1428 assert_eq!(jobs[0].source, "imperative");
1429 assert!(jobs[0].uses_memory);
1430
1431 let conn = Connection::open(&db_path).unwrap();
1432 let columns: Vec<String> = conn
1433 .prepare("PRAGMA table_info(cron_jobs)")
1434 .unwrap()
1435 .query_map([], |row| row.get(1))
1436 .unwrap()
1437 .collect::<Result<_, _>>()
1438 .unwrap();
1439 assert!(columns.iter().any(|name| name == "source"));
1440 assert!(columns.iter().any(|name| name == "uses_memory"));
1441 }
1442
1443 #[test]
1444 fn add_job_accepts_five_field_expression() {
1445 let tmp = TempDir::new().unwrap();
1446 let config = test_config(&tmp);
1447
1448 let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1449 assert_eq!(job.expression, "*/5 * * * *");
1450 assert_eq!(job.command, "echo ok");
1451 assert!(matches!(job.schedule, Schedule::Cron { .. }));
1452 }
1453
1454 #[test]
1455 fn add_shell_job_marks_at_schedule_for_auto_delete() {
1456 let tmp = TempDir::new().unwrap();
1457 let config = test_config(&tmp);
1458
1459 let one_shot = add_shell_job(
1460 &config,
1461 "default",
1462 None,
1463 Schedule::At {
1464 at: Utc::now() + ChronoDuration::minutes(10),
1465 },
1466 "echo once",
1467 None,
1468 )
1469 .unwrap();
1470 assert!(one_shot.delete_after_run);
1471
1472 let recurring = add_shell_job(
1473 &config,
1474 "default",
1475 None,
1476 Schedule::Every { every_ms: 60_000 },
1477 "echo recurring",
1478 None,
1479 )
1480 .unwrap();
1481 assert!(!recurring.delete_after_run);
1482 }
1483
1484 #[test]
1485 fn add_shell_job_persists_delivery() {
1486 let tmp = TempDir::new().unwrap();
1487 let config = test_config(&tmp);
1488
1489 let job = add_shell_job(
1490 &config,
1491 "default",
1492 Some("deliver-shell".into()),
1493 Schedule::Cron {
1494 expr: "*/5 * * * *".into(),
1495 tz: None,
1496 },
1497 "echo delivered",
1498 Some(DeliveryConfig {
1499 mode: "announce".into(),
1500 channel: Some("discord".into()),
1501 to: Some("1234567890".into()),
1502 thread_id: None,
1503 best_effort: true,
1504 }),
1505 )
1506 .unwrap();
1507
1508 assert_eq!(job.delivery.mode, "announce");
1509 assert_eq!(job.delivery.channel.as_deref(), Some("discord"));
1510 assert_eq!(job.delivery.to.as_deref(), Some("1234567890"));
1511
1512 let stored = get_job(&config, &job.id).unwrap();
1513 assert_eq!(stored.delivery.mode, "announce");
1514 assert_eq!(stored.delivery.channel.as_deref(), Some("discord"));
1515 assert_eq!(stored.delivery.to.as_deref(), Some("1234567890"));
1516 }
1517
1518 #[test]
1519 fn add_agent_job_rejects_invalid_announce_delivery() {
1520 let tmp = TempDir::new().unwrap();
1521 let config = test_config(&tmp);
1522
1523 let err = add_agent_job(
1524 &config,
1525 "default",
1526 Some("deliver-agent".into()),
1527 Schedule::Cron {
1528 expr: "*/5 * * * *".into(),
1529 tz: None,
1530 },
1531 "summarize logs",
1532 SessionTarget::Isolated,
1533 None,
1534 Some(DeliveryConfig {
1535 mode: "announce".into(),
1536 channel: Some("discord".into()),
1537 to: None,
1538 thread_id: None,
1539 best_effort: true,
1540 }),
1541 false,
1542 None,
1543 )
1544 .unwrap_err();
1545
1546 assert!(err.to_string().contains("delivery.to is required"));
1547 }
1548
1549 #[test]
1550 fn add_shell_job_rejects_invalid_delivery_mode() {
1551 let tmp = TempDir::new().unwrap();
1552 let config = test_config(&tmp);
1553
1554 let err = add_shell_job(
1555 &config,
1556 "default",
1557 Some("deliver-shell".into()),
1558 Schedule::Cron {
1559 expr: "*/5 * * * *".into(),
1560 tz: None,
1561 },
1562 "echo delivered",
1563 Some(DeliveryConfig {
1564 mode: "annouce".into(),
1565 channel: Some("discord".into()),
1566 to: Some("1234567890".into()),
1567 thread_id: None,
1568 best_effort: true,
1569 }),
1570 )
1571 .unwrap_err();
1572
1573 assert!(err.to_string().contains("unsupported delivery mode"));
1574 }
1575
1576 #[test]
1577 fn add_list_remove_roundtrip() {
1578 let tmp = TempDir::new().unwrap();
1579 let config = test_config(&tmp);
1580
1581 let job = add_job(&config, "test-agent", "*/10 * * * *", "echo roundtrip").unwrap();
1582 let listed = list_jobs(&config).unwrap();
1583 assert_eq!(listed.len(), 1);
1584 assert_eq!(listed[0].id, job.id);
1585
1586 remove_job(&config, &job.id).unwrap();
1587 assert!(list_jobs(&config).unwrap().is_empty());
1588 }
1589
1590 #[test]
1591 fn due_jobs_filters_by_timestamp_and_enabled() {
1592 let tmp = TempDir::new().unwrap();
1593 let config = test_config(&tmp);
1594
1595 let job = add_job(&config, "test-agent", "* * * * *", "echo due").unwrap();
1596
1597 let due_now = due_jobs(&config, Utc::now()).unwrap();
1598 assert!(due_now.is_empty(), "new job should not be due immediately");
1599
1600 let far_future = Utc::now() + ChronoDuration::days(365);
1601 let due_future = due_jobs(&config, far_future).unwrap();
1602 assert_eq!(due_future.len(), 1, "job should be due in far future");
1603
1604 let _ = update_job(
1605 &config,
1606 &job.id,
1607 CronJobPatch {
1608 enabled: Some(false),
1609 ..CronJobPatch::default()
1610 },
1611 )
1612 .unwrap();
1613 let due_after_disable = due_jobs(&config, far_future).unwrap();
1614 assert!(due_after_disable.is_empty());
1615 }
1616
1617 #[test]
1618 fn due_jobs_respects_scheduler_max_tasks_limit() {
1619 let tmp = TempDir::new().unwrap();
1620 let mut config = test_config(&tmp);
1621 config.scheduler.max_tasks = 2;
1622
1623 let _ = add_job(&config, "test-agent", "* * * * *", "echo due-1").unwrap();
1624 let _ = add_job(&config, "test-agent", "* * * * *", "echo due-2").unwrap();
1625 let _ = add_job(&config, "test-agent", "* * * * *", "echo due-3").unwrap();
1626
1627 let far_future = Utc::now() + ChronoDuration::days(365);
1628 let due = due_jobs(&config, far_future).unwrap();
1629 assert_eq!(due.len(), 2);
1630 }
1631
1632 #[test]
1633 fn all_overdue_jobs_ignores_max_tasks_limit() {
1634 let tmp = TempDir::new().unwrap();
1635 let mut config = test_config(&tmp);
1636 config.scheduler.max_tasks = 2;
1637
1638 let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-1").unwrap();
1639 let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-2").unwrap();
1640 let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-3").unwrap();
1641
1642 let far_future = Utc::now() + ChronoDuration::days(365);
1643 let due = due_jobs(&config, far_future).unwrap();
1645 assert_eq!(due.len(), 2);
1646 let overdue = all_overdue_jobs(&config, far_future).unwrap();
1648 assert_eq!(overdue.len(), 3);
1649 }
1650
1651 #[test]
1652 fn all_overdue_jobs_excludes_disabled_jobs() {
1653 let tmp = TempDir::new().unwrap();
1654 let config = test_config(&tmp);
1655
1656 let job = add_job(&config, "test-agent", "* * * * *", "echo disabled").unwrap();
1657 let _ = update_job(
1658 &config,
1659 &job.id,
1660 CronJobPatch {
1661 enabled: Some(false),
1662 ..CronJobPatch::default()
1663 },
1664 )
1665 .unwrap();
1666
1667 let far_future = Utc::now() + ChronoDuration::days(365);
1668 let overdue = all_overdue_jobs(&config, far_future).unwrap();
1669 assert!(overdue.is_empty());
1670 }
1671
1672 #[test]
1673 fn add_agent_job_persists_allowed_tools() {
1674 let tmp = TempDir::new().unwrap();
1675 let config = test_config(&tmp);
1676
1677 let job = add_agent_job(
1678 &config,
1679 "default",
1680 Some("agent".into()),
1681 Schedule::Every { every_ms: 60_000 },
1682 "do work",
1683 SessionTarget::Isolated,
1684 None,
1685 None,
1686 false,
1687 Some(vec!["file_read".into(), "web_search".into()]),
1688 )
1689 .unwrap();
1690
1691 assert_eq!(
1692 job.allowed_tools,
1693 Some(vec!["file_read".into(), "web_search".into()])
1694 );
1695
1696 let stored = get_job(&config, &job.id).unwrap();
1697 assert_eq!(stored.allowed_tools, job.allowed_tools);
1698 }
1699
1700 #[test]
1701 fn update_job_persists_allowed_tools_patch() {
1702 let tmp = TempDir::new().unwrap();
1703 let config = test_config(&tmp);
1704
1705 let job = add_agent_job(
1706 &config,
1707 "default",
1708 Some("agent".into()),
1709 Schedule::Every { every_ms: 60_000 },
1710 "do work",
1711 SessionTarget::Isolated,
1712 None,
1713 None,
1714 false,
1715 None,
1716 )
1717 .unwrap();
1718
1719 let updated = update_job(
1720 &config,
1721 &job.id,
1722 CronJobPatch {
1723 allowed_tools: Some(vec!["shell".into()]),
1724 ..CronJobPatch::default()
1725 },
1726 )
1727 .unwrap();
1728
1729 assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
1730 assert_eq!(
1731 get_job(&config, &job.id).unwrap().allowed_tools,
1732 Some(vec!["shell".into()])
1733 );
1734 }
1735
1736 #[test]
1737 fn reschedule_after_run_persists_last_status_and_last_run() {
1738 let tmp = TempDir::new().unwrap();
1739 let config = test_config(&tmp);
1740
1741 let job = add_job(&config, "test-agent", "*/15 * * * *", "echo run").unwrap();
1742 reschedule_after_run(&config, &job, false, "failed output").unwrap();
1743
1744 let listed = list_jobs(&config).unwrap();
1745 let stored = listed.iter().find(|j| j.id == job.id).unwrap();
1746 assert_eq!(stored.last_status.as_deref(), Some("error"));
1747 assert!(stored.last_run.is_some());
1748 assert_eq!(stored.last_output.as_deref(), Some("failed output"));
1749 }
1750
1751 #[test]
1752 fn job_type_from_sql_reads_valid_value() {
1753 let tmp = TempDir::new().unwrap();
1754 let config = test_config(&tmp);
1755 let now = Utc::now();
1756
1757 with_initialized_connection(&config, |conn| {
1758 conn.execute(
1759 "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1760 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1761 params![
1762 "job-type-valid",
1763 "*/5 * * * *",
1764 "echo ok",
1765 Option::<String>::None,
1766 "agent",
1767 now.to_rfc3339(),
1768 (now + ChronoDuration::minutes(5)).to_rfc3339(),
1769 ],
1770 )?;
1771 Ok(())
1772 })
1773 .unwrap();
1774
1775 let job = get_job(&config, "job-type-valid").unwrap();
1776 assert_eq!(job.job_type, JobType::Agent);
1777 }
1778
1779 #[test]
1780 fn job_type_from_sql_rejects_invalid_value() {
1781 let tmp = TempDir::new().unwrap();
1782 let config = test_config(&tmp);
1783 let now = Utc::now();
1784
1785 with_initialized_connection(&config, |conn| {
1786 conn.execute(
1787 "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1788 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1789 params![
1790 "job-type-invalid",
1791 "*/5 * * * *",
1792 "echo ok",
1793 Option::<String>::None,
1794 "unknown",
1795 now.to_rfc3339(),
1796 (now + ChronoDuration::minutes(5)).to_rfc3339(),
1797 ],
1798 )?;
1799 Ok(())
1800 })
1801 .unwrap();
1802
1803 assert!(get_job(&config, "job-type-invalid").is_err());
1804 }
1805
1806 #[test]
1807 fn migration_falls_back_to_legacy_expression() {
1808 let tmp = TempDir::new().unwrap();
1809 let config = test_config(&tmp);
1810
1811 with_initialized_connection(&config, |conn| {
1812 conn.execute(
1813 "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
1814 VALUES (?1, ?2, ?3, ?4, ?5)",
1815 params![
1816 "legacy-id",
1817 "*/5 * * * *",
1818 "echo legacy",
1819 Utc::now().to_rfc3339(),
1820 (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1821 ],
1822 )?;
1823 conn.execute(
1824 "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'",
1825 [],
1826 )?;
1827 Ok(())
1828 })
1829 .unwrap();
1830
1831 let job = get_job(&config, "legacy-id").unwrap();
1832 assert!(matches!(job.schedule, Schedule::Cron { .. }));
1833 }
1834
1835 #[test]
1836 fn record_and_prune_runs() {
1837 let tmp = TempDir::new().unwrap();
1838 let mut config = test_config(&tmp);
1839 config.scheduler.max_run_history = 2;
1840 let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1841 let base = Utc::now();
1842
1843 for idx in 0..3 {
1844 let start = base + ChronoDuration::seconds(idx);
1845 let end = start + ChronoDuration::milliseconds(100);
1846 record_run(&config, &job.id, start, end, "ok", Some("done"), 100).unwrap();
1847 }
1848
1849 let runs = list_runs(&config, &job.id, 10).unwrap();
1850 assert_eq!(runs.len(), 2);
1851 }
1852
1853 #[test]
1854 fn remove_job_cascades_run_history() {
1855 let tmp = TempDir::new().unwrap();
1856 let config = test_config(&tmp);
1857 let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1858 let start = Utc::now();
1859 record_run(
1860 &config,
1861 &job.id,
1862 start,
1863 start + ChronoDuration::milliseconds(5),
1864 "ok",
1865 Some("ok"),
1866 5,
1867 )
1868 .unwrap();
1869
1870 remove_job(&config, &job.id).unwrap();
1871 let runs = list_runs(&config, &job.id, 10).unwrap();
1872 assert!(runs.is_empty());
1873 }
1874
1875 #[test]
1876 fn record_run_truncates_large_output() {
1877 let tmp = TempDir::new().unwrap();
1878 let config = test_config(&tmp);
1879 let job = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
1880 let output = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);
1881
1882 record_run(
1883 &config,
1884 &job.id,
1885 Utc::now(),
1886 Utc::now(),
1887 "ok",
1888 Some(&output),
1889 1,
1890 )
1891 .unwrap();
1892
1893 let runs = list_runs(&config, &job.id, 1).unwrap();
1894 let stored = runs[0].output.as_deref().unwrap_or_default();
1895 assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER));
1896 assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES);
1897 }
1898
1899 #[test]
1900 fn reschedule_after_run_disables_at_schedule_job() {
1901 let tmp = TempDir::new().unwrap();
1902 let config = test_config(&tmp);
1903 let at = Utc::now() + ChronoDuration::minutes(10);
1904 let job = add_shell_job(
1905 &config,
1906 "test-agent",
1907 None,
1908 Schedule::At { at },
1909 "echo once",
1910 None,
1911 )
1912 .unwrap();
1913
1914 reschedule_after_run(&config, &job, true, "done").unwrap();
1915
1916 let stored = get_job(&config, &job.id).unwrap();
1917 assert!(
1918 !stored.enabled,
1919 "At schedule job should be disabled after reschedule"
1920 );
1921 assert_eq!(stored.last_status.as_deref(), Some("ok"));
1922 }
1923
1924 #[test]
1925 fn reschedule_after_run_disables_at_schedule_job_on_failure() {
1926 let tmp = TempDir::new().unwrap();
1927 let config = test_config(&tmp);
1928 let at = Utc::now() + ChronoDuration::minutes(10);
1929 let job = add_shell_job(
1930 &config,
1931 "test-agent",
1932 None,
1933 Schedule::At { at },
1934 "echo once",
1935 None,
1936 )
1937 .unwrap();
1938
1939 reschedule_after_run(&config, &job, false, "failed").unwrap();
1940
1941 let stored = get_job(&config, &job.id).unwrap();
1942 assert!(
1943 !stored.enabled,
1944 "At schedule job should be disabled after reschedule even on failure"
1945 );
1946 assert_eq!(stored.last_status.as_deref(), Some("error"));
1947 assert_eq!(stored.last_output.as_deref(), Some("failed"));
1948 }
1949
1950 #[test]
1951 fn reschedule_after_run_truncates_last_output() {
1952 let tmp = TempDir::new().unwrap();
1953 let config = test_config(&tmp);
1954 let job = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
1955 let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);
1956
1957 reschedule_after_run(&config, &job, false, &output).unwrap();
1958
1959 let stored = get_job(&config, &job.id).unwrap();
1960 let last_output = stored.last_output.as_deref().unwrap_or_default();
1961 assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER));
1962 assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES);
1963 }
1964
1965 fn make_shell_decl(
1968 id: &str,
1969 expr: &str,
1970 cmd: &str,
1971 ) -> (String, zeroclaw_config::schema::CronJobDecl) {
1972 (
1973 id.to_string(),
1974 zeroclaw_config::schema::CronJobDecl {
1975 name: Some(format!("decl-{id}")),
1976 job_type: "shell".to_string(),
1977 schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
1978 expr: expr.to_string(),
1979 tz: None,
1980 },
1981 command: Some(cmd.to_string()),
1982 prompt: None,
1983 enabled: true,
1984 model: None,
1985 allowed_tools: None,
1986 uses_memory: true,
1987 session_target: None,
1988 delivery: None,
1989 },
1990 )
1991 }
1992
1993 fn make_agent_decl(
1994 id: &str,
1995 expr: &str,
1996 prompt: &str,
1997 ) -> (String, zeroclaw_config::schema::CronJobDecl) {
1998 (
1999 id.to_string(),
2000 zeroclaw_config::schema::CronJobDecl {
2001 name: Some(format!("decl-{id}")),
2002 job_type: "agent".to_string(),
2003 schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
2004 expr: expr.to_string(),
2005 tz: None,
2006 },
2007 command: None,
2008 prompt: Some(prompt.to_string()),
2009 enabled: true,
2010 model: None,
2011 allowed_tools: None,
2012 uses_memory: true,
2013 session_target: None,
2014 delivery: None,
2015 },
2016 )
2017 }
2018
2019 fn decls_map(
2020 items: Vec<(String, zeroclaw_config::schema::CronJobDecl)>,
2021 ) -> std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl> {
2022 items.into_iter().collect()
2023 }
2024
2025 fn seed_claiming_agent(config: &mut Config, ids: &[&str]) {
2028 config.agents.insert(
2029 "test-agent".to_string(),
2030 zeroclaw_config::schema::AliasedAgentConfig {
2031 enabled: true,
2032 cron_jobs: ids.iter().map(|s| (*s).to_string()).collect(),
2033 ..Default::default()
2034 },
2035 );
2036 }
2037
2038 #[test]
2039 fn sync_inserts_new_declarative_job() {
2040 let tmp = TempDir::new().unwrap();
2041 let mut config = test_config(&tmp);
2042 seed_claiming_agent(&mut config, &["daily-backup"]);
2043
2044 let decls = decls_map(vec![make_shell_decl(
2045 "daily-backup",
2046 "0 2 * * *",
2047 "echo backup",
2048 )]);
2049 sync_declarative_jobs(&config, &decls).unwrap();
2050
2051 let job = get_job(&config, "daily-backup").unwrap();
2052 assert_eq!(job.command, "echo backup");
2053 assert_eq!(job.source, "declarative");
2054 assert_eq!(job.name.as_deref(), Some("decl-daily-backup"));
2055 }
2056
2057 #[test]
2058 fn sync_updates_existing_declarative_job() {
2059 let tmp = TempDir::new().unwrap();
2060 let mut config = test_config(&tmp);
2061 seed_claiming_agent(&mut config, &["updatable"]);
2062
2063 let decls = decls_map(vec![make_shell_decl("updatable", "0 2 * * *", "echo v1")]);
2064 sync_declarative_jobs(&config, &decls).unwrap();
2065
2066 let job_v1 = get_job(&config, "updatable").unwrap();
2067 assert_eq!(job_v1.command, "echo v1");
2068
2069 let decls_v2 = decls_map(vec![make_shell_decl("updatable", "0 3 * * *", "echo v2")]);
2070 sync_declarative_jobs(&config, &decls_v2).unwrap();
2071
2072 let job_v2 = get_job(&config, "updatable").unwrap();
2073 assert_eq!(job_v2.command, "echo v2");
2074 assert_eq!(job_v2.expression, "0 3 * * *");
2075 assert_eq!(job_v2.source, "declarative");
2076 }
2077
2078 #[test]
2079 fn sync_does_not_delete_imperative_jobs() {
2080 let tmp = TempDir::new().unwrap();
2081 let mut config = test_config(&tmp);
2082 seed_claiming_agent(&mut config, &["my-decl"]);
2083
2084 let imperative = add_job(&config, "test-agent", "*/10 * * * *", "echo imperative").unwrap();
2086
2087 let decls = decls_map(vec![make_shell_decl("my-decl", "0 2 * * *", "echo decl")]);
2089 sync_declarative_jobs(&config, &decls).unwrap();
2090
2091 let still_there = get_job(&config, &imperative.id).unwrap();
2093 assert_eq!(still_there.command, "echo imperative");
2094 assert_eq!(still_there.source, "imperative");
2095
2096 let decl_job = get_job(&config, "my-decl").unwrap();
2098 assert_eq!(decl_job.command, "echo decl");
2099 }
2100
2101 #[test]
2102 fn sync_removes_stale_declarative_jobs() {
2103 let tmp = TempDir::new().unwrap();
2104 let mut config = test_config(&tmp);
2105 seed_claiming_agent(&mut config, &["keeper", "stale"]);
2106
2107 let decls = decls_map(vec![
2109 make_shell_decl("keeper", "0 2 * * *", "echo keep"),
2110 make_shell_decl("stale", "0 3 * * *", "echo stale"),
2111 ]);
2112 sync_declarative_jobs(&config, &decls).unwrap();
2113
2114 let decls_v2 = decls_map(vec![make_shell_decl("keeper", "0 2 * * *", "echo keep")]);
2116 sync_declarative_jobs(&config, &decls_v2).unwrap();
2117
2118 assert!(get_job(&config, "stale").is_err());
2119 assert!(get_job(&config, "keeper").is_ok());
2120 }
2121
2122 #[test]
2123 fn sync_empty_removes_all_declarative_jobs() {
2124 let tmp = TempDir::new().unwrap();
2125 let mut config = test_config(&tmp);
2126 seed_claiming_agent(&mut config, &["to-remove"]);
2127
2128 let decls = decls_map(vec![make_shell_decl("to-remove", "0 2 * * *", "echo bye")]);
2129 sync_declarative_jobs(&config, &decls).unwrap();
2130 assert!(get_job(&config, "to-remove").is_ok());
2131
2132 sync_declarative_jobs(&config, &std::collections::HashMap::new()).unwrap();
2134 assert!(get_job(&config, "to-remove").is_err());
2135 }
2136
2137 #[test]
2138 fn sync_validates_shell_job_requires_command() {
2139 let tmp = TempDir::new().unwrap();
2140 let config = test_config(&tmp);
2141
2142 let (id, mut decl) = make_shell_decl("bad", "0 2 * * *", "echo ok");
2143 decl.command = None;
2144
2145 let decls = decls_map(vec![(id, decl)]);
2146 let result = sync_declarative_jobs(&config, &decls);
2147 assert!(result.is_err());
2148 assert!(result.unwrap_err().to_string().contains("command"));
2149 }
2150
2151 #[test]
2152 fn sync_validates_agent_job_requires_prompt() {
2153 let tmp = TempDir::new().unwrap();
2154 let config = test_config(&tmp);
2155
2156 let (id, mut decl) = make_agent_decl("bad-agent", "0 2 * * *", "do stuff");
2157 decl.prompt = None;
2158
2159 let decls = decls_map(vec![(id, decl)]);
2160 let result = sync_declarative_jobs(&config, &decls);
2161 assert!(result.is_err());
2162 assert!(result.unwrap_err().to_string().contains("prompt"));
2163 }
2164
2165 #[test]
2166 fn sync_agent_job_inserts_correctly() {
2167 let tmp = TempDir::new().unwrap();
2168 let mut config = test_config(&tmp);
2169 seed_claiming_agent(&mut config, &["agent-check"]);
2170
2171 let decls = decls_map(vec![make_agent_decl(
2172 "agent-check",
2173 "*/15 * * * *",
2174 "check health",
2175 )]);
2176 sync_declarative_jobs(&config, &decls).unwrap();
2177
2178 let job = get_job(&config, "agent-check").unwrap();
2179 assert_eq!(job.job_type, JobType::Agent);
2180 assert_eq!(job.prompt.as_deref(), Some("check health"));
2181 assert_eq!(job.source, "declarative");
2182 }
2183
2184 #[test]
2185 fn sync_every_schedule_works() {
2186 let tmp = TempDir::new().unwrap();
2187 let mut config = test_config(&tmp);
2188 seed_claiming_agent(&mut config, &["interval-job"]);
2189
2190 let decl = zeroclaw_config::schema::CronJobDecl {
2191 name: None,
2192 job_type: "shell".to_string(),
2193 schedule: zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 60000 },
2194 command: Some("echo interval".to_string()),
2195 prompt: None,
2196 enabled: true,
2197 model: None,
2198 allowed_tools: None,
2199 uses_memory: true,
2200 session_target: None,
2201 delivery: None,
2202 };
2203
2204 let mut decls = std::collections::HashMap::new();
2205 decls.insert("interval-job".to_string(), decl);
2206 sync_declarative_jobs(&config, &decls).unwrap();
2207
2208 let job = get_job(&config, "interval-job").unwrap();
2209 assert!(matches!(job.schedule, Schedule::Every { every_ms: 60000 }));
2210 assert_eq!(job.command, "echo interval");
2211 }
2212
2213 #[test]
2214 fn declarative_config_parses_from_toml() {
2215 let toml_str = r#"
2217[cron.daily-report]
2218name = "Daily Report"
2219job_type = "shell"
2220command = "echo report"
2221schedule = { kind = "cron", expr = "0 9 * * *" }
2222
2223[cron.health-check]
2224job_type = "agent"
2225prompt = "Check server health"
2226schedule = { kind = "every", every_ms = 300000 }
2227 "#;
2228
2229 #[derive(serde::Deserialize)]
2230 struct Wrap {
2231 cron: std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
2232 }
2233 let parsed: Wrap = toml::from_str(toml_str).unwrap();
2234 assert_eq!(parsed.cron.len(), 2);
2235
2236 let report = parsed.cron.get("daily-report").unwrap();
2237 assert_eq!(report.command.as_deref(), Some("echo report"));
2238 assert!(matches!(
2239 report.schedule,
2240 zeroclaw_config::schema::CronScheduleDecl::Cron { ref expr, .. } if expr == "0 9 * * *"
2241 ));
2242
2243 let health = parsed.cron.get("health-check").unwrap();
2244 assert_eq!(health.job_type, "agent");
2245 assert_eq!(health.prompt.as_deref(), Some("Check server health"));
2246 assert!(matches!(
2247 health.schedule,
2248 zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 300_000 }
2249 ));
2250 }
2251}