Skip to main content

zeroclaw_runtime/cron/
store.rs

1use crate::cron::{
2    CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
3    next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use rusqlite::types::{FromSqlResult, ValueRef};
8use rusqlite::{Connection, OpenFlags, params};
9use uuid::Uuid;
10use zeroclaw_config::schema::Config;
11
12const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024;
13const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
14
15#[derive(Clone, Copy, Debug, Eq, PartialEq)]
16pub(crate) enum RunCompletionAction {
17    Reschedule,
18    Disable,
19    Delete,
20}
21
22#[cfg(test)]
23static WRITE_CONNECTION_COUNTS_FOR_TESTS: std::sync::LazyLock<
24    std::sync::Mutex<std::collections::HashMap<std::path::PathBuf, usize>>,
25> = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::HashMap::new()));
26
27#[cfg(test)]
28pub(crate) fn reset_write_connection_count_for_tests(config: &Config) {
29    let mut counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
30        .lock()
31        .unwrap_or_else(|e| e.into_inner());
32    counts.insert(cron_db_path(config), 0);
33}
34
35#[cfg(test)]
36pub(crate) fn write_connection_count_for_tests(config: &Config) -> usize {
37    let counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
38        .lock()
39        .unwrap_or_else(|e| e.into_inner());
40    counts.get(&cron_db_path(config)).copied().unwrap_or(0)
41}
42
43impl rusqlite::types::FromSql for JobType {
44    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
45        let text = value.as_str()?;
46        JobType::try_from(text).map_err(|e| rusqlite::types::FromSqlError::Other(e.into()))
47    }
48}
49
50#[cfg(test)]
51pub fn add_job(
52    config: &Config,
53    agent_alias: &str,
54    expression: &str,
55    command: &str,
56) -> Result<CronJob> {
57    let schedule = Schedule::Cron {
58        expr: expression.to_string(),
59        tz: None,
60    };
61    add_shell_job(config, agent_alias, None, schedule, command, None)
62}
63
64pub fn add_shell_job(
65    config: &Config,
66    agent_alias: &str,
67    name: Option<String>,
68    schedule: Schedule,
69    command: &str,
70    delivery: Option<DeliveryConfig>,
71) -> Result<CronJob> {
72    let now = Utc::now();
73    validate_schedule(&schedule, now)?;
74    validate_delivery_config(delivery.as_ref())?;
75    let next_run = next_run_for_schedule(&schedule, now)?;
76    let id = Uuid::new_v4().to_string();
77    let expression = schedule_cron_expression(&schedule).unwrap_or_default();
78    let schedule_json = serde_json::to_string(&schedule)?;
79    let delivery = delivery.unwrap_or_default();
80
81    let delete_after_run = matches!(schedule, Schedule::At { .. });
82    let agent_alias = agent_alias.trim();
83    if agent_alias.is_empty() {
84        anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
85    }
86
87    with_initialized_connection(config, |conn| {
88        conn.execute(
89            "INSERT INTO cron_jobs (
90                id, expression, command, schedule, job_type, prompt, name, session_target, model,
91                enabled, delivery, delete_after_run, agent_alias, created_at, next_run
92             ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, ?7, ?8, ?9, ?10)",
93            params![
94                id,
95                expression,
96                command,
97                schedule_json,
98                name,
99                serde_json::to_string(&delivery)?,
100                if delete_after_run { 1 } else { 0 },
101                agent_alias,
102                now.to_rfc3339(),
103                next_run.to_rfc3339(),
104            ],
105        )
106        .context("Failed to insert cron shell job")?;
107        Ok(())
108    })?;
109
110    get_job(config, &id)
111}
112
113#[allow(clippy::too_many_arguments)]
114pub fn add_agent_job(
115    config: &Config,
116    agent_alias: &str,
117    name: Option<String>,
118    schedule: Schedule,
119    prompt: &str,
120    session_target: SessionTarget,
121    model: Option<String>,
122    delivery: Option<DeliveryConfig>,
123    delete_after_run: bool,
124    allowed_tools: Option<Vec<String>>,
125) -> Result<CronJob> {
126    let now = Utc::now();
127    validate_schedule(&schedule, now)?;
128    validate_delivery_config(delivery.as_ref())?;
129    let next_run = next_run_for_schedule(&schedule, now)?;
130    let id = Uuid::new_v4().to_string();
131    let expression = schedule_cron_expression(&schedule).unwrap_or_default();
132    let schedule_json = serde_json::to_string(&schedule)?;
133    let delivery = delivery.unwrap_or_default();
134    let agent_alias = agent_alias.trim();
135    if agent_alias.is_empty() {
136        anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
137    }
138
139    with_initialized_connection(config, |conn| {
140        conn.execute(
141            "INSERT INTO cron_jobs (
142                id, expression, command, schedule, job_type, prompt, name, session_target, model,
143                enabled, delivery, delete_after_run, allowed_tools, agent_alias, created_at, next_run
144             ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12, ?13)",
145            params![
146                id,
147                expression,
148                schedule_json,
149                prompt,
150                name,
151                session_target.as_str(),
152                model,
153                serde_json::to_string(&delivery)?,
154                if delete_after_run { 1 } else { 0 },
155                encode_allowed_tools(allowed_tools.as_ref())?,
156                agent_alias,
157                now.to_rfc3339(),
158                next_run.to_rfc3339(),
159            ],
160        )
161        .context("Failed to insert cron agent job")?;
162        Ok(())
163    })?;
164
165    get_job(config, &id)
166}
167
168pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
169    let Some(jobs) = with_read_connection(config, |conn| {
170        let mut stmt = conn.prepare(
171            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
172                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
173                    allowed_tools, source, uses_memory, agent_alias
174             FROM cron_jobs ORDER BY next_run ASC",
175        )?;
176
177        let rows = stmt.query_map([], map_cron_job_row)?;
178
179        let mut jobs = Vec::new();
180        for row in rows {
181            jobs.push(row?);
182        }
183        Ok(jobs)
184    })?
185    else {
186        return Ok(Vec::new());
187    };
188
189    Ok(jobs)
190}
191
192pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
193    let Some(job) = with_read_connection(config, |conn| {
194        let mut stmt = conn.prepare(
195            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
196                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
197                    allowed_tools, source, uses_memory, agent_alias
198             FROM cron_jobs WHERE id = ?1",
199        )?;
200
201        let mut rows = stmt.query(params![job_id])?;
202        if let Some(row) = rows.next()? {
203            map_cron_job_row(row).map_err(Into::into)
204        } else {
205            anyhow::bail!("Cron job '{job_id}' not found")
206        }
207    })?
208    else {
209        anyhow::bail!("Cron job '{job_id}' not found")
210    };
211
212    Ok(job)
213}
214
215pub fn remove_job(config: &Config, id: &str) -> Result<()> {
216    let changed = with_initialized_connection(config, |conn| {
217        conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
218            .context("Failed to delete cron job")
219    })?;
220
221    if changed == 0 {
222        anyhow::bail!("Cron job '{id}' not found");
223    }
224
225    println!("✅ Removed cron job {id}");
226    Ok(())
227}
228
229pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
230    let lim = i64::try_from(config.scheduler.max_tasks.max(1))
231        .context("Scheduler max_tasks overflows i64")?;
232    let Some(jobs) = with_read_connection(config, |conn| {
233        let mut stmt = conn.prepare(
234            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
235                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
236                    allowed_tools, source, uses_memory, agent_alias
237             FROM cron_jobs
238             WHERE enabled = 1 AND next_run <= ?1
239             ORDER BY next_run ASC
240             LIMIT ?2",
241        )?;
242
243        let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
244
245        let mut jobs = Vec::new();
246        for row in rows {
247            match row {
248                Ok(job) => jobs.push(job),
249                Err(e) => ::zeroclaw_log::record!(
250                    WARN,
251                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
252                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
253                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
254                    "Skipping cron job with unparseable row data"
255                ),
256            }
257        }
258        Ok(jobs)
259    })?
260    else {
261        return Ok(Vec::new());
262    };
263
264    Ok(jobs)
265}
266
267/// Return **all** enabled overdue jobs without the `max_tasks` limit.
268///
269/// Used by the scheduler startup catch-up to ensure every missed job is
270/// executed at least once after a period of downtime (late boot, daemon
271/// restart, etc.).
272pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
273    let Some(jobs) = with_read_connection(config, |conn| {
274        let mut stmt = conn.prepare(
275            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
276                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
277                    allowed_tools, source, uses_memory, agent_alias
278             FROM cron_jobs
279             WHERE enabled = 1 AND next_run <= ?1
280             ORDER BY next_run ASC",
281        )?;
282
283        let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
284
285        let mut jobs = Vec::new();
286        for row in rows {
287            match row {
288                Ok(job) => jobs.push(job),
289                Err(e) => ::zeroclaw_log::record!(
290                    WARN,
291                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
292                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
293                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
294                    "Skipping cron job with unparseable row data"
295                ),
296            }
297        }
298        Ok(jobs)
299    })?
300    else {
301        return Ok(Vec::new());
302    };
303
304    Ok(jobs)
305}
306
307pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
308    let mut job = get_job(config, job_id)?;
309    let mut schedule_changed = false;
310
311    if let Some(schedule) = patch.schedule {
312        validate_schedule(&schedule, Utc::now())?;
313        job.schedule = schedule;
314        job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default();
315        schedule_changed = true;
316    }
317    if let Some(command) = patch.command {
318        job.command = command;
319    }
320    if let Some(prompt) = patch.prompt {
321        job.prompt = Some(prompt);
322    }
323    if let Some(name) = patch.name {
324        job.name = Some(name);
325    }
326    if let Some(enabled) = patch.enabled {
327        job.enabled = enabled;
328    }
329    if let Some(delivery) = patch.delivery {
330        job.delivery = delivery;
331    }
332    if let Some(model) = patch.model {
333        job.model = Some(model);
334    }
335    if let Some(target) = patch.session_target {
336        job.session_target = target;
337    }
338    if let Some(delete_after_run) = patch.delete_after_run {
339        job.delete_after_run = delete_after_run;
340    }
341    if let Some(allowed_tools) = patch.allowed_tools {
342        // Empty list means "clear the allowlist" (all tools available),
343        // not "allow zero tools".
344        if allowed_tools.is_empty() {
345            job.allowed_tools = None;
346        } else {
347            job.allowed_tools = Some(allowed_tools);
348        }
349    }
350    if let Some(uses_memory) = patch.uses_memory {
351        job.uses_memory = uses_memory;
352    }
353
354    if schedule_changed {
355        job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
356    }
357
358    with_initialized_connection(config, |conn| {
359        conn.execute(
360            "UPDATE cron_jobs
361             SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
362                 session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
363                 allowed_tools = ?12, next_run = ?13, uses_memory = ?14
364             WHERE id = ?15",
365            params![
366                job.expression,
367                job.command,
368                serde_json::to_string(&job.schedule)?,
369                <JobType as Into<&str>>::into(job.job_type).to_string(),
370                job.prompt,
371                job.name,
372                job.session_target.as_str(),
373                job.model,
374                if job.enabled { 1 } else { 0 },
375                serde_json::to_string(&job.delivery)?,
376                if job.delete_after_run { 1 } else { 0 },
377                encode_allowed_tools(job.allowed_tools.as_ref())?,
378                job.next_run.to_rfc3339(),
379                if job.uses_memory { 1 } else { 0 },
380                job.id,
381            ],
382        )
383        .context("Failed to update cron job")?;
384        Ok(())
385    })?;
386
387    get_job(config, job_id)
388}
389
390pub fn record_last_run(
391    config: &Config,
392    job_id: &str,
393    finished_at: DateTime<Utc>,
394    success: bool,
395    output: &str,
396) -> Result<()> {
397    let status = if success { "ok" } else { "error" };
398    record_last_run_with_status(config, job_id, finished_at, status, output)
399}
400
401pub fn record_last_run_with_status(
402    config: &Config,
403    job_id: &str,
404    finished_at: DateTime<Utc>,
405    status: &str,
406    output: &str,
407) -> Result<()> {
408    let bounded_output = truncate_cron_output(output);
409    with_initialized_connection(config, |conn| {
410        conn.execute(
411            "UPDATE cron_jobs
412             SET last_run = ?1, last_status = ?2, last_output = ?3
413             WHERE id = ?4",
414            params![finished_at.to_rfc3339(), status, bounded_output, job_id],
415        )
416        .context("Failed to update cron last run fields")?;
417        Ok(())
418    })
419}
420
421pub fn reschedule_after_run(
422    config: &Config,
423    job: &CronJob,
424    success: bool,
425    output: &str,
426) -> Result<()> {
427    let status = if success { "ok" } else { "error" };
428    reschedule_after_run_with_status(config, job, status, output)
429}
430
431pub fn reschedule_after_run_with_status(
432    config: &Config,
433    job: &CronJob,
434    status: &str,
435    output: &str,
436) -> Result<()> {
437    let now = Utc::now();
438    let bounded_output = truncate_cron_output(output);
439
440    // One-shot `At` schedules have no future occurrence — record the run
441    // result and disable the job so it won't be picked up again.
442    if matches!(job.schedule, Schedule::At { .. }) {
443        with_initialized_connection(config, |conn| {
444            conn.execute(
445                "UPDATE cron_jobs
446                 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
447                 WHERE id = ?4",
448                params![now.to_rfc3339(), status, bounded_output, job.id],
449            )
450            .context("Failed to disable completed one-shot cron job")?;
451            Ok(())
452        })
453    } else {
454        let next_run = next_run_for_schedule(&job.schedule, now)?;
455        with_initialized_connection(config, |conn| {
456            conn.execute(
457                "UPDATE cron_jobs
458                 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
459                 WHERE id = ?5",
460                params![
461                    next_run.to_rfc3339(),
462                    now.to_rfc3339(),
463                    status,
464                    bounded_output,
465                    job.id
466                ],
467            )
468            .context("Failed to update cron job run state")?;
469            Ok(())
470        })
471    }
472}
473
474/// Advance `next_run` of an overdue recurring job to its next future
475/// occurrence without executing the missed run.  For one-shot `At` jobs
476/// there is no future occurrence, so the job is disabled and its last
477/// status is recorded as `skipped`.
478///
479/// Called at scheduler startup when `catch_up_on_startup` is disabled,
480/// so the subsequent normal polling loop won't pick up jobs whose
481/// `next_run` is still in the past.
482pub fn skip_missed_run(config: &Config, job: &CronJob, now: DateTime<Utc>) -> Result<()> {
483    if matches!(job.schedule, Schedule::At { .. }) {
484        // One-shot job whose scheduled moment has already passed —
485        // disable it so it won't execute late.
486        let bounded_output = truncate_cron_output("skipped — catch_up_on_startup disabled");
487        with_initialized_connection(config, |conn| {
488            conn.execute(
489                "UPDATE cron_jobs
490                 SET enabled = 0, last_run = ?1, last_status = 'skipped', last_output = ?2
491                 WHERE id = ?3",
492                params![now.to_rfc3339(), bounded_output, job.id],
493            )
494            .context("Failed to disable overdue one-shot cron job on startup skip")?;
495            Ok(())
496        })
497    } else {
498        // Recurring job — advance next_run to the next future occurrence.
499        let next_run = next_run_for_schedule(&job.schedule, now)?;
500        with_initialized_connection(config, |conn| {
501            conn.execute(
502                "UPDATE cron_jobs SET next_run = ?1 WHERE id = ?2",
503                params![next_run.to_rfc3339(), job.id],
504            )
505            .context("Failed to advance next_run on startup skip")?;
506            Ok(())
507        })
508    }
509}
510
511pub fn record_run(
512    config: &Config,
513    job_id: &str,
514    started_at: DateTime<Utc>,
515    finished_at: DateTime<Utc>,
516    status: &str,
517    output: Option<&str>,
518    duration_ms: i64,
519) -> Result<()> {
520    let bounded_output = output.map(truncate_cron_output);
521    with_initialized_connection(config, |conn| {
522        // Wrap INSERT + pruning DELETE in an explicit transaction so that
523        // if the DELETE fails, the INSERT is rolled back and the run table
524        // cannot grow unboundedly.
525        let tx = conn.unchecked_transaction()?;
526
527        insert_run_and_prune(
528            &tx,
529            config,
530            job_id,
531            started_at,
532            finished_at,
533            status,
534            bounded_output.as_deref(),
535            duration_ms,
536        )?;
537
538        tx.commit()
539            .context("Failed to commit cron run transaction")?;
540        Ok(())
541    })
542}
543
544#[allow(clippy::too_many_arguments)]
545pub(crate) fn persist_run_result(
546    config: &Config,
547    job: &CronJob,
548    started_at: DateTime<Utc>,
549    finished_at: DateTime<Utc>,
550    job_state_at: DateTime<Utc>,
551    status: &str,
552    output: Option<&str>,
553    duration_ms: i64,
554    action: RunCompletionAction,
555) -> Result<()> {
556    let bounded_output = output.map(truncate_cron_output);
557
558    with_initialized_connection(config, |conn| {
559        let tx = conn.unchecked_transaction()?;
560
561        insert_run_and_prune(
562            &tx,
563            config,
564            &job.id,
565            started_at,
566            finished_at,
567            status,
568            bounded_output.as_deref(),
569            duration_ms,
570        )?;
571
572        apply_run_completion_state(
573            &tx,
574            job,
575            job_state_at,
576            status,
577            bounded_output.as_deref(),
578            action,
579        )?;
580
581        tx.commit()
582            .context("Failed to commit cron run result transaction")?;
583        Ok(())
584    })
585}
586
587/// Persist only the job-state side of a completed cron run.
588///
589/// This is intentionally separate from `persist_run_result` so the scheduler
590/// can recover job state even when run-history persistence fails. The SQL
591/// mutation itself stays in the store layer.
592pub(crate) fn persist_run_completion_state(
593    config: &Config,
594    job: &CronJob,
595    job_state_at: DateTime<Utc>,
596    status: &str,
597    output: Option<&str>,
598    action: RunCompletionAction,
599) -> Result<()> {
600    with_initialized_connection(config, |conn| {
601        apply_run_completion_state(conn, job, job_state_at, status, output, action)
602    })
603}
604
605#[allow(clippy::too_many_arguments)]
606fn insert_run_and_prune(
607    conn: &Connection,
608    config: &Config,
609    job_id: &str,
610    started_at: DateTime<Utc>,
611    finished_at: DateTime<Utc>,
612    status: &str,
613    output: Option<&str>,
614    duration_ms: i64,
615) -> Result<()> {
616    conn.execute(
617        "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
618         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
619        params![
620            job_id,
621            started_at.to_rfc3339(),
622            finished_at.to_rfc3339(),
623            status,
624            output,
625            duration_ms,
626        ],
627    )
628    .context("Failed to insert cron run")?;
629
630    let keep = i64::from(config.scheduler.max_run_history.max(1));
631    conn.execute(
632        "DELETE FROM cron_runs
633         WHERE job_id = ?1
634           AND id NOT IN (
635             SELECT id FROM cron_runs
636             WHERE job_id = ?1
637             ORDER BY started_at DESC, id DESC
638             LIMIT ?2
639           )",
640        params![job_id, keep],
641    )
642    .context("Failed to prune cron run history")?;
643
644    Ok(())
645}
646
647fn truncate_cron_output(output: &str) -> String {
648    if output.len() <= MAX_CRON_OUTPUT_BYTES {
649        return output.to_string();
650    }
651
652    if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
653        return TRUNCATED_OUTPUT_MARKER.to_string();
654    }
655
656    let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
657    while cutoff > 0 && !output.is_char_boundary(cutoff) {
658        cutoff -= 1;
659    }
660
661    let mut truncated = output[..cutoff].to_string();
662    truncated.push_str(TRUNCATED_OUTPUT_MARKER);
663    truncated
664}
665
666pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
667    let Some(runs) = with_read_connection(config, |conn| {
668        let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
669        let mut stmt = conn.prepare(
670            "SELECT id, job_id, started_at, finished_at, status, output, duration_ms
671             FROM cron_runs
672             WHERE job_id = ?1
673             ORDER BY started_at DESC, id DESC
674             LIMIT ?2",
675        )?;
676
677        let rows = stmt.query_map(params![job_id, lim], |row| {
678            Ok(CronRun {
679                id: row.get(0)?,
680                job_id: row.get(1)?,
681                started_at: parse_rfc3339(&row.get::<_, String>(2)?)
682                    .map_err(sql_conversion_error)?,
683                finished_at: parse_rfc3339(&row.get::<_, String>(3)?)
684                    .map_err(sql_conversion_error)?,
685                status: row.get(4)?,
686                output: row.get(5)?,
687                duration_ms: row.get(6)?,
688            })
689        })?;
690
691        let mut runs = Vec::new();
692        for row in rows {
693            runs.push(row?);
694        }
695        Ok(runs)
696    })?
697    else {
698        return Ok(Vec::new());
699    };
700
701    Ok(runs)
702}
703
704fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
705    let parsed = DateTime::parse_from_rfc3339(raw)
706        .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
707    Ok(parsed.with_timezone(&Utc))
708}
709
710fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error {
711    rusqlite::Error::ToSqlConversionFailure(err.into())
712}
713
714fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
715    let expression: String = row.get(1)?;
716    let schedule_raw: Option<String> = row.get(3)?;
717    let schedule =
718        decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?;
719
720    let delivery_raw: Option<String> = row.get(10)?;
721    let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?;
722
723    let next_run_raw: String = row.get(13)?;
724    let last_run_raw: Option<String> = row.get(14)?;
725    let created_at_raw: String = row.get(12)?;
726    let allowed_tools_raw: Option<String> = row.get(17)?;
727    let source: Option<String> = row.get(18)?;
728    let uses_memory: Option<i64> = row.get(19)?;
729    let agent_alias: Option<String> = row.get(20)?;
730
731    Ok(CronJob {
732        id: row.get(0)?,
733        expression,
734        schedule,
735        command: row.get(2)?,
736        job_type: row.get(4)?,
737        prompt: row.get(5)?,
738        name: row.get(6)?,
739        session_target: SessionTarget::parse(&row.get::<_, String>(7)?),
740        model: row.get(8)?,
741        agent_alias: agent_alias
742            .map(|s| s.trim().to_string())
743            .unwrap_or_default(),
744        enabled: row.get::<_, i64>(9)? != 0,
745        delivery,
746        delete_after_run: row.get::<_, i64>(11)? != 0,
747        source: source.unwrap_or_else(|| "imperative".to_string()),
748        uses_memory: uses_memory != Some(0),
749        created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?,
750        next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?,
751        last_run: match last_run_raw {
752            Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?),
753            None => None,
754        },
755        last_status: row.get(15)?,
756        last_output: row.get(16)?,
757        allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
758            .map_err(sql_conversion_error)?,
759    })
760}
761
762fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result<Schedule> {
763    if let Some(raw) = schedule_raw {
764        let trimmed = raw.trim();
765        if !trimmed.is_empty() {
766            return serde_json::from_str(trimmed)
767                .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}"));
768        }
769    }
770
771    if expression.trim().is_empty() {
772        anyhow::bail!("Missing schedule and legacy expression for cron job")
773    }
774
775    Ok(Schedule::Cron {
776        expr: expression.to_string(),
777        tz: None,
778    })
779}
780
781fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
782    if let Some(raw) = delivery_raw {
783        let trimmed = raw.trim();
784        if !trimmed.is_empty() {
785            return serde_json::from_str(trimmed)
786                .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}"));
787        }
788    }
789    Ok(DeliveryConfig::default())
790}
791
792fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
793    allowed_tools
794        .map(serde_json::to_string)
795        .transpose()
796        .context("Failed to serialize cron allowed_tools")
797}
798
799fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
800    if let Some(raw) = raw {
801        let trimmed = raw.trim();
802        if !trimmed.is_empty() {
803            return serde_json::from_str(trimmed)
804                .map(Some)
805                .with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
806        }
807    }
808    Ok(None)
809}
810
811/// Synchronize declarative cron job definitions from config into the database.
812///
813/// For each declarative job (identified by `id`):
814/// - If the job exists in DB: update it to match the config definition.
815/// - If the job does not exist: insert it.
816///
817/// Jobs created imperatively (via CLI/API) are never modified or deleted.
818/// Declarative jobs that are no longer present in config are removed.
819pub fn sync_declarative_jobs(
820    config: &Config,
821    decls: &std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
822) -> Result<()> {
823    use zeroclaw_config::schema::CronScheduleDecl;
824
825    if decls.is_empty() {
826        // If no declarative jobs are defined, clean up previously synced
827        // declarative jobs only when cron storage already exists. A fresh
828        // workspace with nothing to sync should stay DB-free on daemon start.
829        let _ = with_existing_initialized_connection(config, |conn| {
830            let deleted = conn
831                .execute("DELETE FROM cron_jobs WHERE source = 'declarative'", [])
832                .context("Failed to remove stale declarative cron jobs")?;
833            if deleted > 0 {
834                ::zeroclaw_log::record!(
835                    INFO,
836                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
837                        .with_attrs(::serde_json::json!({"count": deleted})),
838                    "Removed declarative cron jobs no longer in config"
839                );
840            }
841            Ok(())
842        })?;
843        return Ok(());
844    }
845
846    // Validate declarations before touching the DB.
847    for (id, decl) in decls {
848        validate_decl(id, decl)?;
849    }
850
851    let now = Utc::now();
852
853    with_initialized_connection(config, |conn| {
854        // Collect IDs of all declarative jobs currently defined in config.
855        let config_ids: std::collections::HashSet<&str> =
856            decls.keys().map(String::as_str).collect();
857
858        // Remove declarative jobs no longer in config.
859        {
860            let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'declarative'")?;
861            let db_ids: Vec<String> = stmt
862                .query_map([], |row| row.get(0))?
863                .filter_map(|r| r.ok())
864                .collect();
865
866            for db_id in &db_ids {
867                if !config_ids.contains(db_id.as_str()) {
868                    conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![db_id])
869                        .with_context(|| {
870                            format!("Failed to remove stale declarative cron job '{db_id}'")
871                        })?;
872                    ::zeroclaw_log::record!(
873                        INFO,
874                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
875                            .with_attrs(::serde_json::json!({"job_id": db_id})),
876                        "Removed declarative cron job no longer in config"
877                    );
878                }
879            }
880        }
881
882        for (id, decl) in decls {
883            let schedule = convert_schedule_decl(&decl.schedule)?;
884            let expression = schedule_cron_expression(&schedule).unwrap_or_default();
885            let schedule_json = serde_json::to_string(&schedule)?;
886            let job_type = &decl.job_type;
887            let session_target = decl.session_target.as_deref().unwrap_or("isolated");
888            let delivery = match &decl.delivery {
889                Some(d) => convert_delivery_decl(d),
890                None => DeliveryConfig::default(),
891            };
892            let delivery_json = serde_json::to_string(&delivery)?;
893            let allowed_tools_json = encode_allowed_tools(decl.allowed_tools.as_ref())?;
894            let command = decl.command.as_deref().unwrap_or("");
895            let delete_after_run = matches!(decl.schedule, CronScheduleDecl::At { .. });
896
897            // Check if job already exists.
898            let exists: bool = conn
899                .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
900                .query_row(params![id], |row| row.get::<_, i64>(0))
901                .map(|c| c > 0)
902                .unwrap_or(false);
903
904            if exists {
905                // Update existing declarative job — preserve runtime state
906                // (next_run, last_run, last_status, last_output, created_at).
907                // Only update the schedule's next_run if the schedule itself changed.
908                let current_schedule_raw: Option<String> = conn
909                    .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
910                    .query_row(params![id], |row| row.get(0))
911                    .ok();
912
913                let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
914
915                if schedule_changed {
916                    let next_run = next_run_for_schedule(&schedule, now)?;
917                    conn.execute(
918                        "UPDATE cron_jobs
919                         SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
920                             prompt = ?5, name = ?6, session_target = ?7, model = ?8,
921                             enabled = ?9, delivery = ?10, delete_after_run = ?11,
922                             allowed_tools = ?12, source = 'declarative', next_run = ?13,
923                             uses_memory = ?14
924                         WHERE id = ?15",
925                        params![
926                            expression,
927                            command,
928                            schedule_json,
929                            job_type,
930                            decl.prompt,
931                            decl.name,
932                            session_target,
933                            decl.model,
934                            i32::from(decl.enabled),
935                            delivery_json,
936                            i32::from(delete_after_run),
937                            allowed_tools_json,
938                            next_run.to_rfc3339(),
939                            i32::from(decl.uses_memory),
940                            id,
941                        ],
942                    )
943                    .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
944                } else {
945                    conn.execute(
946                        "UPDATE cron_jobs
947                         SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
948                             prompt = ?5, name = ?6, session_target = ?7, model = ?8,
949                             enabled = ?9, delivery = ?10, delete_after_run = ?11,
950                             allowed_tools = ?12, source = 'declarative',
951                             uses_memory = ?13
952                         WHERE id = ?14",
953                        params![
954                            expression,
955                            command,
956                            schedule_json,
957                            job_type,
958                            decl.prompt,
959                            decl.name,
960                            session_target,
961                            decl.model,
962                            i32::from(decl.enabled),
963                            delivery_json,
964                            i32::from(delete_after_run),
965                            allowed_tools_json,
966                            i32::from(decl.uses_memory),
967                            id,
968                        ],
969                    )
970                    .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
971                }
972
973                ::zeroclaw_log::record!(
974                    DEBUG,
975                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
976                        .with_attrs(::serde_json::json!({"job_id": id})),
977                    "Updated declarative cron job"
978                );
979            } else {
980                // Reverse-resolve the owning agent from
981                // `[agents.<x>].cron_jobs` membership. Orphan declarative
982                // entries that no agent claims are skipped with a warning
983                // rather than silently bound to a magic alias.
984                let Some(agent_alias) = config.agent_for_cron_job(id) else {
985                    ::zeroclaw_log::record!(
986                        WARN,
987                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
988                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
989                            .with_attrs(::serde_json::json!({"job_id": id})),
990                        "Skipping declarative cron job: no [agents.<x>].cron_jobs entry claims this id"
991                    );
992                    continue;
993                };
994                let next_run = next_run_for_schedule(&schedule, now)?;
995                conn.execute(
996                    "INSERT INTO cron_jobs (
997                        id, expression, command, schedule, job_type, prompt, name,
998                        session_target, model, enabled, delivery, delete_after_run,
999                        allowed_tools, source, uses_memory, agent_alias, created_at, next_run
1000                     ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, 'declarative', ?14, ?15, ?16, ?17)",
1001                    params![
1002                        id,
1003                        expression,
1004                        command,
1005                        schedule_json,
1006                        job_type,
1007                        decl.prompt,
1008                        decl.name,
1009                        session_target,
1010                        decl.model,
1011                        i32::from(decl.enabled),
1012                        delivery_json,
1013                        i32::from(delete_after_run),
1014                        allowed_tools_json,
1015                        i32::from(decl.uses_memory),
1016                        agent_alias,
1017                        now.to_rfc3339(),
1018                        next_run.to_rfc3339(),
1019                    ],
1020                )
1021                .with_context(|| {
1022                    format!("Failed to insert declarative cron job '{id}'")
1023                })?;
1024
1025                ::zeroclaw_log::record!(
1026                    INFO,
1027                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1028                        .with_attrs(::serde_json::json!({"job_id": id})),
1029                    "Inserted declarative cron job from config"
1030                );
1031            }
1032        }
1033
1034        Ok(())
1035    })
1036}
1037
1038/// Validate a declarative cron job definition.
1039fn validate_decl(id: &str, decl: &zeroclaw_config::schema::CronJobDecl) -> Result<()> {
1040    if id.trim().is_empty() {
1041        anyhow::bail!("Declarative cron job has empty id");
1042    }
1043
1044    match decl.job_type.to_lowercase().as_str() {
1045        "shell" => {
1046            if decl.command.as_deref().is_none_or(|c| c.trim().is_empty()) {
1047                anyhow::bail!(
1048                    "Declarative cron job '{id}': shell job requires a non-empty 'command'"
1049                );
1050            }
1051        }
1052        "agent" => {
1053            if decl.prompt.as_deref().is_none_or(|p| p.trim().is_empty()) {
1054                anyhow::bail!(
1055                    "Declarative cron job '{id}': agent job requires a non-empty 'prompt'"
1056                );
1057            }
1058        }
1059        other => {
1060            anyhow::bail!(
1061                "Declarative cron job '{id}': invalid job_type '{other}', expected 'shell' or 'agent'"
1062            );
1063        }
1064    }
1065
1066    Ok(())
1067}
1068
1069/// Convert a `CronScheduleDecl` to the runtime `Schedule` type.
1070fn convert_schedule_decl(decl: &zeroclaw_config::schema::CronScheduleDecl) -> Result<Schedule> {
1071    use zeroclaw_config::schema::CronScheduleDecl;
1072    match decl {
1073        CronScheduleDecl::Cron { expr, tz } => Ok(Schedule::Cron {
1074            expr: expr.clone(),
1075            tz: tz.clone(),
1076        }),
1077        CronScheduleDecl::Every { every_ms } => Ok(Schedule::Every {
1078            every_ms: *every_ms,
1079        }),
1080        CronScheduleDecl::At { at } => {
1081            let parsed = DateTime::parse_from_rfc3339(at)
1082                .with_context(|| {
1083                    format!("Invalid RFC3339 timestamp in declarative cron 'at': {at}")
1084                })?
1085                .with_timezone(&Utc);
1086            Ok(Schedule::At { at: parsed })
1087        }
1088    }
1089}
1090
1091/// Convert a `DeliveryConfigDecl` to the runtime `DeliveryConfig`.
1092fn convert_delivery_decl(decl: &zeroclaw_config::schema::DeliveryConfigDecl) -> DeliveryConfig {
1093    DeliveryConfig {
1094        mode: decl.mode.clone(),
1095        channel: decl.channel.clone(),
1096        to: decl.to.clone(),
1097        thread_id: decl.thread_id.clone(),
1098        best_effort: decl.best_effort,
1099    }
1100}
1101
1102fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
1103    let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
1104    let mut rows = stmt.query([])?;
1105    while let Some(row) = rows.next()? {
1106        let col_name: String = row.get(1)?;
1107        if col_name == name {
1108            return Ok(());
1109        }
1110    }
1111    // Drop the statement/rows before executing ALTER to release any locks
1112    drop(rows);
1113    drop(stmt);
1114
1115    // Tolerate "duplicate column name" errors to handle the race where
1116    // another process adds the column between our PRAGMA check and ALTER.
1117    match conn.execute(
1118        &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"),
1119        [],
1120    ) {
1121        Ok(_) => Ok(()),
1122        Err(rusqlite::Error::SqliteFailure(err, Some(ref msg)))
1123            if msg.contains("duplicate column name") =>
1124        {
1125            ::zeroclaw_log::record!(
1126                DEBUG,
1127                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1128                    .with_attrs(::serde_json::json!({"error": format!("{}", err), "name": name})),
1129                "Column cron_jobs. already exists (concurrent migration)"
1130            );
1131            Ok(())
1132        }
1133        Err(e) => Err(e).with_context(|| format!("Failed to add cron_jobs.{name}")),
1134    }
1135}
1136
1137fn cron_db_path(config: &Config) -> std::path::PathBuf {
1138    config.data_dir.join("cron").join("jobs.db")
1139}
1140
1141// Read paths must not create the cron directory or jobs.db. If the DB already
1142// exists, however, reads still need the lightweight schema/migration ensure
1143// step before selecting columns added by newer releases.
1144fn with_read_connection<T>(
1145    config: &Config,
1146    f: impl FnOnce(&Connection) -> Result<T>,
1147) -> Result<Option<T>> {
1148    with_existing_initialized_connection(config, f)
1149}
1150
1151fn with_existing_initialized_connection<T>(
1152    config: &Config,
1153    f: impl FnOnce(&Connection) -> Result<T>,
1154) -> Result<Option<T>> {
1155    let db_path = cron_db_path(config);
1156    if !db_path.exists() {
1157        return Ok(None);
1158    }
1159
1160    let conn = Connection::open_with_flags(
1161        &db_path,
1162        OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
1163    )
1164    .with_context(|| {
1165        format!(
1166            "Failed to open existing cron DB: {}",
1167            db_path.display().to_string()
1168        )
1169    })?;
1170
1171    initialize_schema(&conn)?;
1172
1173    f(&conn).map(Some)
1174}
1175
1176fn with_initialized_connection<T>(
1177    config: &Config,
1178    f: impl FnOnce(&Connection) -> Result<T>,
1179) -> Result<T> {
1180    let db_path = cron_db_path(config);
1181    #[cfg(test)]
1182    {
1183        let mut counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
1184            .lock()
1185            .unwrap_or_else(|e| e.into_inner());
1186        if let Some(count) = counts.get_mut(&db_path) {
1187            *count += 1;
1188        }
1189    }
1190
1191    if let Some(parent) = db_path.parent() {
1192        std::fs::create_dir_all(parent).with_context(|| {
1193            format!(
1194                "Failed to create cron directory: {}",
1195                parent.display().to_string()
1196            )
1197        })?;
1198    }
1199
1200    let conn = Connection::open(&db_path)
1201        .with_context(|| format!("Failed to open cron DB: {}", db_path.display().to_string()))?;
1202
1203    initialize_schema(&conn)?;
1204
1205    f(&conn)
1206}
1207
1208/// Apply the completion state change for a cron job inside an existing connection.
1209///
1210/// This keeps the scheduler's normal path and the fallback path using the same
1211/// SQL mutation logic while allowing the caller to decide whether the
1212/// run-history write should be attempted first.
1213fn apply_run_completion_state(
1214    conn: &Connection,
1215    job: &CronJob,
1216    job_state_at: DateTime<Utc>,
1217    status: &str,
1218    output: Option<&str>,
1219    action: RunCompletionAction,
1220) -> Result<()> {
1221    let bounded_output = output.map(truncate_cron_output);
1222
1223    match action {
1224        RunCompletionAction::Reschedule => {
1225            let next_run = next_run_for_schedule(&job.schedule, job_state_at)?;
1226            let changed = conn
1227                .execute(
1228                    "UPDATE cron_jobs
1229                     SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
1230                     WHERE id = ?5",
1231                    params![
1232                        next_run.to_rfc3339(),
1233                        job_state_at.to_rfc3339(),
1234                        status,
1235                        bounded_output.as_deref(),
1236                        job.id,
1237                    ],
1238                )
1239                .context("Failed to update cron job run state")?;
1240            if changed == 0 {
1241                anyhow::bail!("Cron job '{}' not found", job.id);
1242            }
1243        }
1244        RunCompletionAction::Disable => {
1245            let changed = conn
1246                .execute(
1247                    "UPDATE cron_jobs
1248                     SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
1249                     WHERE id = ?4",
1250                    params![
1251                        job_state_at.to_rfc3339(),
1252                        status,
1253                        bounded_output.as_deref(),
1254                        job.id,
1255                    ],
1256                )
1257                .context("Failed to disable completed one-shot cron job")?;
1258            if changed == 0 {
1259                anyhow::bail!("Cron job '{}' not found", job.id);
1260            }
1261        }
1262        RunCompletionAction::Delete => {
1263            let changed = conn
1264                .execute("DELETE FROM cron_jobs WHERE id = ?1", params![job.id])
1265                .context("Failed to delete completed one-shot cron job")?;
1266            if changed == 0 {
1267                anyhow::bail!("Cron job '{}' not found", job.id);
1268            }
1269        }
1270    }
1271
1272    Ok(())
1273}
1274
1275fn initialize_schema(conn: &Connection) -> Result<()> {
1276    conn.execute_batch(
1277        "PRAGMA foreign_keys = ON;
1278         CREATE TABLE IF NOT EXISTS cron_jobs (
1279            id               TEXT PRIMARY KEY,
1280            expression       TEXT NOT NULL,
1281            command          TEXT NOT NULL,
1282            schedule         TEXT,
1283            job_type         TEXT NOT NULL DEFAULT 'shell',
1284            prompt           TEXT,
1285            name             TEXT,
1286            session_target   TEXT NOT NULL DEFAULT 'isolated',
1287            model            TEXT,
1288            enabled          INTEGER NOT NULL DEFAULT 1,
1289            delivery         TEXT,
1290            delete_after_run INTEGER NOT NULL DEFAULT 0,
1291            allowed_tools    TEXT,
1292            created_at       TEXT NOT NULL,
1293            next_run         TEXT NOT NULL,
1294            last_run         TEXT,
1295            last_status      TEXT,
1296            last_output      TEXT
1297        );
1298        CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);
1299
1300        CREATE TABLE IF NOT EXISTS cron_runs (
1301            id          INTEGER PRIMARY KEY AUTOINCREMENT,
1302            job_id      TEXT NOT NULL,
1303            started_at  TEXT NOT NULL,
1304            finished_at TEXT NOT NULL,
1305            status      TEXT NOT NULL,
1306            output      TEXT,
1307            duration_ms INTEGER,
1308            FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE
1309        );
1310        CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id);
1311        CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);
1312        CREATE INDEX IF NOT EXISTS idx_cron_runs_job_started ON cron_runs(job_id, started_at);",
1313    )
1314    .context("Failed to initialize cron schema")?;
1315
1316    add_column_if_missing(conn, "schedule", "TEXT")?;
1317    add_column_if_missing(conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?;
1318    add_column_if_missing(conn, "prompt", "TEXT")?;
1319    add_column_if_missing(conn, "name", "TEXT")?;
1320    add_column_if_missing(conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?;
1321    add_column_if_missing(conn, "model", "TEXT")?;
1322    add_column_if_missing(conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
1323    add_column_if_missing(conn, "delivery", "TEXT")?;
1324    add_column_if_missing(conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
1325    add_column_if_missing(conn, "allowed_tools", "TEXT")?;
1326    add_column_if_missing(conn, "source", "TEXT DEFAULT 'imperative'")?;
1327    add_column_if_missing(conn, "uses_memory", "INTEGER NOT NULL DEFAULT 1")?;
1328    // Rows written before the column existed get an empty alias; the
1329    // scheduler treats those as orphans (skip with warning) rather than
1330    // coercing them to a magic alias.
1331    add_column_if_missing(conn, "agent_alias", "TEXT NOT NULL DEFAULT ''")?;
1332
1333    Ok(())
1334}
1335
1336#[cfg(test)]
1337mod tests {
1338    use super::*;
1339    use chrono::Duration as ChronoDuration;
1340    use tempfile::TempDir;
1341    use zeroclaw_config::schema::Config;
1342
1343    fn test_config(tmp: &TempDir) -> Config {
1344        let config = Config {
1345            data_dir: tmp.path().join("data"),
1346            config_path: tmp.path().join("config.toml"),
1347            ..Config::default()
1348        };
1349        std::fs::create_dir_all(&config.data_dir).unwrap();
1350        config
1351    }
1352
1353    fn cron_dir(config: &Config) -> std::path::PathBuf {
1354        config.data_dir.join("cron")
1355    }
1356
1357    fn cron_db(config: &Config) -> std::path::PathBuf {
1358        cron_dir(config).join("jobs.db")
1359    }
1360
1361    #[test]
1362    fn read_only_queries_on_empty_workspace_do_not_initialize_cron_db() {
1363        let tmp = TempDir::new().unwrap();
1364        let config = test_config(&tmp);
1365
1366        assert!(list_jobs(&config).unwrap().is_empty());
1367        assert!(due_jobs(&config, Utc::now()).unwrap().is_empty());
1368        assert!(all_overdue_jobs(&config, Utc::now()).unwrap().is_empty());
1369        assert!(list_runs(&config, "missing", 10).unwrap().is_empty());
1370
1371        let err = get_job(&config, "missing").unwrap_err();
1372        assert!(err.to_string().contains("not found"));
1373
1374        assert!(
1375            !cron_dir(&config).exists(),
1376            "read-only queries should not create the cron directory"
1377        );
1378        assert!(
1379            !cron_db(&config).exists(),
1380            "read-only queries should not create jobs.db"
1381        );
1382    }
1383
1384    #[test]
1385    fn first_write_initializes_schema_and_follow_up_reads_work() {
1386        let tmp = TempDir::new().unwrap();
1387        let config = test_config(&tmp);
1388
1389        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1390
1391        assert!(cron_db(&config).exists());
1392        assert_eq!(get_job(&config, &job.id).unwrap().id, job.id);
1393        assert_eq!(list_jobs(&config).unwrap().len(), 1);
1394    }
1395
1396    #[test]
1397    fn empty_declarative_sync_on_empty_workspace_does_not_initialize_cron_db() {
1398        let tmp = TempDir::new().unwrap();
1399        let config = test_config(&tmp);
1400
1401        sync_declarative_jobs(&config, &std::collections::HashMap::new()).unwrap();
1402
1403        assert!(
1404            !cron_dir(&config).exists(),
1405            "empty declarative sync should not create the cron directory"
1406        );
1407        assert!(
1408            !cron_db(&config).exists(),
1409            "empty declarative sync should not create jobs.db"
1410        );
1411    }
1412
1413    #[test]
1414    fn read_existing_old_schema_db_migrates_before_querying_new_columns() {
1415        let tmp = TempDir::new().unwrap();
1416        let config = test_config(&tmp);
1417        let cron_dir = cron_dir(&config);
1418        std::fs::create_dir_all(&cron_dir).unwrap();
1419        let db_path = cron_db(&config);
1420        let conn = Connection::open(&db_path).unwrap();
1421        conn.execute_batch(
1422            "CREATE TABLE cron_jobs (
1423                id               TEXT PRIMARY KEY,
1424                expression       TEXT NOT NULL,
1425                command          TEXT NOT NULL,
1426                schedule         TEXT,
1427                job_type         TEXT NOT NULL DEFAULT 'shell',
1428                prompt           TEXT,
1429                name             TEXT,
1430                session_target   TEXT NOT NULL DEFAULT 'isolated',
1431                model            TEXT,
1432                enabled          INTEGER NOT NULL DEFAULT 1,
1433                delivery         TEXT,
1434                delete_after_run INTEGER NOT NULL DEFAULT 0,
1435                allowed_tools    TEXT,
1436                created_at       TEXT NOT NULL,
1437                next_run         TEXT NOT NULL,
1438                last_run         TEXT,
1439                last_status      TEXT,
1440                last_output      TEXT
1441            );",
1442        )
1443        .unwrap();
1444        conn.execute(
1445            "INSERT INTO cron_jobs (
1446                id, expression, command, schedule, job_type, session_target,
1447                enabled, delete_after_run, created_at, next_run
1448             ) VALUES (?1, ?2, ?3, ?4, 'shell', 'isolated', 1, 0, ?5, ?6)",
1449            params![
1450                "legacy-schema",
1451                "*/5 * * * *",
1452                "echo legacy",
1453                Option::<String>::None,
1454                Utc::now().to_rfc3339(),
1455                (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1456            ],
1457        )
1458        .unwrap();
1459        drop(conn);
1460
1461        let jobs = list_jobs(&config).unwrap();
1462
1463        assert_eq!(jobs.len(), 1);
1464        assert_eq!(jobs[0].id, "legacy-schema");
1465        assert_eq!(jobs[0].source, "imperative");
1466        assert!(jobs[0].uses_memory);
1467
1468        let conn = Connection::open(&db_path).unwrap();
1469        let columns: Vec<String> = conn
1470            .prepare("PRAGMA table_info(cron_jobs)")
1471            .unwrap()
1472            .query_map([], |row| row.get(1))
1473            .unwrap()
1474            .collect::<Result<_, _>>()
1475            .unwrap();
1476        assert!(columns.iter().any(|name| name == "source"));
1477        assert!(columns.iter().any(|name| name == "uses_memory"));
1478    }
1479
1480    #[test]
1481    fn add_job_accepts_five_field_expression() {
1482        let tmp = TempDir::new().unwrap();
1483        let config = test_config(&tmp);
1484
1485        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1486        assert_eq!(job.expression, "*/5 * * * *");
1487        assert_eq!(job.command, "echo ok");
1488        assert!(matches!(job.schedule, Schedule::Cron { .. }));
1489    }
1490
1491    #[test]
1492    fn add_shell_job_marks_at_schedule_for_auto_delete() {
1493        let tmp = TempDir::new().unwrap();
1494        let config = test_config(&tmp);
1495
1496        let one_shot = add_shell_job(
1497            &config,
1498            "default",
1499            None,
1500            Schedule::At {
1501                at: Utc::now() + ChronoDuration::minutes(10),
1502            },
1503            "echo once",
1504            None,
1505        )
1506        .unwrap();
1507        assert!(one_shot.delete_after_run);
1508
1509        let recurring = add_shell_job(
1510            &config,
1511            "default",
1512            None,
1513            Schedule::Every { every_ms: 60_000 },
1514            "echo recurring",
1515            None,
1516        )
1517        .unwrap();
1518        assert!(!recurring.delete_after_run);
1519    }
1520
1521    #[test]
1522    fn add_shell_job_persists_delivery() {
1523        let tmp = TempDir::new().unwrap();
1524        let config = test_config(&tmp);
1525
1526        let job = add_shell_job(
1527            &config,
1528            "default",
1529            Some("deliver-shell".into()),
1530            Schedule::Cron {
1531                expr: "*/5 * * * *".into(),
1532                tz: None,
1533            },
1534            "echo delivered",
1535            Some(DeliveryConfig {
1536                mode: "announce".into(),
1537                channel: Some("discord".into()),
1538                to: Some("1234567890".into()),
1539                thread_id: None,
1540                best_effort: true,
1541            }),
1542        )
1543        .unwrap();
1544
1545        assert_eq!(job.delivery.mode, "announce");
1546        assert_eq!(job.delivery.channel.as_deref(), Some("discord"));
1547        assert_eq!(job.delivery.to.as_deref(), Some("1234567890"));
1548
1549        let stored = get_job(&config, &job.id).unwrap();
1550        assert_eq!(stored.delivery.mode, "announce");
1551        assert_eq!(stored.delivery.channel.as_deref(), Some("discord"));
1552        assert_eq!(stored.delivery.to.as_deref(), Some("1234567890"));
1553    }
1554
1555    #[test]
1556    fn add_agent_job_rejects_invalid_announce_delivery() {
1557        let tmp = TempDir::new().unwrap();
1558        let config = test_config(&tmp);
1559
1560        let err = add_agent_job(
1561            &config,
1562            "default",
1563            Some("deliver-agent".into()),
1564            Schedule::Cron {
1565                expr: "*/5 * * * *".into(),
1566                tz: None,
1567            },
1568            "summarize logs",
1569            SessionTarget::Isolated,
1570            None,
1571            Some(DeliveryConfig {
1572                mode: "announce".into(),
1573                channel: Some("discord".into()),
1574                to: None,
1575                thread_id: None,
1576                best_effort: true,
1577            }),
1578            false,
1579            None,
1580        )
1581        .unwrap_err();
1582
1583        assert!(err.to_string().contains("delivery.to is required"));
1584    }
1585
1586    #[test]
1587    fn add_shell_job_rejects_invalid_delivery_mode() {
1588        let tmp = TempDir::new().unwrap();
1589        let config = test_config(&tmp);
1590
1591        let err = add_shell_job(
1592            &config,
1593            "default",
1594            Some("deliver-shell".into()),
1595            Schedule::Cron {
1596                expr: "*/5 * * * *".into(),
1597                tz: None,
1598            },
1599            "echo delivered",
1600            Some(DeliveryConfig {
1601                mode: "annouce".into(),
1602                channel: Some("discord".into()),
1603                to: Some("1234567890".into()),
1604                thread_id: None,
1605                best_effort: true,
1606            }),
1607        )
1608        .unwrap_err();
1609
1610        assert!(err.to_string().contains("unsupported delivery mode"));
1611    }
1612
1613    #[test]
1614    fn add_list_remove_roundtrip() {
1615        let tmp = TempDir::new().unwrap();
1616        let config = test_config(&tmp);
1617
1618        let job = add_job(&config, "test-agent", "*/10 * * * *", "echo roundtrip").unwrap();
1619        let listed = list_jobs(&config).unwrap();
1620        assert_eq!(listed.len(), 1);
1621        assert_eq!(listed[0].id, job.id);
1622
1623        remove_job(&config, &job.id).unwrap();
1624        assert!(list_jobs(&config).unwrap().is_empty());
1625    }
1626
1627    #[test]
1628    fn due_jobs_filters_by_timestamp_and_enabled() {
1629        let tmp = TempDir::new().unwrap();
1630        let config = test_config(&tmp);
1631
1632        let job = add_job(&config, "test-agent", "* * * * *", "echo due").unwrap();
1633
1634        let due_now = due_jobs(&config, Utc::now()).unwrap();
1635        assert!(due_now.is_empty(), "new job should not be due immediately");
1636
1637        let far_future = Utc::now() + ChronoDuration::days(365);
1638        let due_future = due_jobs(&config, far_future).unwrap();
1639        assert_eq!(due_future.len(), 1, "job should be due in far future");
1640
1641        let _ = update_job(
1642            &config,
1643            &job.id,
1644            CronJobPatch {
1645                enabled: Some(false),
1646                ..CronJobPatch::default()
1647            },
1648        )
1649        .unwrap();
1650        let due_after_disable = due_jobs(&config, far_future).unwrap();
1651        assert!(due_after_disable.is_empty());
1652    }
1653
1654    #[test]
1655    fn due_jobs_respects_scheduler_max_tasks_limit() {
1656        let tmp = TempDir::new().unwrap();
1657        let mut config = test_config(&tmp);
1658        config.scheduler.max_tasks = 2;
1659
1660        let _ = add_job(&config, "test-agent", "* * * * *", "echo due-1").unwrap();
1661        let _ = add_job(&config, "test-agent", "* * * * *", "echo due-2").unwrap();
1662        let _ = add_job(&config, "test-agent", "* * * * *", "echo due-3").unwrap();
1663
1664        let far_future = Utc::now() + ChronoDuration::days(365);
1665        let due = due_jobs(&config, far_future).unwrap();
1666        assert_eq!(due.len(), 2);
1667    }
1668
1669    #[test]
1670    fn all_overdue_jobs_ignores_max_tasks_limit() {
1671        let tmp = TempDir::new().unwrap();
1672        let mut config = test_config(&tmp);
1673        config.scheduler.max_tasks = 2;
1674
1675        let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-1").unwrap();
1676        let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-2").unwrap();
1677        let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-3").unwrap();
1678
1679        let far_future = Utc::now() + ChronoDuration::days(365);
1680        // due_jobs respects the limit
1681        let due = due_jobs(&config, far_future).unwrap();
1682        assert_eq!(due.len(), 2);
1683        // all_overdue_jobs returns everything
1684        let overdue = all_overdue_jobs(&config, far_future).unwrap();
1685        assert_eq!(overdue.len(), 3);
1686    }
1687
1688    #[test]
1689    fn all_overdue_jobs_excludes_disabled_jobs() {
1690        let tmp = TempDir::new().unwrap();
1691        let config = test_config(&tmp);
1692
1693        let job = add_job(&config, "test-agent", "* * * * *", "echo disabled").unwrap();
1694        let _ = update_job(
1695            &config,
1696            &job.id,
1697            CronJobPatch {
1698                enabled: Some(false),
1699                ..CronJobPatch::default()
1700            },
1701        )
1702        .unwrap();
1703
1704        let far_future = Utc::now() + ChronoDuration::days(365);
1705        let overdue = all_overdue_jobs(&config, far_future).unwrap();
1706        assert!(overdue.is_empty());
1707    }
1708
1709    #[test]
1710    fn add_agent_job_persists_allowed_tools() {
1711        let tmp = TempDir::new().unwrap();
1712        let config = test_config(&tmp);
1713
1714        let job = add_agent_job(
1715            &config,
1716            "default",
1717            Some("agent".into()),
1718            Schedule::Every { every_ms: 60_000 },
1719            "do work",
1720            SessionTarget::Isolated,
1721            None,
1722            None,
1723            false,
1724            Some(vec!["file_read".into(), "web_search".into()]),
1725        )
1726        .unwrap();
1727
1728        assert_eq!(
1729            job.allowed_tools,
1730            Some(vec!["file_read".into(), "web_search".into()])
1731        );
1732
1733        let stored = get_job(&config, &job.id).unwrap();
1734        assert_eq!(stored.allowed_tools, job.allowed_tools);
1735    }
1736
1737    #[test]
1738    fn update_job_persists_allowed_tools_patch() {
1739        let tmp = TempDir::new().unwrap();
1740        let config = test_config(&tmp);
1741
1742        let job = add_agent_job(
1743            &config,
1744            "default",
1745            Some("agent".into()),
1746            Schedule::Every { every_ms: 60_000 },
1747            "do work",
1748            SessionTarget::Isolated,
1749            None,
1750            None,
1751            false,
1752            None,
1753        )
1754        .unwrap();
1755
1756        let updated = update_job(
1757            &config,
1758            &job.id,
1759            CronJobPatch {
1760                allowed_tools: Some(vec!["shell".into()]),
1761                ..CronJobPatch::default()
1762            },
1763        )
1764        .unwrap();
1765
1766        assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
1767        assert_eq!(
1768            get_job(&config, &job.id).unwrap().allowed_tools,
1769            Some(vec!["shell".into()])
1770        );
1771    }
1772
1773    #[test]
1774    fn reschedule_after_run_persists_last_status_and_last_run() {
1775        let tmp = TempDir::new().unwrap();
1776        let config = test_config(&tmp);
1777
1778        let job = add_job(&config, "test-agent", "*/15 * * * *", "echo run").unwrap();
1779        reschedule_after_run(&config, &job, false, "failed output").unwrap();
1780
1781        let listed = list_jobs(&config).unwrap();
1782        let stored = listed.iter().find(|j| j.id == job.id).unwrap();
1783        assert_eq!(stored.last_status.as_deref(), Some("error"));
1784        assert!(stored.last_run.is_some());
1785        assert_eq!(stored.last_output.as_deref(), Some("failed output"));
1786    }
1787
1788    #[test]
1789    fn job_type_from_sql_reads_valid_value() {
1790        let tmp = TempDir::new().unwrap();
1791        let config = test_config(&tmp);
1792        let now = Utc::now();
1793
1794        with_initialized_connection(&config, |conn| {
1795            conn.execute(
1796                "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1797                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1798                params![
1799                    "job-type-valid",
1800                    "*/5 * * * *",
1801                    "echo ok",
1802                    Option::<String>::None,
1803                    "agent",
1804                    now.to_rfc3339(),
1805                    (now + ChronoDuration::minutes(5)).to_rfc3339(),
1806                ],
1807            )?;
1808            Ok(())
1809        })
1810        .unwrap();
1811
1812        let job = get_job(&config, "job-type-valid").unwrap();
1813        assert_eq!(job.job_type, JobType::Agent);
1814    }
1815
1816    #[test]
1817    fn job_type_from_sql_rejects_invalid_value() {
1818        let tmp = TempDir::new().unwrap();
1819        let config = test_config(&tmp);
1820        let now = Utc::now();
1821
1822        with_initialized_connection(&config, |conn| {
1823            conn.execute(
1824                "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1825                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1826                params![
1827                    "job-type-invalid",
1828                    "*/5 * * * *",
1829                    "echo ok",
1830                    Option::<String>::None,
1831                    "unknown",
1832                    now.to_rfc3339(),
1833                    (now + ChronoDuration::minutes(5)).to_rfc3339(),
1834                ],
1835            )?;
1836            Ok(())
1837        })
1838        .unwrap();
1839
1840        assert!(get_job(&config, "job-type-invalid").is_err());
1841    }
1842
1843    #[test]
1844    fn migration_falls_back_to_legacy_expression() {
1845        let tmp = TempDir::new().unwrap();
1846        let config = test_config(&tmp);
1847
1848        with_initialized_connection(&config, |conn| {
1849            conn.execute(
1850                "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
1851                 VALUES (?1, ?2, ?3, ?4, ?5)",
1852                params![
1853                    "legacy-id",
1854                    "*/5 * * * *",
1855                    "echo legacy",
1856                    Utc::now().to_rfc3339(),
1857                    (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1858                ],
1859            )?;
1860            conn.execute(
1861                "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'",
1862                [],
1863            )?;
1864            Ok(())
1865        })
1866        .unwrap();
1867
1868        let job = get_job(&config, "legacy-id").unwrap();
1869        assert!(matches!(job.schedule, Schedule::Cron { .. }));
1870    }
1871
1872    #[test]
1873    fn record_and_prune_runs() {
1874        let tmp = TempDir::new().unwrap();
1875        let mut config = test_config(&tmp);
1876        config.scheduler.max_run_history = 2;
1877        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1878        let base = Utc::now();
1879
1880        for idx in 0..3 {
1881            let start = base + ChronoDuration::seconds(idx);
1882            let end = start + ChronoDuration::milliseconds(100);
1883            record_run(&config, &job.id, start, end, "ok", Some("done"), 100).unwrap();
1884        }
1885
1886        let runs = list_runs(&config, &job.id, 10).unwrap();
1887        assert_eq!(runs.len(), 2);
1888    }
1889
1890    #[test]
1891    fn remove_job_cascades_run_history() {
1892        let tmp = TempDir::new().unwrap();
1893        let config = test_config(&tmp);
1894        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1895        let start = Utc::now();
1896        record_run(
1897            &config,
1898            &job.id,
1899            start,
1900            start + ChronoDuration::milliseconds(5),
1901            "ok",
1902            Some("ok"),
1903            5,
1904        )
1905        .unwrap();
1906
1907        remove_job(&config, &job.id).unwrap();
1908        let runs = list_runs(&config, &job.id, 10).unwrap();
1909        assert!(runs.is_empty());
1910    }
1911
1912    #[test]
1913    fn record_run_truncates_large_output() {
1914        let tmp = TempDir::new().unwrap();
1915        let config = test_config(&tmp);
1916        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
1917        let output = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);
1918
1919        record_run(
1920            &config,
1921            &job.id,
1922            Utc::now(),
1923            Utc::now(),
1924            "ok",
1925            Some(&output),
1926            1,
1927        )
1928        .unwrap();
1929
1930        let runs = list_runs(&config, &job.id, 1).unwrap();
1931        let stored = runs[0].output.as_deref().unwrap_or_default();
1932        assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER));
1933        assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES);
1934    }
1935
1936    #[test]
1937    fn reschedule_after_run_disables_at_schedule_job() {
1938        let tmp = TempDir::new().unwrap();
1939        let config = test_config(&tmp);
1940        let at = Utc::now() + ChronoDuration::minutes(10);
1941        let job = add_shell_job(
1942            &config,
1943            "test-agent",
1944            None,
1945            Schedule::At { at },
1946            "echo once",
1947            None,
1948        )
1949        .unwrap();
1950
1951        reschedule_after_run(&config, &job, true, "done").unwrap();
1952
1953        let stored = get_job(&config, &job.id).unwrap();
1954        assert!(
1955            !stored.enabled,
1956            "At schedule job should be disabled after reschedule"
1957        );
1958        assert_eq!(stored.last_status.as_deref(), Some("ok"));
1959    }
1960
1961    #[test]
1962    fn reschedule_after_run_disables_at_schedule_job_on_failure() {
1963        let tmp = TempDir::new().unwrap();
1964        let config = test_config(&tmp);
1965        let at = Utc::now() + ChronoDuration::minutes(10);
1966        let job = add_shell_job(
1967            &config,
1968            "test-agent",
1969            None,
1970            Schedule::At { at },
1971            "echo once",
1972            None,
1973        )
1974        .unwrap();
1975
1976        reschedule_after_run(&config, &job, false, "failed").unwrap();
1977
1978        let stored = get_job(&config, &job.id).unwrap();
1979        assert!(
1980            !stored.enabled,
1981            "At schedule job should be disabled after reschedule even on failure"
1982        );
1983        assert_eq!(stored.last_status.as_deref(), Some("error"));
1984        assert_eq!(stored.last_output.as_deref(), Some("failed"));
1985    }
1986
1987    #[test]
1988    fn reschedule_after_run_truncates_last_output() {
1989        let tmp = TempDir::new().unwrap();
1990        let config = test_config(&tmp);
1991        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
1992        let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);
1993
1994        reschedule_after_run(&config, &job, false, &output).unwrap();
1995
1996        let stored = get_job(&config, &job.id).unwrap();
1997        let last_output = stored.last_output.as_deref().unwrap_or_default();
1998        assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER));
1999        assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES);
2000    }
2001
2002    // ── Declarative cron job sync tests ──────────────────────────
2003
2004    fn make_shell_decl(
2005        id: &str,
2006        expr: &str,
2007        cmd: &str,
2008    ) -> (String, zeroclaw_config::schema::CronJobDecl) {
2009        (
2010            id.to_string(),
2011            zeroclaw_config::schema::CronJobDecl {
2012                name: Some(format!("decl-{id}")),
2013                job_type: "shell".to_string(),
2014                schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
2015                    expr: expr.to_string(),
2016                    tz: None,
2017                },
2018                command: Some(cmd.to_string()),
2019                prompt: None,
2020                enabled: true,
2021                model: None,
2022                allowed_tools: None,
2023                uses_memory: true,
2024                session_target: None,
2025                delivery: None,
2026            },
2027        )
2028    }
2029
2030    fn make_agent_decl(
2031        id: &str,
2032        expr: &str,
2033        prompt: &str,
2034    ) -> (String, zeroclaw_config::schema::CronJobDecl) {
2035        (
2036            id.to_string(),
2037            zeroclaw_config::schema::CronJobDecl {
2038                name: Some(format!("decl-{id}")),
2039                job_type: "agent".to_string(),
2040                schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
2041                    expr: expr.to_string(),
2042                    tz: None,
2043                },
2044                command: None,
2045                prompt: Some(prompt.to_string()),
2046                enabled: true,
2047                model: None,
2048                allowed_tools: None,
2049                uses_memory: true,
2050                session_target: None,
2051                delivery: None,
2052            },
2053        )
2054    }
2055
2056    fn decls_map(
2057        items: Vec<(String, zeroclaw_config::schema::CronJobDecl)>,
2058    ) -> std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl> {
2059        items.into_iter().collect()
2060    }
2061
2062    /// Seed an enabled agent that claims `ids` via its `cron_jobs` list so
2063    /// `sync_declarative_jobs` can resolve an owning agent for each entry.
2064    fn seed_claiming_agent(config: &mut Config, ids: &[&str]) {
2065        config.agents.insert(
2066            "test-agent".to_string(),
2067            zeroclaw_config::schema::AliasedAgentConfig {
2068                enabled: true,
2069                cron_jobs: ids.iter().map(|s| (*s).to_string()).collect(),
2070                ..Default::default()
2071            },
2072        );
2073    }
2074
2075    #[test]
2076    fn sync_inserts_new_declarative_job() {
2077        let tmp = TempDir::new().unwrap();
2078        let mut config = test_config(&tmp);
2079        seed_claiming_agent(&mut config, &["daily-backup"]);
2080
2081        let decls = decls_map(vec![make_shell_decl(
2082            "daily-backup",
2083            "0 2 * * *",
2084            "echo backup",
2085        )]);
2086        sync_declarative_jobs(&config, &decls).unwrap();
2087
2088        let job = get_job(&config, "daily-backup").unwrap();
2089        assert_eq!(job.command, "echo backup");
2090        assert_eq!(job.source, "declarative");
2091        assert_eq!(job.name.as_deref(), Some("decl-daily-backup"));
2092    }
2093
2094    #[test]
2095    fn sync_updates_existing_declarative_job() {
2096        let tmp = TempDir::new().unwrap();
2097        let mut config = test_config(&tmp);
2098        seed_claiming_agent(&mut config, &["updatable"]);
2099
2100        let decls = decls_map(vec![make_shell_decl("updatable", "0 2 * * *", "echo v1")]);
2101        sync_declarative_jobs(&config, &decls).unwrap();
2102
2103        let job_v1 = get_job(&config, "updatable").unwrap();
2104        assert_eq!(job_v1.command, "echo v1");
2105
2106        let decls_v2 = decls_map(vec![make_shell_decl("updatable", "0 3 * * *", "echo v2")]);
2107        sync_declarative_jobs(&config, &decls_v2).unwrap();
2108
2109        let job_v2 = get_job(&config, "updatable").unwrap();
2110        assert_eq!(job_v2.command, "echo v2");
2111        assert_eq!(job_v2.expression, "0 3 * * *");
2112        assert_eq!(job_v2.source, "declarative");
2113    }
2114
2115    #[test]
2116    fn sync_does_not_delete_imperative_jobs() {
2117        let tmp = TempDir::new().unwrap();
2118        let mut config = test_config(&tmp);
2119        seed_claiming_agent(&mut config, &["my-decl"]);
2120
2121        // Create an imperative job via the normal API.
2122        let imperative = add_job(&config, "test-agent", "*/10 * * * *", "echo imperative").unwrap();
2123
2124        // Sync declarative jobs (none of which match the imperative job).
2125        let decls = decls_map(vec![make_shell_decl("my-decl", "0 2 * * *", "echo decl")]);
2126        sync_declarative_jobs(&config, &decls).unwrap();
2127
2128        // Imperative job should still exist.
2129        let still_there = get_job(&config, &imperative.id).unwrap();
2130        assert_eq!(still_there.command, "echo imperative");
2131        assert_eq!(still_there.source, "imperative");
2132
2133        // Declarative job should also exist.
2134        let decl_job = get_job(&config, "my-decl").unwrap();
2135        assert_eq!(decl_job.command, "echo decl");
2136    }
2137
2138    #[test]
2139    fn sync_removes_stale_declarative_jobs() {
2140        let tmp = TempDir::new().unwrap();
2141        let mut config = test_config(&tmp);
2142        seed_claiming_agent(&mut config, &["keeper", "stale"]);
2143
2144        // Insert two declarative jobs.
2145        let decls = decls_map(vec![
2146            make_shell_decl("keeper", "0 2 * * *", "echo keep"),
2147            make_shell_decl("stale", "0 3 * * *", "echo stale"),
2148        ]);
2149        sync_declarative_jobs(&config, &decls).unwrap();
2150
2151        // Now sync with only "keeper"; "stale" should be removed.
2152        let decls_v2 = decls_map(vec![make_shell_decl("keeper", "0 2 * * *", "echo keep")]);
2153        sync_declarative_jobs(&config, &decls_v2).unwrap();
2154
2155        assert!(get_job(&config, "stale").is_err());
2156        assert!(get_job(&config, "keeper").is_ok());
2157    }
2158
2159    #[test]
2160    fn sync_empty_removes_all_declarative_jobs() {
2161        let tmp = TempDir::new().unwrap();
2162        let mut config = test_config(&tmp);
2163        seed_claiming_agent(&mut config, &["to-remove"]);
2164
2165        let decls = decls_map(vec![make_shell_decl("to-remove", "0 2 * * *", "echo bye")]);
2166        sync_declarative_jobs(&config, &decls).unwrap();
2167        assert!(get_job(&config, "to-remove").is_ok());
2168
2169        // Sync with empty map.
2170        sync_declarative_jobs(&config, &std::collections::HashMap::new()).unwrap();
2171        assert!(get_job(&config, "to-remove").is_err());
2172    }
2173
2174    #[test]
2175    fn sync_validates_shell_job_requires_command() {
2176        let tmp = TempDir::new().unwrap();
2177        let config = test_config(&tmp);
2178
2179        let (id, mut decl) = make_shell_decl("bad", "0 2 * * *", "echo ok");
2180        decl.command = None;
2181
2182        let decls = decls_map(vec![(id, decl)]);
2183        let result = sync_declarative_jobs(&config, &decls);
2184        assert!(result.is_err());
2185        assert!(result.unwrap_err().to_string().contains("command"));
2186    }
2187
2188    #[test]
2189    fn sync_validates_agent_job_requires_prompt() {
2190        let tmp = TempDir::new().unwrap();
2191        let config = test_config(&tmp);
2192
2193        let (id, mut decl) = make_agent_decl("bad-agent", "0 2 * * *", "do stuff");
2194        decl.prompt = None;
2195
2196        let decls = decls_map(vec![(id, decl)]);
2197        let result = sync_declarative_jobs(&config, &decls);
2198        assert!(result.is_err());
2199        assert!(result.unwrap_err().to_string().contains("prompt"));
2200    }
2201
2202    #[test]
2203    fn sync_agent_job_inserts_correctly() {
2204        let tmp = TempDir::new().unwrap();
2205        let mut config = test_config(&tmp);
2206        seed_claiming_agent(&mut config, &["agent-check"]);
2207
2208        let decls = decls_map(vec![make_agent_decl(
2209            "agent-check",
2210            "*/15 * * * *",
2211            "check health",
2212        )]);
2213        sync_declarative_jobs(&config, &decls).unwrap();
2214
2215        let job = get_job(&config, "agent-check").unwrap();
2216        assert_eq!(job.job_type, JobType::Agent);
2217        assert_eq!(job.prompt.as_deref(), Some("check health"));
2218        assert_eq!(job.source, "declarative");
2219    }
2220
2221    #[test]
2222    fn sync_every_schedule_works() {
2223        let tmp = TempDir::new().unwrap();
2224        let mut config = test_config(&tmp);
2225        seed_claiming_agent(&mut config, &["interval-job"]);
2226
2227        let decl = zeroclaw_config::schema::CronJobDecl {
2228            name: None,
2229            job_type: "shell".to_string(),
2230            schedule: zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 60000 },
2231            command: Some("echo interval".to_string()),
2232            prompt: None,
2233            enabled: true,
2234            model: None,
2235            allowed_tools: None,
2236            uses_memory: true,
2237            session_target: None,
2238            delivery: None,
2239        };
2240
2241        let mut decls = std::collections::HashMap::new();
2242        decls.insert("interval-job".to_string(), decl);
2243        sync_declarative_jobs(&config, &decls).unwrap();
2244
2245        let job = get_job(&config, "interval-job").unwrap();
2246        assert!(matches!(job.schedule, Schedule::Every { every_ms: 60000 }));
2247        assert_eq!(job.command, "echo interval");
2248    }
2249
2250    #[test]
2251    fn declarative_config_parses_from_toml() {
2252        // Alias-keyed cron map: `[cron.<alias>]` syntax.
2253        let toml_str = r#"
2254[cron.daily-report]
2255name = "Daily Report"
2256job_type = "shell"
2257command = "echo report"
2258schedule = { kind = "cron", expr = "0 9 * * *" }
2259
2260[cron.health-check]
2261job_type = "agent"
2262prompt = "Check server health"
2263schedule = { kind = "every", every_ms = 300000 }
2264        "#;
2265
2266        #[derive(serde::Deserialize)]
2267        struct Wrap {
2268            cron: std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
2269        }
2270        let parsed: Wrap = toml::from_str(toml_str).unwrap();
2271        assert_eq!(parsed.cron.len(), 2);
2272
2273        let report = parsed.cron.get("daily-report").unwrap();
2274        assert_eq!(report.command.as_deref(), Some("echo report"));
2275        assert!(matches!(
2276            report.schedule,
2277            zeroclaw_config::schema::CronScheduleDecl::Cron { ref expr, .. } if expr == "0 9 * * *"
2278        ));
2279
2280        let health = parsed.cron.get("health-check").unwrap();
2281        assert_eq!(health.job_type, "agent");
2282        assert_eq!(health.prompt.as_deref(), Some("Check server health"));
2283        assert!(matches!(
2284            health.schedule,
2285            zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 300_000 }
2286        ));
2287    }
2288
2289    #[test]
2290    fn skip_missed_run_advances_recurring_job_next_run() {
2291        let tmp = TempDir::new().unwrap();
2292        let config = test_config(&tmp);
2293
2294        // Add a cron job that will be "overdue" — its next_run is set based
2295        // on the schedule from the current time, so we need to make it past.
2296        let job = add_job(&config, "test-agent", "* * * * *", "echo test").unwrap();
2297
2298        // Force next_run into the past so the job appears overdue.
2299        let past = Utc::now() - ChronoDuration::hours(1);
2300        with_initialized_connection(&config, |conn| {
2301            conn.execute(
2302                "UPDATE cron_jobs SET next_run = ?1 WHERE id = ?2",
2303                params![past.to_rfc3339(), job.id],
2304            )
2305            .unwrap();
2306            Ok(())
2307        })
2308        .unwrap();
2309
2310        // Verify it is overdue now.
2311        assert!(
2312            !all_overdue_jobs(&config, Utc::now()).unwrap().is_empty(),
2313            "job with past next_run must appear in overdue"
2314        );
2315
2316        // Skip the missed run.
2317        let reloaded = get_job(&config, &job.id).unwrap();
2318        skip_missed_run(&config, &reloaded, Utc::now()).unwrap();
2319
2320        // The job's next_run should now be in the future.
2321        let updated = get_job(&config, &job.id).unwrap();
2322        assert!(
2323            updated.next_run > Utc::now(),
2324            "skip_missed_run must advance next_run to the future"
2325        );
2326        assert!(updated.enabled, "recurring job must stay enabled");
2327    }
2328
2329    #[test]
2330    fn skip_missed_run_disables_overdue_oneshot_job() {
2331        let tmp = TempDir::new().unwrap();
2332        let config = test_config(&tmp);
2333
2334        let run_at = Utc::now() - ChronoDuration::hours(2);
2335        let schedule = Schedule::At { at: run_at };
2336        let job = add_job_with_schedule(&config, "test-agent", &schedule, "echo once").unwrap();
2337
2338        // The add_job_with_schedule should have set next_run = run_at,
2339        // so the job is overdue now.
2340        assert!(
2341            !all_overdue_jobs(&config, Utc::now()).unwrap().is_empty(),
2342            "one-shot job with past at-time must be overdue"
2343        );
2344
2345        let reloaded = get_job(&config, &job.id).unwrap();
2346        skip_missed_run(&config, &reloaded, Utc::now()).unwrap();
2347
2348        let updated = get_job(&config, &job.id).unwrap();
2349        assert!(
2350            !updated.enabled,
2351            "overdue one-shot job must be disabled after skip"
2352        );
2353        assert_eq!(
2354            updated.last_status.as_deref(),
2355            Some("skipped"),
2356            "one-shot job last_status must be 'skipped'"
2357        );
2358    }
2359
2360    fn add_job_with_schedule(
2361        config: &Config,
2362        agent_alias: &str,
2363        schedule: &Schedule,
2364        command: &str,
2365    ) -> Result<CronJob> {
2366        let now = Utc::now();
2367        let job = CronJob {
2368            id: format!("test-job-{}", Uuid::new_v4()),
2369            expression: String::new(),
2370            schedule: schedule.clone(),
2371            command: command.to_string(),
2372            prompt: None,
2373            name: None,
2374            job_type: JobType::Shell,
2375            session_target: SessionTarget::Isolated,
2376            model: None,
2377            agent_alias: agent_alias.to_string(),
2378            enabled: true,
2379            delivery: DeliveryConfig::default(),
2380            delete_after_run: false,
2381            allowed_tools: None,
2382            uses_memory: false,
2383            source: "imperative".to_string(),
2384            created_at: now,
2385            next_run: next_run_for_schedule(schedule, now).unwrap_or(now),
2386            last_run: None,
2387            last_status: None,
2388            last_output: None,
2389        };
2390        let job_type_str: String = match &job.job_type {
2391            JobType::Shell => "shell".to_string(),
2392            JobType::Agent => "agent".to_string(),
2393        };
2394        let schedule_json = serde_json::to_string(&job.schedule).unwrap();
2395        let delivery_json = serde_json::to_string(&job.delivery).unwrap();
2396        let allowed_tools_json =
2397            crate::cron::store::encode_allowed_tools(job.allowed_tools.as_ref()).unwrap();
2398        with_initialized_connection(config, |conn| {
2399            conn.execute(
2400                "INSERT INTO cron_jobs
2401                 (id, expression, command, schedule, job_type, prompt, name,
2402                  session_target, model, enabled, delivery, delete_after_run,
2403                  allowed_tools, next_run, last_run, last_status, last_output,
2404                  uses_memory, source, created_at, agent_alias)
2405                 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21)",
2406                params![
2407                    job.id,
2408                    job.expression,
2409                    job.command,
2410                    schedule_json,
2411                    job_type_str.to_string(),
2412                    job.prompt,
2413                    job.name,
2414                    job.session_target.as_str(),
2415                    job.model,
2416                    if job.enabled { 1 } else { 0 },
2417                    delivery_json,
2418                    if job.delete_after_run { 1 } else { 0 },
2419                    allowed_tools_json,
2420                    job.next_run.to_rfc3339(),
2421                    job.last_run.map(|t| t.to_rfc3339()),
2422                    job.last_status,
2423                    job.last_output,
2424                    if job.uses_memory { 1 } else { 0 },
2425                    job.source,
2426                    job.created_at.to_rfc3339(),
2427                    job.agent_alias,
2428                ],
2429            )
2430            .context("Failed to insert test cron job")?;
2431            Ok(())
2432        })?;
2433        Ok(job)
2434    }
2435}