1use crate::cron::{
2 CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
3 next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use rusqlite::types::{FromSqlResult, ValueRef};
8use rusqlite::{Connection, OpenFlags, params};
9use uuid::Uuid;
10use zeroclaw_config::schema::Config;
11
/// Maximum size, in bytes, of cron output text after truncation (16 KiB).
/// Used by `truncate_cron_output`; the limit includes the truncation marker.
const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024;
/// Suffix appended by `truncate_cron_output` when output had to be shortened.
const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
14
/// What to do with a job's stored state once a run has finished.
///
/// Callers hand this to `persist_run_result` / `persist_run_completion_state`,
/// which forward it to `apply_run_completion_state`.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names only — confirm against `apply_run_completion_state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RunCompletionAction {
    /// Presumably: keep the job and store its next run time.
    Reschedule,
    /// Presumably: keep the job row but mark it disabled.
    Disable,
    /// Presumably: remove the job row.
    Delete,
}
21
/// Test-only registry mapping a cron DB path to the number of times
/// `with_initialized_connection` was entered for it. Only paths that were
/// registered through `reset_write_connection_count_for_tests` are counted.
#[cfg(test)]
static WRITE_CONNECTION_COUNTS_FOR_TESTS: std::sync::LazyLock<
    std::sync::Mutex<std::collections::HashMap<std::path::PathBuf, usize>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::HashMap::new()));
26
/// Test-only: registers this config's cron DB path in the counter registry and
/// sets its write-connection count to zero.
///
/// A poisoned registry lock is recovered rather than propagated.
#[cfg(test)]
pub(crate) fn reset_write_connection_count_for_tests(config: &Config) {
    let db_path = cron_db_path(config);
    WRITE_CONNECTION_COUNTS_FOR_TESTS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .insert(db_path, 0);
}
34
/// Test-only: returns the write-connection count recorded for this config's
/// cron DB path, or `0` when the path was never registered.
#[cfg(test)]
pub(crate) fn write_connection_count_for_tests(config: &Config) -> usize {
    let db_path = cron_db_path(config);
    let registry = WRITE_CONNECTION_COUNTS_FOR_TESTS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    registry.get(&db_path).map_or(0, |count| *count)
}
42
43impl rusqlite::types::FromSql for JobType {
44 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
45 let text = value.as_str()?;
46 JobType::try_from(text).map_err(|e| rusqlite::types::FromSqlError::Other(e.into()))
47 }
48}
49
/// Test-only shorthand: adds an unnamed shell job on a plain cron expression
/// (no timezone, default delivery) by delegating to `add_shell_job`.
#[cfg(test)]
pub fn add_job(
    config: &Config,
    agent_alias: &str,
    expression: &str,
    command: &str,
) -> Result<CronJob> {
    add_shell_job(
        config,
        agent_alias,
        None,
        Schedule::Cron {
            expr: expression.to_owned(),
            tz: None,
        },
        command,
        None,
    )
}
63
64pub fn add_shell_job(
65 config: &Config,
66 agent_alias: &str,
67 name: Option<String>,
68 schedule: Schedule,
69 command: &str,
70 delivery: Option<DeliveryConfig>,
71) -> Result<CronJob> {
72 let now = Utc::now();
73 validate_schedule(&schedule, now)?;
74 validate_delivery_config(delivery.as_ref())?;
75 let next_run = next_run_for_schedule(&schedule, now)?;
76 let id = Uuid::new_v4().to_string();
77 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
78 let schedule_json = serde_json::to_string(&schedule)?;
79 let delivery = delivery.unwrap_or_default();
80
81 let delete_after_run = matches!(schedule, Schedule::At { .. });
82 let agent_alias = agent_alias.trim();
83 if agent_alias.is_empty() {
84 anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
85 }
86
87 with_initialized_connection(config, |conn| {
88 conn.execute(
89 "INSERT INTO cron_jobs (
90 id, expression, command, schedule, job_type, prompt, name, session_target, model,
91 enabled, delivery, delete_after_run, agent_alias, created_at, next_run
92 ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, ?7, ?8, ?9, ?10)",
93 params![
94 id,
95 expression,
96 command,
97 schedule_json,
98 name,
99 serde_json::to_string(&delivery)?,
100 if delete_after_run { 1 } else { 0 },
101 agent_alias,
102 now.to_rfc3339(),
103 next_run.to_rfc3339(),
104 ],
105 )
106 .context("Failed to insert cron shell job")?;
107 Ok(())
108 })?;
109
110 get_job(config, &id)
111}
112
113#[allow(clippy::too_many_arguments)]
114pub fn add_agent_job(
115 config: &Config,
116 agent_alias: &str,
117 name: Option<String>,
118 schedule: Schedule,
119 prompt: &str,
120 session_target: SessionTarget,
121 model: Option<String>,
122 delivery: Option<DeliveryConfig>,
123 delete_after_run: bool,
124 allowed_tools: Option<Vec<String>>,
125) -> Result<CronJob> {
126 let now = Utc::now();
127 validate_schedule(&schedule, now)?;
128 validate_delivery_config(delivery.as_ref())?;
129 let next_run = next_run_for_schedule(&schedule, now)?;
130 let id = Uuid::new_v4().to_string();
131 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
132 let schedule_json = serde_json::to_string(&schedule)?;
133 let delivery = delivery.unwrap_or_default();
134 let agent_alias = agent_alias.trim();
135 if agent_alias.is_empty() {
136 anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
137 }
138
139 with_initialized_connection(config, |conn| {
140 conn.execute(
141 "INSERT INTO cron_jobs (
142 id, expression, command, schedule, job_type, prompt, name, session_target, model,
143 enabled, delivery, delete_after_run, allowed_tools, agent_alias, created_at, next_run
144 ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12, ?13)",
145 params![
146 id,
147 expression,
148 schedule_json,
149 prompt,
150 name,
151 session_target.as_str(),
152 model,
153 serde_json::to_string(&delivery)?,
154 if delete_after_run { 1 } else { 0 },
155 encode_allowed_tools(allowed_tools.as_ref())?,
156 agent_alias,
157 now.to_rfc3339(),
158 next_run.to_rfc3339(),
159 ],
160 )
161 .context("Failed to insert cron agent job")?;
162 Ok(())
163 })?;
164
165 get_job(config, &id)
166}
167
168pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
169 let Some(jobs) = with_read_connection(config, |conn| {
170 let mut stmt = conn.prepare(
171 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
172 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
173 allowed_tools, source, uses_memory, agent_alias
174 FROM cron_jobs ORDER BY next_run ASC",
175 )?;
176
177 let rows = stmt.query_map([], map_cron_job_row)?;
178
179 let mut jobs = Vec::new();
180 for row in rows {
181 jobs.push(row?);
182 }
183 Ok(jobs)
184 })?
185 else {
186 return Ok(Vec::new());
187 };
188
189 Ok(jobs)
190}
191
192pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
193 let Some(job) = with_read_connection(config, |conn| {
194 let mut stmt = conn.prepare(
195 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
196 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
197 allowed_tools, source, uses_memory, agent_alias
198 FROM cron_jobs WHERE id = ?1",
199 )?;
200
201 let mut rows = stmt.query(params![job_id])?;
202 if let Some(row) = rows.next()? {
203 map_cron_job_row(row).map_err(Into::into)
204 } else {
205 anyhow::bail!("Cron job '{job_id}' not found")
206 }
207 })?
208 else {
209 anyhow::bail!("Cron job '{job_id}' not found")
210 };
211
212 Ok(job)
213}
214
215pub fn remove_job(config: &Config, id: &str) -> Result<()> {
216 let changed = with_initialized_connection(config, |conn| {
217 conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
218 .context("Failed to delete cron job")
219 })?;
220
221 if changed == 0 {
222 anyhow::bail!("Cron job '{id}' not found");
223 }
224
225 println!("✅ Removed cron job {id}");
226 Ok(())
227}
228
229pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
230 let lim = i64::try_from(config.scheduler.max_tasks.max(1))
231 .context("Scheduler max_tasks overflows i64")?;
232 let Some(jobs) = with_read_connection(config, |conn| {
233 let mut stmt = conn.prepare(
234 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
235 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
236 allowed_tools, source, uses_memory, agent_alias
237 FROM cron_jobs
238 WHERE enabled = 1 AND next_run <= ?1
239 ORDER BY next_run ASC
240 LIMIT ?2",
241 )?;
242
243 let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
244
245 let mut jobs = Vec::new();
246 for row in rows {
247 match row {
248 Ok(job) => jobs.push(job),
249 Err(e) => ::zeroclaw_log::record!(
250 WARN,
251 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
252 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
253 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
254 "Skipping cron job with unparseable row data"
255 ),
256 }
257 }
258 Ok(jobs)
259 })?
260 else {
261 return Ok(Vec::new());
262 };
263
264 Ok(jobs)
265}
266
267pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
273 let Some(jobs) = with_read_connection(config, |conn| {
274 let mut stmt = conn.prepare(
275 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
276 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
277 allowed_tools, source, uses_memory, agent_alias
278 FROM cron_jobs
279 WHERE enabled = 1 AND next_run <= ?1
280 ORDER BY next_run ASC",
281 )?;
282
283 let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
284
285 let mut jobs = Vec::new();
286 for row in rows {
287 match row {
288 Ok(job) => jobs.push(job),
289 Err(e) => ::zeroclaw_log::record!(
290 WARN,
291 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
292 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
293 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
294 "Skipping cron job with unparseable row data"
295 ),
296 }
297 }
298 Ok(jobs)
299 })?
300 else {
301 return Ok(Vec::new());
302 };
303
304 Ok(jobs)
305}
306
307pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
308 let mut job = get_job(config, job_id)?;
309 let mut schedule_changed = false;
310
311 if let Some(schedule) = patch.schedule {
312 validate_schedule(&schedule, Utc::now())?;
313 job.schedule = schedule;
314 job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default();
315 schedule_changed = true;
316 }
317 if let Some(command) = patch.command {
318 job.command = command;
319 }
320 if let Some(prompt) = patch.prompt {
321 job.prompt = Some(prompt);
322 }
323 if let Some(name) = patch.name {
324 job.name = Some(name);
325 }
326 if let Some(enabled) = patch.enabled {
327 job.enabled = enabled;
328 }
329 if let Some(delivery) = patch.delivery {
330 job.delivery = delivery;
331 }
332 if let Some(model) = patch.model {
333 job.model = Some(model);
334 }
335 if let Some(target) = patch.session_target {
336 job.session_target = target;
337 }
338 if let Some(delete_after_run) = patch.delete_after_run {
339 job.delete_after_run = delete_after_run;
340 }
341 if let Some(allowed_tools) = patch.allowed_tools {
342 if allowed_tools.is_empty() {
345 job.allowed_tools = None;
346 } else {
347 job.allowed_tools = Some(allowed_tools);
348 }
349 }
350 if let Some(uses_memory) = patch.uses_memory {
351 job.uses_memory = uses_memory;
352 }
353
354 if schedule_changed {
355 job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
356 }
357
358 with_initialized_connection(config, |conn| {
359 conn.execute(
360 "UPDATE cron_jobs
361 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
362 session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
363 allowed_tools = ?12, next_run = ?13, uses_memory = ?14
364 WHERE id = ?15",
365 params![
366 job.expression,
367 job.command,
368 serde_json::to_string(&job.schedule)?,
369 <JobType as Into<&str>>::into(job.job_type).to_string(),
370 job.prompt,
371 job.name,
372 job.session_target.as_str(),
373 job.model,
374 if job.enabled { 1 } else { 0 },
375 serde_json::to_string(&job.delivery)?,
376 if job.delete_after_run { 1 } else { 0 },
377 encode_allowed_tools(job.allowed_tools.as_ref())?,
378 job.next_run.to_rfc3339(),
379 if job.uses_memory { 1 } else { 0 },
380 job.id,
381 ],
382 )
383 .context("Failed to update cron job")?;
384 Ok(())
385 })?;
386
387 get_job(config, job_id)
388}
389
390pub fn record_last_run(
391 config: &Config,
392 job_id: &str,
393 finished_at: DateTime<Utc>,
394 success: bool,
395 output: &str,
396) -> Result<()> {
397 let status = if success { "ok" } else { "error" };
398 record_last_run_with_status(config, job_id, finished_at, status, output)
399}
400
401pub fn record_last_run_with_status(
402 config: &Config,
403 job_id: &str,
404 finished_at: DateTime<Utc>,
405 status: &str,
406 output: &str,
407) -> Result<()> {
408 let bounded_output = truncate_cron_output(output);
409 with_initialized_connection(config, |conn| {
410 conn.execute(
411 "UPDATE cron_jobs
412 SET last_run = ?1, last_status = ?2, last_output = ?3
413 WHERE id = ?4",
414 params![finished_at.to_rfc3339(), status, bounded_output, job_id],
415 )
416 .context("Failed to update cron last run fields")?;
417 Ok(())
418 })
419}
420
421pub fn reschedule_after_run(
422 config: &Config,
423 job: &CronJob,
424 success: bool,
425 output: &str,
426) -> Result<()> {
427 let status = if success { "ok" } else { "error" };
428 reschedule_after_run_with_status(config, job, status, output)
429}
430
431pub fn reschedule_after_run_with_status(
432 config: &Config,
433 job: &CronJob,
434 status: &str,
435 output: &str,
436) -> Result<()> {
437 let now = Utc::now();
438 let bounded_output = truncate_cron_output(output);
439
440 if matches!(job.schedule, Schedule::At { .. }) {
443 with_initialized_connection(config, |conn| {
444 conn.execute(
445 "UPDATE cron_jobs
446 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
447 WHERE id = ?4",
448 params![now.to_rfc3339(), status, bounded_output, job.id],
449 )
450 .context("Failed to disable completed one-shot cron job")?;
451 Ok(())
452 })
453 } else {
454 let next_run = next_run_for_schedule(&job.schedule, now)?;
455 with_initialized_connection(config, |conn| {
456 conn.execute(
457 "UPDATE cron_jobs
458 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
459 WHERE id = ?5",
460 params![
461 next_run.to_rfc3339(),
462 now.to_rfc3339(),
463 status,
464 bounded_output,
465 job.id
466 ],
467 )
468 .context("Failed to update cron job run state")?;
469 Ok(())
470 })
471 }
472}
473
474pub fn skip_missed_run(config: &Config, job: &CronJob, now: DateTime<Utc>) -> Result<()> {
483 if matches!(job.schedule, Schedule::At { .. }) {
484 let bounded_output = truncate_cron_output("skipped — catch_up_on_startup disabled");
487 with_initialized_connection(config, |conn| {
488 conn.execute(
489 "UPDATE cron_jobs
490 SET enabled = 0, last_run = ?1, last_status = 'skipped', last_output = ?2
491 WHERE id = ?3",
492 params![now.to_rfc3339(), bounded_output, job.id],
493 )
494 .context("Failed to disable overdue one-shot cron job on startup skip")?;
495 Ok(())
496 })
497 } else {
498 let next_run = next_run_for_schedule(&job.schedule, now)?;
500 with_initialized_connection(config, |conn| {
501 conn.execute(
502 "UPDATE cron_jobs SET next_run = ?1 WHERE id = ?2",
503 params![next_run.to_rfc3339(), job.id],
504 )
505 .context("Failed to advance next_run on startup skip")?;
506 Ok(())
507 })
508 }
509}
510
511pub fn record_run(
512 config: &Config,
513 job_id: &str,
514 started_at: DateTime<Utc>,
515 finished_at: DateTime<Utc>,
516 status: &str,
517 output: Option<&str>,
518 duration_ms: i64,
519) -> Result<()> {
520 let bounded_output = output.map(truncate_cron_output);
521 with_initialized_connection(config, |conn| {
522 let tx = conn.unchecked_transaction()?;
526
527 insert_run_and_prune(
528 &tx,
529 config,
530 job_id,
531 started_at,
532 finished_at,
533 status,
534 bounded_output.as_deref(),
535 duration_ms,
536 )?;
537
538 tx.commit()
539 .context("Failed to commit cron run transaction")?;
540 Ok(())
541 })
542}
543
544#[allow(clippy::too_many_arguments)]
545pub(crate) fn persist_run_result(
546 config: &Config,
547 job: &CronJob,
548 started_at: DateTime<Utc>,
549 finished_at: DateTime<Utc>,
550 job_state_at: DateTime<Utc>,
551 status: &str,
552 output: Option<&str>,
553 duration_ms: i64,
554 action: RunCompletionAction,
555) -> Result<()> {
556 let bounded_output = output.map(truncate_cron_output);
557
558 with_initialized_connection(config, |conn| {
559 let tx = conn.unchecked_transaction()?;
560
561 insert_run_and_prune(
562 &tx,
563 config,
564 &job.id,
565 started_at,
566 finished_at,
567 status,
568 bounded_output.as_deref(),
569 duration_ms,
570 )?;
571
572 apply_run_completion_state(
573 &tx,
574 job,
575 job_state_at,
576 status,
577 bounded_output.as_deref(),
578 action,
579 )?;
580
581 tx.commit()
582 .context("Failed to commit cron run result transaction")?;
583 Ok(())
584 })
585}
586
/// Applies only the job's completion state (via `apply_run_completion_state`)
/// on a fresh write connection.
///
/// Unlike `persist_run_result`, this writes no run-history entry, opens no
/// explicit transaction, and passes `output` through as given (it is not
/// truncated here).
pub(crate) fn persist_run_completion_state(
    config: &Config,
    job: &CronJob,
    job_state_at: DateTime<Utc>,
    status: &str,
    output: Option<&str>,
    action: RunCompletionAction,
) -> Result<()> {
    with_initialized_connection(config, |conn| {
        apply_run_completion_state(conn, job, job_state_at, status, output, action)
    })
}
604
605#[allow(clippy::too_many_arguments)]
606fn insert_run_and_prune(
607 conn: &Connection,
608 config: &Config,
609 job_id: &str,
610 started_at: DateTime<Utc>,
611 finished_at: DateTime<Utc>,
612 status: &str,
613 output: Option<&str>,
614 duration_ms: i64,
615) -> Result<()> {
616 conn.execute(
617 "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
618 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
619 params![
620 job_id,
621 started_at.to_rfc3339(),
622 finished_at.to_rfc3339(),
623 status,
624 output,
625 duration_ms,
626 ],
627 )
628 .context("Failed to insert cron run")?;
629
630 let keep = i64::from(config.scheduler.max_run_history.max(1));
631 conn.execute(
632 "DELETE FROM cron_runs
633 WHERE job_id = ?1
634 AND id NOT IN (
635 SELECT id FROM cron_runs
636 WHERE job_id = ?1
637 ORDER BY started_at DESC, id DESC
638 LIMIT ?2
639 )",
640 params![job_id, keep],
641 )
642 .context("Failed to prune cron run history")?;
643
644 Ok(())
645}
646
647fn truncate_cron_output(output: &str) -> String {
648 if output.len() <= MAX_CRON_OUTPUT_BYTES {
649 return output.to_string();
650 }
651
652 if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
653 return TRUNCATED_OUTPUT_MARKER.to_string();
654 }
655
656 let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
657 while cutoff > 0 && !output.is_char_boundary(cutoff) {
658 cutoff -= 1;
659 }
660
661 let mut truncated = output[..cutoff].to_string();
662 truncated.push_str(TRUNCATED_OUTPUT_MARKER);
663 truncated
664}
665
666pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
667 let Some(runs) = with_read_connection(config, |conn| {
668 let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
669 let mut stmt = conn.prepare(
670 "SELECT id, job_id, started_at, finished_at, status, output, duration_ms
671 FROM cron_runs
672 WHERE job_id = ?1
673 ORDER BY started_at DESC, id DESC
674 LIMIT ?2",
675 )?;
676
677 let rows = stmt.query_map(params![job_id, lim], |row| {
678 Ok(CronRun {
679 id: row.get(0)?,
680 job_id: row.get(1)?,
681 started_at: parse_rfc3339(&row.get::<_, String>(2)?)
682 .map_err(sql_conversion_error)?,
683 finished_at: parse_rfc3339(&row.get::<_, String>(3)?)
684 .map_err(sql_conversion_error)?,
685 status: row.get(4)?,
686 output: row.get(5)?,
687 duration_ms: row.get(6)?,
688 })
689 })?;
690
691 let mut runs = Vec::new();
692 for row in rows {
693 runs.push(row?);
694 }
695 Ok(runs)
696 })?
697 else {
698 return Ok(Vec::new());
699 };
700
701 Ok(runs)
702}
703
704fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
705 let parsed = DateTime::parse_from_rfc3339(raw)
706 .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
707 Ok(parsed.with_timezone(&Utc))
708}
709
710fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error {
711 rusqlite::Error::ToSqlConversionFailure(err.into())
712}
713
714fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
715 let expression: String = row.get(1)?;
716 let schedule_raw: Option<String> = row.get(3)?;
717 let schedule =
718 decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?;
719
720 let delivery_raw: Option<String> = row.get(10)?;
721 let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?;
722
723 let next_run_raw: String = row.get(13)?;
724 let last_run_raw: Option<String> = row.get(14)?;
725 let created_at_raw: String = row.get(12)?;
726 let allowed_tools_raw: Option<String> = row.get(17)?;
727 let source: Option<String> = row.get(18)?;
728 let uses_memory: Option<i64> = row.get(19)?;
729 let agent_alias: Option<String> = row.get(20)?;
730
731 Ok(CronJob {
732 id: row.get(0)?,
733 expression,
734 schedule,
735 command: row.get(2)?,
736 job_type: row.get(4)?,
737 prompt: row.get(5)?,
738 name: row.get(6)?,
739 session_target: SessionTarget::parse(&row.get::<_, String>(7)?),
740 model: row.get(8)?,
741 agent_alias: agent_alias
742 .map(|s| s.trim().to_string())
743 .unwrap_or_default(),
744 enabled: row.get::<_, i64>(9)? != 0,
745 delivery,
746 delete_after_run: row.get::<_, i64>(11)? != 0,
747 source: source.unwrap_or_else(|| "imperative".to_string()),
748 uses_memory: uses_memory != Some(0),
749 created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?,
750 next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?,
751 last_run: match last_run_raw {
752 Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?),
753 None => None,
754 },
755 last_status: row.get(15)?,
756 last_output: row.get(16)?,
757 allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
758 .map_err(sql_conversion_error)?,
759 })
760}
761
762fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result<Schedule> {
763 if let Some(raw) = schedule_raw {
764 let trimmed = raw.trim();
765 if !trimmed.is_empty() {
766 return serde_json::from_str(trimmed)
767 .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}"));
768 }
769 }
770
771 if expression.trim().is_empty() {
772 anyhow::bail!("Missing schedule and legacy expression for cron job")
773 }
774
775 Ok(Schedule::Cron {
776 expr: expression.to_string(),
777 tz: None,
778 })
779}
780
781fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
782 if let Some(raw) = delivery_raw {
783 let trimmed = raw.trim();
784 if !trimmed.is_empty() {
785 return serde_json::from_str(trimmed)
786 .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}"));
787 }
788 }
789 Ok(DeliveryConfig::default())
790}
791
792fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
793 allowed_tools
794 .map(serde_json::to_string)
795 .transpose()
796 .context("Failed to serialize cron allowed_tools")
797}
798
799fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
800 if let Some(raw) = raw {
801 let trimmed = raw.trim();
802 if !trimmed.is_empty() {
803 return serde_json::from_str(trimmed)
804 .map(Some)
805 .with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
806 }
807 }
808 Ok(None)
809}
810
811pub fn sync_declarative_jobs(
820 config: &Config,
821 decls: &std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
822) -> Result<()> {
823 use zeroclaw_config::schema::CronScheduleDecl;
824
825 if decls.is_empty() {
826 let _ = with_existing_initialized_connection(config, |conn| {
830 let deleted = conn
831 .execute("DELETE FROM cron_jobs WHERE source = 'declarative'", [])
832 .context("Failed to remove stale declarative cron jobs")?;
833 if deleted > 0 {
834 ::zeroclaw_log::record!(
835 INFO,
836 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
837 .with_attrs(::serde_json::json!({"count": deleted})),
838 "Removed declarative cron jobs no longer in config"
839 );
840 }
841 Ok(())
842 })?;
843 return Ok(());
844 }
845
846 for (id, decl) in decls {
848 validate_decl(id, decl)?;
849 }
850
851 let now = Utc::now();
852
853 with_initialized_connection(config, |conn| {
854 let config_ids: std::collections::HashSet<&str> =
856 decls.keys().map(String::as_str).collect();
857
858 {
860 let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'declarative'")?;
861 let db_ids: Vec<String> = stmt
862 .query_map([], |row| row.get(0))?
863 .filter_map(|r| r.ok())
864 .collect();
865
866 for db_id in &db_ids {
867 if !config_ids.contains(db_id.as_str()) {
868 conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![db_id])
869 .with_context(|| {
870 format!("Failed to remove stale declarative cron job '{db_id}'")
871 })?;
872 ::zeroclaw_log::record!(
873 INFO,
874 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
875 .with_attrs(::serde_json::json!({"job_id": db_id})),
876 "Removed declarative cron job no longer in config"
877 );
878 }
879 }
880 }
881
882 for (id, decl) in decls {
883 let schedule = convert_schedule_decl(&decl.schedule)?;
884 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
885 let schedule_json = serde_json::to_string(&schedule)?;
886 let job_type = &decl.job_type;
887 let session_target = decl.session_target.as_deref().unwrap_or("isolated");
888 let delivery = match &decl.delivery {
889 Some(d) => convert_delivery_decl(d),
890 None => DeliveryConfig::default(),
891 };
892 let delivery_json = serde_json::to_string(&delivery)?;
893 let allowed_tools_json = encode_allowed_tools(decl.allowed_tools.as_ref())?;
894 let command = decl.command.as_deref().unwrap_or("");
895 let delete_after_run = matches!(decl.schedule, CronScheduleDecl::At { .. });
896
897 let exists: bool = conn
899 .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
900 .query_row(params![id], |row| row.get::<_, i64>(0))
901 .map(|c| c > 0)
902 .unwrap_or(false);
903
904 if exists {
905 let current_schedule_raw: Option<String> = conn
909 .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
910 .query_row(params![id], |row| row.get(0))
911 .ok();
912
913 let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
914
915 if schedule_changed {
916 let next_run = next_run_for_schedule(&schedule, now)?;
917 conn.execute(
918 "UPDATE cron_jobs
919 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
920 prompt = ?5, name = ?6, session_target = ?7, model = ?8,
921 enabled = ?9, delivery = ?10, delete_after_run = ?11,
922 allowed_tools = ?12, source = 'declarative', next_run = ?13,
923 uses_memory = ?14
924 WHERE id = ?15",
925 params![
926 expression,
927 command,
928 schedule_json,
929 job_type,
930 decl.prompt,
931 decl.name,
932 session_target,
933 decl.model,
934 i32::from(decl.enabled),
935 delivery_json,
936 i32::from(delete_after_run),
937 allowed_tools_json,
938 next_run.to_rfc3339(),
939 i32::from(decl.uses_memory),
940 id,
941 ],
942 )
943 .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
944 } else {
945 conn.execute(
946 "UPDATE cron_jobs
947 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
948 prompt = ?5, name = ?6, session_target = ?7, model = ?8,
949 enabled = ?9, delivery = ?10, delete_after_run = ?11,
950 allowed_tools = ?12, source = 'declarative',
951 uses_memory = ?13
952 WHERE id = ?14",
953 params![
954 expression,
955 command,
956 schedule_json,
957 job_type,
958 decl.prompt,
959 decl.name,
960 session_target,
961 decl.model,
962 i32::from(decl.enabled),
963 delivery_json,
964 i32::from(delete_after_run),
965 allowed_tools_json,
966 i32::from(decl.uses_memory),
967 id,
968 ],
969 )
970 .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
971 }
972
973 ::zeroclaw_log::record!(
974 DEBUG,
975 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
976 .with_attrs(::serde_json::json!({"job_id": id})),
977 "Updated declarative cron job"
978 );
979 } else {
980 let Some(agent_alias) = config.agent_for_cron_job(id) else {
985 ::zeroclaw_log::record!(
986 WARN,
987 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
988 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
989 .with_attrs(::serde_json::json!({"job_id": id})),
990 "Skipping declarative cron job: no [agents.<x>].cron_jobs entry claims this id"
991 );
992 continue;
993 };
994 let next_run = next_run_for_schedule(&schedule, now)?;
995 conn.execute(
996 "INSERT INTO cron_jobs (
997 id, expression, command, schedule, job_type, prompt, name,
998 session_target, model, enabled, delivery, delete_after_run,
999 allowed_tools, source, uses_memory, agent_alias, created_at, next_run
1000 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, 'declarative', ?14, ?15, ?16, ?17)",
1001 params![
1002 id,
1003 expression,
1004 command,
1005 schedule_json,
1006 job_type,
1007 decl.prompt,
1008 decl.name,
1009 session_target,
1010 decl.model,
1011 i32::from(decl.enabled),
1012 delivery_json,
1013 i32::from(delete_after_run),
1014 allowed_tools_json,
1015 i32::from(decl.uses_memory),
1016 agent_alias,
1017 now.to_rfc3339(),
1018 next_run.to_rfc3339(),
1019 ],
1020 )
1021 .with_context(|| {
1022 format!("Failed to insert declarative cron job '{id}'")
1023 })?;
1024
1025 ::zeroclaw_log::record!(
1026 INFO,
1027 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1028 .with_attrs(::serde_json::json!({"job_id": id})),
1029 "Inserted declarative cron job from config"
1030 );
1031 }
1032 }
1033
1034 Ok(())
1035 })
1036}
1037
1038fn validate_decl(id: &str, decl: &zeroclaw_config::schema::CronJobDecl) -> Result<()> {
1040 if id.trim().is_empty() {
1041 anyhow::bail!("Declarative cron job has empty id");
1042 }
1043
1044 match decl.job_type.to_lowercase().as_str() {
1045 "shell" => {
1046 if decl.command.as_deref().is_none_or(|c| c.trim().is_empty()) {
1047 anyhow::bail!(
1048 "Declarative cron job '{id}': shell job requires a non-empty 'command'"
1049 );
1050 }
1051 }
1052 "agent" => {
1053 if decl.prompt.as_deref().is_none_or(|p| p.trim().is_empty()) {
1054 anyhow::bail!(
1055 "Declarative cron job '{id}': agent job requires a non-empty 'prompt'"
1056 );
1057 }
1058 }
1059 other => {
1060 anyhow::bail!(
1061 "Declarative cron job '{id}': invalid job_type '{other}', expected 'shell' or 'agent'"
1062 );
1063 }
1064 }
1065
1066 Ok(())
1067}
1068
1069fn convert_schedule_decl(decl: &zeroclaw_config::schema::CronScheduleDecl) -> Result<Schedule> {
1071 use zeroclaw_config::schema::CronScheduleDecl;
1072 match decl {
1073 CronScheduleDecl::Cron { expr, tz } => Ok(Schedule::Cron {
1074 expr: expr.clone(),
1075 tz: tz.clone(),
1076 }),
1077 CronScheduleDecl::Every { every_ms } => Ok(Schedule::Every {
1078 every_ms: *every_ms,
1079 }),
1080 CronScheduleDecl::At { at } => {
1081 let parsed = DateTime::parse_from_rfc3339(at)
1082 .with_context(|| {
1083 format!("Invalid RFC3339 timestamp in declarative cron 'at': {at}")
1084 })?
1085 .with_timezone(&Utc);
1086 Ok(Schedule::At { at: parsed })
1087 }
1088 }
1089}
1090
1091fn convert_delivery_decl(decl: &zeroclaw_config::schema::DeliveryConfigDecl) -> DeliveryConfig {
1093 DeliveryConfig {
1094 mode: decl.mode.clone(),
1095 channel: decl.channel.clone(),
1096 to: decl.to.clone(),
1097 thread_id: decl.thread_id.clone(),
1098 best_effort: decl.best_effort,
1099 }
1100}
1101
1102fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
1103 let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
1104 let mut rows = stmt.query([])?;
1105 while let Some(row) = rows.next()? {
1106 let col_name: String = row.get(1)?;
1107 if col_name == name {
1108 return Ok(());
1109 }
1110 }
1111 drop(rows);
1113 drop(stmt);
1114
1115 match conn.execute(
1118 &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"),
1119 [],
1120 ) {
1121 Ok(_) => Ok(()),
1122 Err(rusqlite::Error::SqliteFailure(err, Some(ref msg)))
1123 if msg.contains("duplicate column name") =>
1124 {
1125 ::zeroclaw_log::record!(
1126 DEBUG,
1127 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1128 .with_attrs(::serde_json::json!({"error": format!("{}", err), "name": name})),
1129 "Column cron_jobs. already exists (concurrent migration)"
1130 );
1131 Ok(())
1132 }
1133 Err(e) => Err(e).with_context(|| format!("Failed to add cron_jobs.{name}")),
1134 }
1135}
1136
1137fn cron_db_path(config: &Config) -> std::path::PathBuf {
1138 config.data_dir.join("cron").join("jobs.db")
1139}
1140
1141fn with_read_connection<T>(
1145 config: &Config,
1146 f: impl FnOnce(&Connection) -> Result<T>,
1147) -> Result<Option<T>> {
1148 with_existing_initialized_connection(config, f)
1149}
1150
1151fn with_existing_initialized_connection<T>(
1152 config: &Config,
1153 f: impl FnOnce(&Connection) -> Result<T>,
1154) -> Result<Option<T>> {
1155 let db_path = cron_db_path(config);
1156 if !db_path.exists() {
1157 return Ok(None);
1158 }
1159
1160 let conn = Connection::open_with_flags(
1161 &db_path,
1162 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
1163 )
1164 .with_context(|| {
1165 format!(
1166 "Failed to open existing cron DB: {}",
1167 db_path.display().to_string()
1168 )
1169 })?;
1170
1171 initialize_schema(&conn)?;
1172
1173 f(&conn).map(Some)
1174}
1175
1176fn with_initialized_connection<T>(
1177 config: &Config,
1178 f: impl FnOnce(&Connection) -> Result<T>,
1179) -> Result<T> {
1180 let db_path = cron_db_path(config);
1181 #[cfg(test)]
1182 {
1183 let mut counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
1184 .lock()
1185 .unwrap_or_else(|e| e.into_inner());
1186 if let Some(count) = counts.get_mut(&db_path) {
1187 *count += 1;
1188 }
1189 }
1190
1191 if let Some(parent) = db_path.parent() {
1192 std::fs::create_dir_all(parent).with_context(|| {
1193 format!(
1194 "Failed to create cron directory: {}",
1195 parent.display().to_string()
1196 )
1197 })?;
1198 }
1199
1200 let conn = Connection::open(&db_path)
1201 .with_context(|| format!("Failed to open cron DB: {}", db_path.display().to_string()))?;
1202
1203 initialize_schema(&conn)?;
1204
1205 f(&conn)
1206}
1207
1208fn apply_run_completion_state(
1214 conn: &Connection,
1215 job: &CronJob,
1216 job_state_at: DateTime<Utc>,
1217 status: &str,
1218 output: Option<&str>,
1219 action: RunCompletionAction,
1220) -> Result<()> {
1221 let bounded_output = output.map(truncate_cron_output);
1222
1223 match action {
1224 RunCompletionAction::Reschedule => {
1225 let next_run = next_run_for_schedule(&job.schedule, job_state_at)?;
1226 let changed = conn
1227 .execute(
1228 "UPDATE cron_jobs
1229 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
1230 WHERE id = ?5",
1231 params![
1232 next_run.to_rfc3339(),
1233 job_state_at.to_rfc3339(),
1234 status,
1235 bounded_output.as_deref(),
1236 job.id,
1237 ],
1238 )
1239 .context("Failed to update cron job run state")?;
1240 if changed == 0 {
1241 anyhow::bail!("Cron job '{}' not found", job.id);
1242 }
1243 }
1244 RunCompletionAction::Disable => {
1245 let changed = conn
1246 .execute(
1247 "UPDATE cron_jobs
1248 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
1249 WHERE id = ?4",
1250 params![
1251 job_state_at.to_rfc3339(),
1252 status,
1253 bounded_output.as_deref(),
1254 job.id,
1255 ],
1256 )
1257 .context("Failed to disable completed one-shot cron job")?;
1258 if changed == 0 {
1259 anyhow::bail!("Cron job '{}' not found", job.id);
1260 }
1261 }
1262 RunCompletionAction::Delete => {
1263 let changed = conn
1264 .execute("DELETE FROM cron_jobs WHERE id = ?1", params![job.id])
1265 .context("Failed to delete completed one-shot cron job")?;
1266 if changed == 0 {
1267 anyhow::bail!("Cron job '{}' not found", job.id);
1268 }
1269 }
1270 }
1271
1272 Ok(())
1273}
1274
1275fn initialize_schema(conn: &Connection) -> Result<()> {
1276 conn.execute_batch(
1277 "PRAGMA foreign_keys = ON;
1278 CREATE TABLE IF NOT EXISTS cron_jobs (
1279 id TEXT PRIMARY KEY,
1280 expression TEXT NOT NULL,
1281 command TEXT NOT NULL,
1282 schedule TEXT,
1283 job_type TEXT NOT NULL DEFAULT 'shell',
1284 prompt TEXT,
1285 name TEXT,
1286 session_target TEXT NOT NULL DEFAULT 'isolated',
1287 model TEXT,
1288 enabled INTEGER NOT NULL DEFAULT 1,
1289 delivery TEXT,
1290 delete_after_run INTEGER NOT NULL DEFAULT 0,
1291 allowed_tools TEXT,
1292 created_at TEXT NOT NULL,
1293 next_run TEXT NOT NULL,
1294 last_run TEXT,
1295 last_status TEXT,
1296 last_output TEXT
1297 );
1298 CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);
1299
1300 CREATE TABLE IF NOT EXISTS cron_runs (
1301 id INTEGER PRIMARY KEY AUTOINCREMENT,
1302 job_id TEXT NOT NULL,
1303 started_at TEXT NOT NULL,
1304 finished_at TEXT NOT NULL,
1305 status TEXT NOT NULL,
1306 output TEXT,
1307 duration_ms INTEGER,
1308 FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE
1309 );
1310 CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id);
1311 CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);
1312 CREATE INDEX IF NOT EXISTS idx_cron_runs_job_started ON cron_runs(job_id, started_at);",
1313 )
1314 .context("Failed to initialize cron schema")?;
1315
1316 add_column_if_missing(conn, "schedule", "TEXT")?;
1317 add_column_if_missing(conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?;
1318 add_column_if_missing(conn, "prompt", "TEXT")?;
1319 add_column_if_missing(conn, "name", "TEXT")?;
1320 add_column_if_missing(conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?;
1321 add_column_if_missing(conn, "model", "TEXT")?;
1322 add_column_if_missing(conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
1323 add_column_if_missing(conn, "delivery", "TEXT")?;
1324 add_column_if_missing(conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
1325 add_column_if_missing(conn, "allowed_tools", "TEXT")?;
1326 add_column_if_missing(conn, "source", "TEXT DEFAULT 'imperative'")?;
1327 add_column_if_missing(conn, "uses_memory", "INTEGER NOT NULL DEFAULT 1")?;
1328 add_column_if_missing(conn, "agent_alias", "TEXT NOT NULL DEFAULT ''")?;
1332
1333 Ok(())
1334}
1335
1336#[cfg(test)]
1337mod tests {
1338 use super::*;
1339 use chrono::Duration as ChronoDuration;
1340 use tempfile::TempDir;
1341 use zeroclaw_config::schema::Config;
1342
1343 fn test_config(tmp: &TempDir) -> Config {
1344 let config = Config {
1345 data_dir: tmp.path().join("data"),
1346 config_path: tmp.path().join("config.toml"),
1347 ..Config::default()
1348 };
1349 std::fs::create_dir_all(&config.data_dir).unwrap();
1350 config
1351 }
1352
1353 fn cron_dir(config: &Config) -> std::path::PathBuf {
1354 config.data_dir.join("cron")
1355 }
1356
1357 fn cron_db(config: &Config) -> std::path::PathBuf {
1358 cron_dir(config).join("jobs.db")
1359 }
1360
/// Queries against a workspace that was never written to return empty results
/// (or "not found") and leave no cron directory or database behind.
#[test]
fn read_only_queries_on_empty_workspace_do_not_initialize_cron_db() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let jobs = list_jobs(&config).unwrap();
    assert!(jobs.is_empty());
    let due = due_jobs(&config, Utc::now()).unwrap();
    assert!(due.is_empty());
    let overdue = all_overdue_jobs(&config, Utc::now()).unwrap();
    assert!(overdue.is_empty());
    let runs = list_runs(&config, "missing", 10).unwrap();
    assert!(runs.is_empty());

    let lookup_error = get_job(&config, "missing").unwrap_err();
    assert!(lookup_error.to_string().contains("not found"));

    let dir_created = cron_dir(&config).exists();
    assert!(
        !dir_created,
        "read-only queries should not create the cron directory"
    );
    let db_created = cron_db(&config).exists();
    assert!(!db_created, "read-only queries should not create jobs.db");
}
1383
/// The first write creates the database file, after which the job can be
/// fetched and listed.
#[test]
fn first_write_initializes_schema_and_follow_up_reads_work() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();

    assert!(cron_db(&config).exists());

    let fetched = get_job(&config, &created.id).unwrap();
    assert_eq!(fetched.id, created.id);

    let listed = list_jobs(&config).unwrap();
    assert_eq!(listed.len(), 1);
}
1395
/// Syncing an empty declaration set on a fresh workspace must not create the
/// cron directory or the database file.
#[test]
fn empty_declarative_sync_on_empty_workspace_does_not_initialize_cron_db() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);
    let no_decls = std::collections::HashMap::new();

    sync_declarative_jobs(&config, &no_decls).unwrap();

    let dir_created = cron_dir(&config).exists();
    assert!(
        !dir_created,
        "empty declarative sync should not create the cron directory"
    );
    let db_created = cron_db(&config).exists();
    assert!(
        !db_created,
        "empty declarative sync should not create jobs.db"
    );
}
1412
/// A database created with an older `cron_jobs` layout (without the `source`,
/// `uses_memory` and `agent_alias` columns) is migrated on the first read, so
/// the query sees the new columns with their defaults.
#[test]
fn read_existing_old_schema_db_migrates_before_querying_new_columns() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    std::fs::create_dir_all(cron_dir(&config)).unwrap();
    let db_path = cron_db(&config);

    // Hand-build the legacy table and seed one row, then close the connection
    // so the store opens the file on its own.
    {
        let legacy = Connection::open(&db_path).unwrap();
        legacy
            .execute_batch(
                "CREATE TABLE cron_jobs (
                    id               TEXT PRIMARY KEY,
                    expression       TEXT NOT NULL,
                    command          TEXT NOT NULL,
                    schedule         TEXT,
                    job_type         TEXT NOT NULL DEFAULT 'shell',
                    prompt           TEXT,
                    name             TEXT,
                    session_target   TEXT NOT NULL DEFAULT 'isolated',
                    model            TEXT,
                    enabled          INTEGER NOT NULL DEFAULT 1,
                    delivery         TEXT,
                    delete_after_run INTEGER NOT NULL DEFAULT 0,
                    allowed_tools    TEXT,
                    created_at       TEXT NOT NULL,
                    next_run         TEXT NOT NULL,
                    last_run         TEXT,
                    last_status      TEXT,
                    last_output      TEXT
                );",
            )
            .unwrap();
        legacy
            .execute(
                "INSERT INTO cron_jobs (
                    id, expression, command, schedule, job_type, session_target,
                    enabled, delete_after_run, created_at, next_run
                ) VALUES (?1, ?2, ?3, ?4, 'shell', 'isolated', 1, 0, ?5, ?6)",
                params![
                    "legacy-schema",
                    "*/5 * * * *",
                    "echo legacy",
                    Option::<String>::None,
                    Utc::now().to_rfc3339(),
                    (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
                ],
            )
            .unwrap();
    }

    let jobs = list_jobs(&config).unwrap();

    assert_eq!(jobs.len(), 1);
    assert_eq!(jobs[0].id, "legacy-schema");
    assert_eq!(jobs[0].source, "imperative");
    assert!(jobs[0].uses_memory);

    let conn = Connection::open(&db_path).unwrap();
    let columns: Vec<String> = conn
        .prepare("PRAGMA table_info(cron_jobs)")
        .unwrap()
        .query_map([], |row| row.get(1))
        .unwrap()
        .collect::<Result<_, _>>()
        .unwrap();

    // Every column absent from the legacy table must now exist.
    for expected in &["source", "uses_memory", "agent_alias"] {
        assert!(
            columns.iter().any(|name| name.as_str() == *expected),
            "migration should have added column {expected}"
        );
    }
}
1479
/// A five-field cron expression is accepted and stored as a `Cron` schedule.
#[test]
fn add_job_accepts_five_field_expression() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);
    let expression = "*/5 * * * *";
    let command = "echo ok";

    let created = add_job(&config, "test-agent", expression, command).unwrap();

    assert_eq!(created.expression, "*/5 * * * *");
    assert_eq!(created.command, "echo ok");
    assert!(matches!(created.schedule, Schedule::Cron { .. }));
}
1490
/// One-shot (`At`) shell jobs are flagged `delete_after_run`; recurring
/// (`Every`) shell jobs are not.
#[test]
fn add_shell_job_marks_at_schedule_for_auto_delete() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let at_schedule = Schedule::At {
        at: Utc::now() + ChronoDuration::minutes(10),
    };
    let once = add_shell_job(&config, "default", None, at_schedule, "echo once", None).unwrap();
    assert!(once.delete_after_run);

    let every_schedule = Schedule::Every { every_ms: 60_000 };
    let repeating = add_shell_job(
        &config,
        "default",
        None,
        every_schedule,
        "echo recurring",
        None,
    )
    .unwrap();
    assert!(!repeating.delete_after_run);
}
1520
/// Delivery settings passed to `add_shell_job` appear both on the returned job
/// and on the job read back from the store.
#[test]
fn add_shell_job_persists_delivery() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let schedule = Schedule::Cron {
        expr: "*/5 * * * *".into(),
        tz: None,
    };
    let delivery = DeliveryConfig {
        mode: "announce".into(),
        channel: Some("discord".into()),
        to: Some("1234567890".into()),
        thread_id: None,
        best_effort: true,
    };

    let created = add_shell_job(
        &config,
        "default",
        Some("deliver-shell".into()),
        schedule,
        "echo delivered",
        Some(delivery),
    )
    .unwrap();

    assert_eq!(created.delivery.mode, "announce");
    assert_eq!(created.delivery.channel.as_deref(), Some("discord"));
    assert_eq!(created.delivery.to.as_deref(), Some("1234567890"));

    let reloaded = get_job(&config, &created.id).unwrap();
    assert_eq!(reloaded.delivery.mode, "announce");
    assert_eq!(reloaded.delivery.channel.as_deref(), Some("discord"));
    assert_eq!(reloaded.delivery.to.as_deref(), Some("1234567890"));
}
1554
/// An "announce" delivery without a `to` target is rejected when adding an
/// agent job.
#[test]
fn add_agent_job_rejects_invalid_announce_delivery() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let schedule = Schedule::Cron {
        expr: "*/5 * * * *".into(),
        tz: None,
    };
    // `to` is deliberately left out.
    let delivery_without_target = DeliveryConfig {
        mode: "announce".into(),
        channel: Some("discord".into()),
        to: None,
        thread_id: None,
        best_effort: true,
    };

    let outcome = add_agent_job(
        &config,
        "default",
        Some("deliver-agent".into()),
        schedule,
        "summarize logs",
        SessionTarget::Isolated,
        None,
        Some(delivery_without_target),
        false,
        None,
    );

    let rejection = outcome.unwrap_err();
    assert!(rejection.to_string().contains("delivery.to is required"));
}
1585
/// An unrecognised delivery mode is rejected when adding a shell job.
#[test]
fn add_shell_job_rejects_invalid_delivery_mode() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let schedule = Schedule::Cron {
        expr: "*/5 * * * *".into(),
        tz: None,
    };
    // The misspelled mode is the point of the test.
    let delivery_with_bad_mode = DeliveryConfig {
        mode: "annouce".into(),
        channel: Some("discord".into()),
        to: Some("1234567890".into()),
        thread_id: None,
        best_effort: true,
    };

    let outcome = add_shell_job(
        &config,
        "default",
        Some("deliver-shell".into()),
        schedule,
        "echo delivered",
        Some(delivery_with_bad_mode),
    );

    let rejection = outcome.unwrap_err();
    assert!(rejection.to_string().contains("unsupported delivery mode"));
}
1612
/// A job that is added shows up in the listing and disappears after removal.
#[test]
fn add_list_remove_roundtrip() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "*/10 * * * *", "echo roundtrip").unwrap();

    let after_add = list_jobs(&config).unwrap();
    assert_eq!(after_add.len(), 1);
    assert_eq!(after_add[0].id, created.id);

    remove_job(&config, &created.id).unwrap();

    let after_remove = list_jobs(&config).unwrap();
    assert!(after_remove.is_empty());
}
1626
/// `due_jobs` returns a job only once the reference time has reached its next
/// run, and never while the job is disabled.
#[test]
fn due_jobs_filters_by_timestamp_and_enabled() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "* * * * *", "echo due").unwrap();

    let immediately = due_jobs(&config, Utc::now()).unwrap();
    assert!(
        immediately.is_empty(),
        "new job should not be due immediately"
    );

    let a_year_out = Utc::now() + ChronoDuration::days(365);
    let later = due_jobs(&config, a_year_out).unwrap();
    assert_eq!(later.len(), 1, "job should be due in far future");

    let disable = CronJobPatch {
        enabled: Some(false),
        ..CronJobPatch::default()
    };
    let _ = update_job(&config, &created.id, disable).unwrap();

    let once_disabled = due_jobs(&config, a_year_out).unwrap();
    assert!(once_disabled.is_empty());
}
1653
/// With `scheduler.max_tasks` set to 2, `due_jobs` returns at most two of the
/// three due jobs.
#[test]
fn due_jobs_respects_scheduler_max_tasks_limit() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    config.scheduler.max_tasks = 2;

    for n in 1..=3 {
        let command = format!("echo due-{n}");
        let _ = add_job(&config, "test-agent", "* * * * *", &command).unwrap();
    }

    let a_year_out = Utc::now() + ChronoDuration::days(365);
    let due = due_jobs(&config, a_year_out).unwrap();
    assert_eq!(due.len(), 2);
}
1668
/// `all_overdue_jobs` returns every overdue job even when `due_jobs` is capped
/// by `scheduler.max_tasks`.
#[test]
fn all_overdue_jobs_ignores_max_tasks_limit() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    config.scheduler.max_tasks = 2;

    for n in 1..=3 {
        let command = format!("echo ov-{n}");
        let _ = add_job(&config, "test-agent", "* * * * *", &command).unwrap();
    }

    let a_year_out = Utc::now() + ChronoDuration::days(365);

    let capped = due_jobs(&config, a_year_out).unwrap();
    assert_eq!(capped.len(), 2);

    let uncapped = all_overdue_jobs(&config, a_year_out).unwrap();
    assert_eq!(uncapped.len(), 3);
}
1687
/// A disabled job is never reported as overdue.
#[test]
fn all_overdue_jobs_excludes_disabled_jobs() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "* * * * *", "echo disabled").unwrap();
    let disable = CronJobPatch {
        enabled: Some(false),
        ..CronJobPatch::default()
    };
    let _ = update_job(&config, &created.id, disable).unwrap();

    let a_year_out = Utc::now() + ChronoDuration::days(365);
    let overdue = all_overdue_jobs(&config, a_year_out).unwrap();
    assert!(overdue.is_empty());
}
1708
/// The allowed-tools list given to `add_agent_job` is returned on the new job
/// and survives a round trip through the store.
#[test]
fn add_agent_job_persists_allowed_tools() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_agent_job(
        &config,
        "default",
        Some("agent".into()),
        Schedule::Every { every_ms: 60_000 },
        "do work",
        SessionTarget::Isolated,
        None,
        None,
        false,
        Some(vec!["file_read".into(), "web_search".into()]),
    )
    .unwrap();

    assert_eq!(
        created.allowed_tools,
        Some(vec!["file_read".into(), "web_search".into()])
    );

    let reloaded = get_job(&config, &created.id).unwrap();
    assert_eq!(reloaded.allowed_tools, created.allowed_tools);
}
1736
/// Patching `allowed_tools` on a job created without any is reflected in the
/// returned job and in the stored row.
#[test]
fn update_job_persists_allowed_tools_patch() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_agent_job(
        &config,
        "default",
        Some("agent".into()),
        Schedule::Every { every_ms: 60_000 },
        "do work",
        SessionTarget::Isolated,
        None,
        None,
        false,
        None,
    )
    .unwrap();

    let patch = CronJobPatch {
        allowed_tools: Some(vec!["shell".into()]),
        ..CronJobPatch::default()
    };
    let patched = update_job(&config, &created.id, patch).unwrap();
    assert_eq!(patched.allowed_tools, Some(vec!["shell".into()]));

    let reloaded = get_job(&config, &created.id).unwrap();
    assert_eq!(reloaded.allowed_tools, Some(vec!["shell".into()]));
}
1772
/// After a failed run is recorded, the stored job carries an "error" status, a
/// last-run timestamp and the run's output.
#[test]
fn reschedule_after_run_persists_last_status_and_last_run() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "*/15 * * * *", "echo run").unwrap();
    reschedule_after_run(&config, &created, false, "failed output").unwrap();

    let jobs = list_jobs(&config).unwrap();
    let reloaded = jobs
        .iter()
        .find(|candidate| candidate.id == created.id)
        .unwrap();

    assert_eq!(reloaded.last_status.as_deref(), Some("error"));
    assert!(reloaded.last_run.is_some());
    assert_eq!(reloaded.last_output.as_deref(), Some("failed output"));
}
1787
/// A row whose `job_type` column holds "agent" is read back as
/// `JobType::Agent`.
#[test]
fn job_type_from_sql_reads_valid_value() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);
    let created_at = Utc::now();

    // Insert the row directly so the stored text is exactly what is tested.
    let inserted = with_initialized_connection(&config, |conn| {
        conn.execute(
            "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                "job-type-valid",
                "*/5 * * * *",
                "echo ok",
                Option::<String>::None,
                "agent",
                created_at.to_rfc3339(),
                (created_at + ChronoDuration::minutes(5)).to_rfc3339(),
            ],
        )?;
        Ok(())
    });
    inserted.unwrap();

    let reloaded = get_job(&config, "job-type-valid").unwrap();
    assert_eq!(reloaded.job_type, JobType::Agent);
}
1815
/// A row whose `job_type` column holds an unknown value cannot be loaded.
#[test]
fn job_type_from_sql_rejects_invalid_value() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);
    let created_at = Utc::now();

    // Insert the row directly; the public API would not accept this type.
    let inserted = with_initialized_connection(&config, |conn| {
        conn.execute(
            "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                "job-type-invalid",
                "*/5 * * * *",
                "echo ok",
                Option::<String>::None,
                "unknown",
                created_at.to_rfc3339(),
                (created_at + ChronoDuration::minutes(5)).to_rfc3339(),
            ],
        )?;
        Ok(())
    });
    inserted.unwrap();

    let lookup = get_job(&config, "job-type-invalid");
    assert!(lookup.is_err());
}
1842
/// A row with a NULL `schedule` column still loads as a `Cron` schedule, using
/// the legacy `expression` column.
#[test]
fn migration_falls_back_to_legacy_expression() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let seeded = with_initialized_connection(&config, |conn| {
        conn.execute(
            "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                "legacy-id",
                "*/5 * * * *",
                "echo legacy",
                Utc::now().to_rfc3339(),
                (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
            ],
        )?;
        // Make sure the schedule column really is NULL.
        conn.execute(
            "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'",
            [],
        )?;
        Ok(())
    });
    seeded.unwrap();

    let reloaded = get_job(&config, "legacy-id").unwrap();
    assert!(matches!(reloaded.schedule, Schedule::Cron { .. }));
}
1871
/// With `scheduler.max_run_history` set to 2, recording three runs leaves only
/// two in the history.
#[test]
fn record_and_prune_runs() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    config.scheduler.max_run_history = 2;

    let created = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
    let first_start = Utc::now();

    for offset_secs in 0..3 {
        let started = first_start + ChronoDuration::seconds(offset_secs);
        let finished = started + ChronoDuration::milliseconds(100);
        record_run(
            &config,
            &created.id,
            started,
            finished,
            "ok",
            Some("done"),
            100,
        )
        .unwrap();
    }

    let history = list_runs(&config, &created.id, 10).unwrap();
    assert_eq!(history.len(), 2);
}
1889
/// Removing a job also removes its recorded runs.
#[test]
fn remove_job_cascades_run_history() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
    let started = Utc::now();
    let finished = started + ChronoDuration::milliseconds(5);
    record_run(&config, &created.id, started, finished, "ok", Some("ok"), 5).unwrap();

    remove_job(&config, &created.id).unwrap();

    let history = list_runs(&config, &created.id, 10).unwrap();
    assert!(history.is_empty());
}
1911
/// Output larger than `MAX_CRON_OUTPUT_BYTES` is stored truncated, ending with
/// the truncation marker and fitting within the limit.
#[test]
fn record_run_truncates_large_output() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
    let oversized = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);

    record_run(
        &config,
        &created.id,
        Utc::now(),
        Utc::now(),
        "ok",
        Some(&oversized),
        1,
    )
    .unwrap();

    let history = list_runs(&config, &created.id, 1).unwrap();
    let persisted = history[0].output.as_deref().unwrap_or_default();
    assert!(persisted.ends_with(TRUNCATED_OUTPUT_MARKER));
    assert!(persisted.len() <= MAX_CRON_OUTPUT_BYTES);
}
1935
/// A successful run of an `At` job leaves the job disabled with an "ok"
/// status.
#[test]
fn reschedule_after_run_disables_at_schedule_job() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let fire_at = Utc::now() + ChronoDuration::minutes(10);
    let schedule = Schedule::At { at: fire_at };
    let created =
        add_shell_job(&config, "test-agent", None, schedule, "echo once", None).unwrap();

    reschedule_after_run(&config, &created, true, "done").unwrap();

    let reloaded = get_job(&config, &created.id).unwrap();
    assert!(
        !reloaded.enabled,
        "At schedule job should be disabled after reschedule"
    );
    assert_eq!(reloaded.last_status.as_deref(), Some("ok"));
}
1960
/// A failed run of an `At` job also leaves the job disabled, with an "error"
/// status and the failure output recorded.
#[test]
fn reschedule_after_run_disables_at_schedule_job_on_failure() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let fire_at = Utc::now() + ChronoDuration::minutes(10);
    let schedule = Schedule::At { at: fire_at };
    let created =
        add_shell_job(&config, "test-agent", None, schedule, "echo once", None).unwrap();

    reschedule_after_run(&config, &created, false, "failed").unwrap();

    let reloaded = get_job(&config, &created.id).unwrap();
    assert!(
        !reloaded.enabled,
        "At schedule job should be disabled after reschedule even on failure"
    );
    assert_eq!(reloaded.last_status.as_deref(), Some("error"));
    assert_eq!(reloaded.last_output.as_deref(), Some("failed"));
}
1986
/// Oversized run output is truncated before being stored as the job's
/// `last_output`.
#[test]
fn reschedule_after_run_truncates_last_output() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let created = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
    let oversized = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);

    reschedule_after_run(&config, &created, false, &oversized).unwrap();

    let reloaded = get_job(&config, &created.id).unwrap();
    let persisted = reloaded.last_output.as_deref().unwrap_or_default();
    assert!(persisted.ends_with(TRUNCATED_OUTPUT_MARKER));
    assert!(persisted.len() <= MAX_CRON_OUTPUT_BYTES);
}
2001
2002 fn make_shell_decl(
2005 id: &str,
2006 expr: &str,
2007 cmd: &str,
2008 ) -> (String, zeroclaw_config::schema::CronJobDecl) {
2009 (
2010 id.to_string(),
2011 zeroclaw_config::schema::CronJobDecl {
2012 name: Some(format!("decl-{id}")),
2013 job_type: "shell".to_string(),
2014 schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
2015 expr: expr.to_string(),
2016 tz: None,
2017 },
2018 command: Some(cmd.to_string()),
2019 prompt: None,
2020 enabled: true,
2021 model: None,
2022 allowed_tools: None,
2023 uses_memory: true,
2024 session_target: None,
2025 delivery: None,
2026 },
2027 )
2028 }
2029
2030 fn make_agent_decl(
2031 id: &str,
2032 expr: &str,
2033 prompt: &str,
2034 ) -> (String, zeroclaw_config::schema::CronJobDecl) {
2035 (
2036 id.to_string(),
2037 zeroclaw_config::schema::CronJobDecl {
2038 name: Some(format!("decl-{id}")),
2039 job_type: "agent".to_string(),
2040 schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
2041 expr: expr.to_string(),
2042 tz: None,
2043 },
2044 command: None,
2045 prompt: Some(prompt.to_string()),
2046 enabled: true,
2047 model: None,
2048 allowed_tools: None,
2049 uses_memory: true,
2050 session_target: None,
2051 delivery: None,
2052 },
2053 )
2054 }
2055
2056 fn decls_map(
2057 items: Vec<(String, zeroclaw_config::schema::CronJobDecl)>,
2058 ) -> std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl> {
2059 items.into_iter().collect()
2060 }
2061
2062 fn seed_claiming_agent(config: &mut Config, ids: &[&str]) {
2065 config.agents.insert(
2066 "test-agent".to_string(),
2067 zeroclaw_config::schema::AliasedAgentConfig {
2068 enabled: true,
2069 cron_jobs: ids.iter().map(|s| (*s).to_string()).collect(),
2070 ..Default::default()
2071 },
2072 );
2073 }
2074
/// Syncing a declared shell job creates a stored job marked "declarative"
/// with the declared command and name.
#[test]
fn sync_inserts_new_declarative_job() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["daily-backup"]);

    let declared = make_shell_decl("daily-backup", "0 2 * * *", "echo backup");
    let decls = decls_map(vec![declared]);
    sync_declarative_jobs(&config, &decls).unwrap();

    let stored = get_job(&config, "daily-backup").unwrap();
    assert_eq!(stored.command, "echo backup");
    assert_eq!(stored.source, "declarative");
    assert_eq!(stored.name.as_deref(), Some("decl-daily-backup"));
}
2093
/// Re-syncing a declaration under the same id with a new command and
/// expression updates the stored job in place.
#[test]
fn sync_updates_existing_declarative_job() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["updatable"]);

    let first = decls_map(vec![make_shell_decl("updatable", "0 2 * * *", "echo v1")]);
    sync_declarative_jobs(&config, &first).unwrap();

    let before = get_job(&config, "updatable").unwrap();
    assert_eq!(before.command, "echo v1");

    let second = decls_map(vec![make_shell_decl("updatable", "0 3 * * *", "echo v2")]);
    sync_declarative_jobs(&config, &second).unwrap();

    let after = get_job(&config, "updatable").unwrap();
    assert_eq!(after.command, "echo v2");
    assert_eq!(after.expression, "0 3 * * *");
    assert_eq!(after.source, "declarative");
}
2114
/// A declarative sync leaves jobs added through the imperative API untouched.
#[test]
fn sync_does_not_delete_imperative_jobs() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["my-decl"]);

    let manual = add_job(&config, "test-agent", "*/10 * * * *", "echo imperative").unwrap();

    let decls = decls_map(vec![make_shell_decl("my-decl", "0 2 * * *", "echo decl")]);
    sync_declarative_jobs(&config, &decls).unwrap();

    let manual_after_sync = get_job(&config, &manual.id).unwrap();
    assert_eq!(manual_after_sync.command, "echo imperative");
    assert_eq!(manual_after_sync.source, "imperative");

    let declared = get_job(&config, "my-decl").unwrap();
    assert_eq!(declared.command, "echo decl");
}
2137
/// A declarative job that is missing from a later sync is removed, while the
/// ones still declared remain.
#[test]
fn sync_removes_stale_declarative_jobs() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["keeper", "stale"]);

    let both = decls_map(vec![
        make_shell_decl("keeper", "0 2 * * *", "echo keep"),
        make_shell_decl("stale", "0 3 * * *", "echo stale"),
    ]);
    sync_declarative_jobs(&config, &both).unwrap();

    let keeper_only = decls_map(vec![make_shell_decl("keeper", "0 2 * * *", "echo keep")]);
    sync_declarative_jobs(&config, &keeper_only).unwrap();

    let stale_lookup = get_job(&config, "stale");
    assert!(stale_lookup.is_err());
    let keeper_lookup = get_job(&config, "keeper");
    assert!(keeper_lookup.is_ok());
}
2158
/// Syncing an empty declaration set removes previously synced declarative
/// jobs.
#[test]
fn sync_empty_removes_all_declarative_jobs() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["to-remove"]);

    let initial = decls_map(vec![make_shell_decl("to-remove", "0 2 * * *", "echo bye")]);
    sync_declarative_jobs(&config, &initial).unwrap();
    let before = get_job(&config, "to-remove");
    assert!(before.is_ok());

    let nothing = std::collections::HashMap::new();
    sync_declarative_jobs(&config, &nothing).unwrap();
    let after = get_job(&config, "to-remove");
    assert!(after.is_err());
}
2173
/// A declared shell job without a command is rejected with an error that
/// mentions "command".
#[test]
fn sync_validates_shell_job_requires_command() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let (id, mut decl) = make_shell_decl("bad", "0 2 * * *", "echo ok");
    decl.command = None;
    let decls = decls_map(vec![(id, decl)]);

    let outcome = sync_declarative_jobs(&config, &decls);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("command"));
}
2187
/// A declared agent job without a prompt is rejected with an error that
/// mentions "prompt".
#[test]
fn sync_validates_agent_job_requires_prompt() {
    let workspace = TempDir::new().unwrap();
    let config = test_config(&workspace);

    let (id, mut decl) = make_agent_decl("bad-agent", "0 2 * * *", "do stuff");
    decl.prompt = None;
    let decls = decls_map(vec![(id, decl)]);

    let outcome = sync_declarative_jobs(&config, &decls);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("prompt"));
}
2201
/// Syncing a declared agent job stores it with the agent job type, the
/// declared prompt and a "declarative" source.
#[test]
fn sync_agent_job_inserts_correctly() {
    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["agent-check"]);

    let declared = make_agent_decl("agent-check", "*/15 * * * *", "check health");
    let decls = decls_map(vec![declared]);
    sync_declarative_jobs(&config, &decls).unwrap();

    let stored = get_job(&config, "agent-check").unwrap();
    assert_eq!(stored.job_type, JobType::Agent);
    assert_eq!(stored.prompt.as_deref(), Some("check health"));
    assert_eq!(stored.source, "declarative");
}
2220
/// A declared job with an `Every` schedule is stored with the same interval
/// and command.
#[test]
fn sync_every_schedule_works() {
    use zeroclaw_config::schema::{CronJobDecl, CronScheduleDecl};

    let workspace = TempDir::new().unwrap();
    let mut config = test_config(&workspace);
    seed_claiming_agent(&mut config, &["interval-job"]);

    let interval_decl = CronJobDecl {
        name: None,
        job_type: "shell".to_string(),
        schedule: CronScheduleDecl::Every { every_ms: 60000 },
        command: Some("echo interval".to_string()),
        prompt: None,
        enabled: true,
        model: None,
        allowed_tools: None,
        uses_memory: true,
        session_target: None,
        delivery: None,
    };

    let mut decls = std::collections::HashMap::new();
    decls.insert("interval-job".to_string(), interval_decl);
    sync_declarative_jobs(&config, &decls).unwrap();

    let stored = get_job(&config, "interval-job").unwrap();
    assert!(matches!(
        stored.schedule,
        Schedule::Every { every_ms: 60000 }
    ));
    assert_eq!(stored.command, "echo interval");
}
2249
2250 #[test]
2251 fn declarative_config_parses_from_toml() {
2252 let toml_str = r#"
2254[cron.daily-report]
2255name = "Daily Report"
2256job_type = "shell"
2257command = "echo report"
2258schedule = { kind = "cron", expr = "0 9 * * *" }
2259
2260[cron.health-check]
2261job_type = "agent"
2262prompt = "Check server health"
2263schedule = { kind = "every", every_ms = 300000 }
2264 "#;
2265
2266 #[derive(serde::Deserialize)]
2267 struct Wrap {
2268 cron: std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
2269 }
2270 let parsed: Wrap = toml::from_str(toml_str).unwrap();
2271 assert_eq!(parsed.cron.len(), 2);
2272
2273 let report = parsed.cron.get("daily-report").unwrap();
2274 assert_eq!(report.command.as_deref(), Some("echo report"));
2275 assert!(matches!(
2276 report.schedule,
2277 zeroclaw_config::schema::CronScheduleDecl::Cron { ref expr, .. } if expr == "0 9 * * *"
2278 ));
2279
2280 let health = parsed.cron.get("health-check").unwrap();
2281 assert_eq!(health.job_type, "agent");
2282 assert_eq!(health.prompt.as_deref(), Some("Check server health"));
2283 assert!(matches!(
2284 health.schedule,
2285 zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 300_000 }
2286 ));
2287 }
2288
2289 #[test]
2290 fn skip_missed_run_advances_recurring_job_next_run() {
2291 let tmp = TempDir::new().unwrap();
2292 let config = test_config(&tmp);
2293
2294 let job = add_job(&config, "test-agent", "* * * * *", "echo test").unwrap();
2297
2298 let past = Utc::now() - ChronoDuration::hours(1);
2300 with_initialized_connection(&config, |conn| {
2301 conn.execute(
2302 "UPDATE cron_jobs SET next_run = ?1 WHERE id = ?2",
2303 params![past.to_rfc3339(), job.id],
2304 )
2305 .unwrap();
2306 Ok(())
2307 })
2308 .unwrap();
2309
2310 assert!(
2312 !all_overdue_jobs(&config, Utc::now()).unwrap().is_empty(),
2313 "job with past next_run must appear in overdue"
2314 );
2315
2316 let reloaded = get_job(&config, &job.id).unwrap();
2318 skip_missed_run(&config, &reloaded, Utc::now()).unwrap();
2319
2320 let updated = get_job(&config, &job.id).unwrap();
2322 assert!(
2323 updated.next_run > Utc::now(),
2324 "skip_missed_run must advance next_run to the future"
2325 );
2326 assert!(updated.enabled, "recurring job must stay enabled");
2327 }
2328
2329 #[test]
2330 fn skip_missed_run_disables_overdue_oneshot_job() {
2331 let tmp = TempDir::new().unwrap();
2332 let config = test_config(&tmp);
2333
2334 let run_at = Utc::now() - ChronoDuration::hours(2);
2335 let schedule = Schedule::At { at: run_at };
2336 let job = add_job_with_schedule(&config, "test-agent", &schedule, "echo once").unwrap();
2337
2338 assert!(
2341 !all_overdue_jobs(&config, Utc::now()).unwrap().is_empty(),
2342 "one-shot job with past at-time must be overdue"
2343 );
2344
2345 let reloaded = get_job(&config, &job.id).unwrap();
2346 skip_missed_run(&config, &reloaded, Utc::now()).unwrap();
2347
2348 let updated = get_job(&config, &job.id).unwrap();
2349 assert!(
2350 !updated.enabled,
2351 "overdue one-shot job must be disabled after skip"
2352 );
2353 assert_eq!(
2354 updated.last_status.as_deref(),
2355 Some("skipped"),
2356 "one-shot job last_status must be 'skipped'"
2357 );
2358 }
2359
2360 fn add_job_with_schedule(
2361 config: &Config,
2362 agent_alias: &str,
2363 schedule: &Schedule,
2364 command: &str,
2365 ) -> Result<CronJob> {
2366 let now = Utc::now();
2367 let job = CronJob {
2368 id: format!("test-job-{}", Uuid::new_v4()),
2369 expression: String::new(),
2370 schedule: schedule.clone(),
2371 command: command.to_string(),
2372 prompt: None,
2373 name: None,
2374 job_type: JobType::Shell,
2375 session_target: SessionTarget::Isolated,
2376 model: None,
2377 agent_alias: agent_alias.to_string(),
2378 enabled: true,
2379 delivery: DeliveryConfig::default(),
2380 delete_after_run: false,
2381 allowed_tools: None,
2382 uses_memory: false,
2383 source: "imperative".to_string(),
2384 created_at: now,
2385 next_run: next_run_for_schedule(schedule, now).unwrap_or(now),
2386 last_run: None,
2387 last_status: None,
2388 last_output: None,
2389 };
2390 let job_type_str: String = match &job.job_type {
2391 JobType::Shell => "shell".to_string(),
2392 JobType::Agent => "agent".to_string(),
2393 };
2394 let schedule_json = serde_json::to_string(&job.schedule).unwrap();
2395 let delivery_json = serde_json::to_string(&job.delivery).unwrap();
2396 let allowed_tools_json =
2397 crate::cron::store::encode_allowed_tools(job.allowed_tools.as_ref()).unwrap();
2398 with_initialized_connection(config, |conn| {
2399 conn.execute(
2400 "INSERT INTO cron_jobs
2401 (id, expression, command, schedule, job_type, prompt, name,
2402 session_target, model, enabled, delivery, delete_after_run,
2403 allowed_tools, next_run, last_run, last_status, last_output,
2404 uses_memory, source, created_at, agent_alias)
2405 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21)",
2406 params![
2407 job.id,
2408 job.expression,
2409 job.command,
2410 schedule_json,
2411 job_type_str.to_string(),
2412 job.prompt,
2413 job.name,
2414 job.session_target.as_str(),
2415 job.model,
2416 if job.enabled { 1 } else { 0 },
2417 delivery_json,
2418 if job.delete_after_run { 1 } else { 0 },
2419 allowed_tools_json,
2420 job.next_run.to_rfc3339(),
2421 job.last_run.map(|t| t.to_rfc3339()),
2422 job.last_status,
2423 job.last_output,
2424 if job.uses_memory { 1 } else { 0 },
2425 job.source,
2426 job.created_at.to_rfc3339(),
2427 job.agent_alias,
2428 ],
2429 )
2430 .context("Failed to insert test cron job")?;
2431 Ok(())
2432 })?;
2433 Ok(job)
2434 }
2435}