Skip to main content

zeroclaw_runtime/cron/
store.rs

1use crate::cron::{
2    CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
3    next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use rusqlite::types::{FromSqlResult, ValueRef};
8use rusqlite::{Connection, OpenFlags, params};
9use uuid::Uuid;
10use zeroclaw_config::schema::Config;
11
/// Upper bound, in bytes, on any job output persisted by this module (16 KiB).
const MAX_CRON_OUTPUT_BYTES: usize = 16 << 10;
/// Suffix appended to persisted output that had to be shortened to fit the bound.
const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
14
/// Selects the job-row mutation that `apply_run_completion_state` performs
/// once a run has finished.
///
/// NOTE(review): the variant meanings below are read off the names; the code
/// that interprets them lives outside this excerpt — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RunCompletionAction {
    /// Presumably: keep the job and move it to its next occurrence.
    Reschedule,
    /// Presumably: keep the job row but stop it from running again.
    Disable,
    /// Presumably: remove the job row.
    Delete,
}
21
/// Test-only registry of per-database counters, keyed by the cron DB path.
///
/// Starts empty; `reset_write_connection_count_for_tests` and
/// `write_connection_count_for_tests` read and write it.
#[cfg(test)]
static WRITE_CONNECTION_COUNTS_FOR_TESTS: std::sync::LazyLock<
    std::sync::Mutex<std::collections::HashMap<std::path::PathBuf, usize>>,
> = std::sync::LazyLock::new(Default::default);
26
/// Test-only: set the recorded counter for this config's cron database to 0.
///
/// A poisoned registry mutex is recovered rather than propagated, so one
/// panicking test does not break the others.
#[cfg(test)]
pub(crate) fn reset_write_connection_count_for_tests(config: &Config) {
    let mut registry = match WRITE_CONNECTION_COUNTS_FOR_TESTS.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let db_path = cron_db_path(config);
    registry.insert(db_path, 0);
}
34
/// Test-only: read the recorded counter for this config's cron database.
///
/// Returns 0 when no entry exists for the path. A poisoned registry mutex is
/// recovered rather than propagated.
#[cfg(test)]
pub(crate) fn write_connection_count_for_tests(config: &Config) -> usize {
    let registry = match WRITE_CONNECTION_COUNTS_FOR_TESTS.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let db_path = cron_db_path(config);
    registry.get(&db_path).map_or(0, |count| *count)
}
42
43impl rusqlite::types::FromSql for JobType {
44    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
45        let text = value.as_str()?;
46        JobType::try_from(text).map_err(|e| rusqlite::types::FromSqlError::Other(e.into()))
47    }
48}
49
/// Test-only shortcut: create a shell job from a bare cron expression.
///
/// The expression is wrapped in a `Schedule::Cron` without a timezone and
/// passed to [`add_shell_job`] with no name and no delivery configuration.
#[cfg(test)]
pub fn add_job(
    config: &Config,
    agent_alias: &str,
    expression: &str,
    command: &str,
) -> Result<CronJob> {
    add_shell_job(
        config,
        agent_alias,
        None,
        Schedule::Cron {
            expr: expression.to_owned(),
            tz: None,
        },
        command,
        None,
    )
}
63
64pub fn add_shell_job(
65    config: &Config,
66    agent_alias: &str,
67    name: Option<String>,
68    schedule: Schedule,
69    command: &str,
70    delivery: Option<DeliveryConfig>,
71) -> Result<CronJob> {
72    let now = Utc::now();
73    validate_schedule(&schedule, now)?;
74    validate_delivery_config(delivery.as_ref())?;
75    let next_run = next_run_for_schedule(&schedule, now)?;
76    let id = Uuid::new_v4().to_string();
77    let expression = schedule_cron_expression(&schedule).unwrap_or_default();
78    let schedule_json = serde_json::to_string(&schedule)?;
79    let delivery = delivery.unwrap_or_default();
80
81    let delete_after_run = matches!(schedule, Schedule::At { .. });
82    let agent_alias = agent_alias.trim();
83    if agent_alias.is_empty() {
84        anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
85    }
86
87    with_initialized_connection(config, |conn| {
88        conn.execute(
89            "INSERT INTO cron_jobs (
90                id, expression, command, schedule, job_type, prompt, name, session_target, model,
91                enabled, delivery, delete_after_run, agent_alias, created_at, next_run
92             ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, ?7, ?8, ?9, ?10)",
93            params![
94                id,
95                expression,
96                command,
97                schedule_json,
98                name,
99                serde_json::to_string(&delivery)?,
100                if delete_after_run { 1 } else { 0 },
101                agent_alias,
102                now.to_rfc3339(),
103                next_run.to_rfc3339(),
104            ],
105        )
106        .context("Failed to insert cron shell job")?;
107        Ok(())
108    })?;
109
110    get_job(config, &id)
111}
112
113#[allow(clippy::too_many_arguments)]
114pub fn add_agent_job(
115    config: &Config,
116    agent_alias: &str,
117    name: Option<String>,
118    schedule: Schedule,
119    prompt: &str,
120    session_target: SessionTarget,
121    model: Option<String>,
122    delivery: Option<DeliveryConfig>,
123    delete_after_run: bool,
124    allowed_tools: Option<Vec<String>>,
125) -> Result<CronJob> {
126    let now = Utc::now();
127    validate_schedule(&schedule, now)?;
128    validate_delivery_config(delivery.as_ref())?;
129    let next_run = next_run_for_schedule(&schedule, now)?;
130    let id = Uuid::new_v4().to_string();
131    let expression = schedule_cron_expression(&schedule).unwrap_or_default();
132    let schedule_json = serde_json::to_string(&schedule)?;
133    let delivery = delivery.unwrap_or_default();
134    let agent_alias = agent_alias.trim();
135    if agent_alias.is_empty() {
136        anyhow::bail!("agent_alias is required; cron jobs must name an owning agent");
137    }
138
139    with_initialized_connection(config, |conn| {
140        conn.execute(
141            "INSERT INTO cron_jobs (
142                id, expression, command, schedule, job_type, prompt, name, session_target, model,
143                enabled, delivery, delete_after_run, allowed_tools, agent_alias, created_at, next_run
144             ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12, ?13)",
145            params![
146                id,
147                expression,
148                schedule_json,
149                prompt,
150                name,
151                session_target.as_str(),
152                model,
153                serde_json::to_string(&delivery)?,
154                if delete_after_run { 1 } else { 0 },
155                encode_allowed_tools(allowed_tools.as_ref())?,
156                agent_alias,
157                now.to_rfc3339(),
158                next_run.to_rfc3339(),
159            ],
160        )
161        .context("Failed to insert cron agent job")?;
162        Ok(())
163    })?;
164
165    get_job(config, &id)
166}
167
168pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
169    let Some(jobs) = with_read_connection(config, |conn| {
170        let mut stmt = conn.prepare(
171            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
172                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
173                    allowed_tools, source, uses_memory, agent_alias
174             FROM cron_jobs ORDER BY next_run ASC",
175        )?;
176
177        let rows = stmt.query_map([], map_cron_job_row)?;
178
179        let mut jobs = Vec::new();
180        for row in rows {
181            jobs.push(row?);
182        }
183        Ok(jobs)
184    })?
185    else {
186        return Ok(Vec::new());
187    };
188
189    Ok(jobs)
190}
191
192pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
193    let Some(job) = with_read_connection(config, |conn| {
194        let mut stmt = conn.prepare(
195            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
196                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
197                    allowed_tools, source, uses_memory, agent_alias
198             FROM cron_jobs WHERE id = ?1",
199        )?;
200
201        let mut rows = stmt.query(params![job_id])?;
202        if let Some(row) = rows.next()? {
203            map_cron_job_row(row).map_err(Into::into)
204        } else {
205            anyhow::bail!("Cron job '{job_id}' not found")
206        }
207    })?
208    else {
209        anyhow::bail!("Cron job '{job_id}' not found")
210    };
211
212    Ok(job)
213}
214
215pub fn remove_job(config: &Config, id: &str) -> Result<()> {
216    let changed = with_initialized_connection(config, |conn| {
217        conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
218            .context("Failed to delete cron job")
219    })?;
220
221    if changed == 0 {
222        anyhow::bail!("Cron job '{id}' not found");
223    }
224
225    println!("✅ Removed cron job {id}");
226    Ok(())
227}
228
229pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
230    let lim = i64::try_from(config.scheduler.max_tasks.max(1))
231        .context("Scheduler max_tasks overflows i64")?;
232    let Some(jobs) = with_read_connection(config, |conn| {
233        let mut stmt = conn.prepare(
234            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
235                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
236                    allowed_tools, source, uses_memory, agent_alias
237             FROM cron_jobs
238             WHERE enabled = 1 AND next_run <= ?1
239             ORDER BY next_run ASC
240             LIMIT ?2",
241        )?;
242
243        let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
244
245        let mut jobs = Vec::new();
246        for row in rows {
247            match row {
248                Ok(job) => jobs.push(job),
249                Err(e) => ::zeroclaw_log::record!(
250                    WARN,
251                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
252                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
253                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
254                    "Skipping cron job with unparseable row data"
255                ),
256            }
257        }
258        Ok(jobs)
259    })?
260    else {
261        return Ok(Vec::new());
262    };
263
264    Ok(jobs)
265}
266
267/// Return **all** enabled overdue jobs without the `max_tasks` limit.
268///
269/// Used by the scheduler startup catch-up to ensure every missed job is
270/// executed at least once after a period of downtime (late boot, daemon
271/// restart, etc.).
272pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
273    let Some(jobs) = with_read_connection(config, |conn| {
274        let mut stmt = conn.prepare(
275            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
276                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
277                    allowed_tools, source, uses_memory, agent_alias
278             FROM cron_jobs
279             WHERE enabled = 1 AND next_run <= ?1
280             ORDER BY next_run ASC",
281        )?;
282
283        let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
284
285        let mut jobs = Vec::new();
286        for row in rows {
287            match row {
288                Ok(job) => jobs.push(job),
289                Err(e) => ::zeroclaw_log::record!(
290                    WARN,
291                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
292                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
293                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
294                    "Skipping cron job with unparseable row data"
295                ),
296            }
297        }
298        Ok(jobs)
299    })?
300    else {
301        return Ok(Vec::new());
302    };
303
304    Ok(jobs)
305}
306
307pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
308    let mut job = get_job(config, job_id)?;
309    let mut schedule_changed = false;
310
311    if let Some(schedule) = patch.schedule {
312        validate_schedule(&schedule, Utc::now())?;
313        job.schedule = schedule;
314        job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default();
315        schedule_changed = true;
316    }
317    if let Some(command) = patch.command {
318        job.command = command;
319    }
320    if let Some(prompt) = patch.prompt {
321        job.prompt = Some(prompt);
322    }
323    if let Some(name) = patch.name {
324        job.name = Some(name);
325    }
326    if let Some(enabled) = patch.enabled {
327        job.enabled = enabled;
328    }
329    if let Some(delivery) = patch.delivery {
330        job.delivery = delivery;
331    }
332    if let Some(model) = patch.model {
333        job.model = Some(model);
334    }
335    if let Some(target) = patch.session_target {
336        job.session_target = target;
337    }
338    if let Some(delete_after_run) = patch.delete_after_run {
339        job.delete_after_run = delete_after_run;
340    }
341    if let Some(allowed_tools) = patch.allowed_tools {
342        // Empty list means "clear the allowlist" (all tools available),
343        // not "allow zero tools".
344        if allowed_tools.is_empty() {
345            job.allowed_tools = None;
346        } else {
347            job.allowed_tools = Some(allowed_tools);
348        }
349    }
350    if let Some(uses_memory) = patch.uses_memory {
351        job.uses_memory = uses_memory;
352    }
353
354    if schedule_changed {
355        job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
356    }
357
358    with_initialized_connection(config, |conn| {
359        conn.execute(
360            "UPDATE cron_jobs
361             SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
362                 session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
363                 allowed_tools = ?12, next_run = ?13, uses_memory = ?14
364             WHERE id = ?15",
365            params![
366                job.expression,
367                job.command,
368                serde_json::to_string(&job.schedule)?,
369                <JobType as Into<&str>>::into(job.job_type).to_string(),
370                job.prompt,
371                job.name,
372                job.session_target.as_str(),
373                job.model,
374                if job.enabled { 1 } else { 0 },
375                serde_json::to_string(&job.delivery)?,
376                if job.delete_after_run { 1 } else { 0 },
377                encode_allowed_tools(job.allowed_tools.as_ref())?,
378                job.next_run.to_rfc3339(),
379                if job.uses_memory { 1 } else { 0 },
380                job.id,
381            ],
382        )
383        .context("Failed to update cron job")?;
384        Ok(())
385    })?;
386
387    get_job(config, job_id)
388}
389
390pub fn record_last_run(
391    config: &Config,
392    job_id: &str,
393    finished_at: DateTime<Utc>,
394    success: bool,
395    output: &str,
396) -> Result<()> {
397    let status = if success { "ok" } else { "error" };
398    record_last_run_with_status(config, job_id, finished_at, status, output)
399}
400
401pub fn record_last_run_with_status(
402    config: &Config,
403    job_id: &str,
404    finished_at: DateTime<Utc>,
405    status: &str,
406    output: &str,
407) -> Result<()> {
408    let bounded_output = truncate_cron_output(output);
409    with_initialized_connection(config, |conn| {
410        conn.execute(
411            "UPDATE cron_jobs
412             SET last_run = ?1, last_status = ?2, last_output = ?3
413             WHERE id = ?4",
414            params![finished_at.to_rfc3339(), status, bounded_output, job_id],
415        )
416        .context("Failed to update cron last run fields")?;
417        Ok(())
418    })
419}
420
421pub fn reschedule_after_run(
422    config: &Config,
423    job: &CronJob,
424    success: bool,
425    output: &str,
426) -> Result<()> {
427    let status = if success { "ok" } else { "error" };
428    reschedule_after_run_with_status(config, job, status, output)
429}
430
431pub fn reschedule_after_run_with_status(
432    config: &Config,
433    job: &CronJob,
434    status: &str,
435    output: &str,
436) -> Result<()> {
437    let now = Utc::now();
438    let bounded_output = truncate_cron_output(output);
439
440    // One-shot `At` schedules have no future occurrence — record the run
441    // result and disable the job so it won't be picked up again.
442    if matches!(job.schedule, Schedule::At { .. }) {
443        with_initialized_connection(config, |conn| {
444            conn.execute(
445                "UPDATE cron_jobs
446                 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
447                 WHERE id = ?4",
448                params![now.to_rfc3339(), status, bounded_output, job.id],
449            )
450            .context("Failed to disable completed one-shot cron job")?;
451            Ok(())
452        })
453    } else {
454        let next_run = next_run_for_schedule(&job.schedule, now)?;
455        with_initialized_connection(config, |conn| {
456            conn.execute(
457                "UPDATE cron_jobs
458                 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
459                 WHERE id = ?5",
460                params![
461                    next_run.to_rfc3339(),
462                    now.to_rfc3339(),
463                    status,
464                    bounded_output,
465                    job.id
466                ],
467            )
468            .context("Failed to update cron job run state")?;
469            Ok(())
470        })
471    }
472}
473
474pub fn record_run(
475    config: &Config,
476    job_id: &str,
477    started_at: DateTime<Utc>,
478    finished_at: DateTime<Utc>,
479    status: &str,
480    output: Option<&str>,
481    duration_ms: i64,
482) -> Result<()> {
483    let bounded_output = output.map(truncate_cron_output);
484    with_initialized_connection(config, |conn| {
485        // Wrap INSERT + pruning DELETE in an explicit transaction so that
486        // if the DELETE fails, the INSERT is rolled back and the run table
487        // cannot grow unboundedly.
488        let tx = conn.unchecked_transaction()?;
489
490        insert_run_and_prune(
491            &tx,
492            config,
493            job_id,
494            started_at,
495            finished_at,
496            status,
497            bounded_output.as_deref(),
498            duration_ms,
499        )?;
500
501        tx.commit()
502            .context("Failed to commit cron run transaction")?;
503        Ok(())
504    })
505}
506
507#[allow(clippy::too_many_arguments)]
508pub(crate) fn persist_run_result(
509    config: &Config,
510    job: &CronJob,
511    started_at: DateTime<Utc>,
512    finished_at: DateTime<Utc>,
513    job_state_at: DateTime<Utc>,
514    status: &str,
515    output: Option<&str>,
516    duration_ms: i64,
517    action: RunCompletionAction,
518) -> Result<()> {
519    let bounded_output = output.map(truncate_cron_output);
520
521    with_initialized_connection(config, |conn| {
522        let tx = conn.unchecked_transaction()?;
523
524        insert_run_and_prune(
525            &tx,
526            config,
527            &job.id,
528            started_at,
529            finished_at,
530            status,
531            bounded_output.as_deref(),
532            duration_ms,
533        )?;
534
535        apply_run_completion_state(
536            &tx,
537            job,
538            job_state_at,
539            status,
540            bounded_output.as_deref(),
541            action,
542        )?;
543
544        tx.commit()
545            .context("Failed to commit cron run result transaction")?;
546        Ok(())
547    })
548}
549
550/// Persist only the job-state side of a completed cron run.
551///
552/// This is intentionally separate from `persist_run_result` so the scheduler
553/// can recover job state even when run-history persistence fails. The SQL
554/// mutation itself stays in the store layer.
555pub(crate) fn persist_run_completion_state(
556    config: &Config,
557    job: &CronJob,
558    job_state_at: DateTime<Utc>,
559    status: &str,
560    output: Option<&str>,
561    action: RunCompletionAction,
562) -> Result<()> {
563    with_initialized_connection(config, |conn| {
564        apply_run_completion_state(conn, job, job_state_at, status, output, action)
565    })
566}
567
568#[allow(clippy::too_many_arguments)]
569fn insert_run_and_prune(
570    conn: &Connection,
571    config: &Config,
572    job_id: &str,
573    started_at: DateTime<Utc>,
574    finished_at: DateTime<Utc>,
575    status: &str,
576    output: Option<&str>,
577    duration_ms: i64,
578) -> Result<()> {
579    conn.execute(
580        "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
581         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
582        params![
583            job_id,
584            started_at.to_rfc3339(),
585            finished_at.to_rfc3339(),
586            status,
587            output,
588            duration_ms,
589        ],
590    )
591    .context("Failed to insert cron run")?;
592
593    let keep = i64::from(config.scheduler.max_run_history.max(1));
594    conn.execute(
595        "DELETE FROM cron_runs
596         WHERE job_id = ?1
597           AND id NOT IN (
598             SELECT id FROM cron_runs
599             WHERE job_id = ?1
600             ORDER BY started_at DESC, id DESC
601             LIMIT ?2
602           )",
603        params![job_id, keep],
604    )
605    .context("Failed to prune cron run history")?;
606
607    Ok(())
608}
609
610fn truncate_cron_output(output: &str) -> String {
611    if output.len() <= MAX_CRON_OUTPUT_BYTES {
612        return output.to_string();
613    }
614
615    if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
616        return TRUNCATED_OUTPUT_MARKER.to_string();
617    }
618
619    let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
620    while cutoff > 0 && !output.is_char_boundary(cutoff) {
621        cutoff -= 1;
622    }
623
624    let mut truncated = output[..cutoff].to_string();
625    truncated.push_str(TRUNCATED_OUTPUT_MARKER);
626    truncated
627}
628
629pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
630    let Some(runs) = with_read_connection(config, |conn| {
631        let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
632        let mut stmt = conn.prepare(
633            "SELECT id, job_id, started_at, finished_at, status, output, duration_ms
634             FROM cron_runs
635             WHERE job_id = ?1
636             ORDER BY started_at DESC, id DESC
637             LIMIT ?2",
638        )?;
639
640        let rows = stmt.query_map(params![job_id, lim], |row| {
641            Ok(CronRun {
642                id: row.get(0)?,
643                job_id: row.get(1)?,
644                started_at: parse_rfc3339(&row.get::<_, String>(2)?)
645                    .map_err(sql_conversion_error)?,
646                finished_at: parse_rfc3339(&row.get::<_, String>(3)?)
647                    .map_err(sql_conversion_error)?,
648                status: row.get(4)?,
649                output: row.get(5)?,
650                duration_ms: row.get(6)?,
651            })
652        })?;
653
654        let mut runs = Vec::new();
655        for row in rows {
656            runs.push(row?);
657        }
658        Ok(runs)
659    })?
660    else {
661        return Ok(Vec::new());
662    };
663
664    Ok(runs)
665}
666
667fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
668    let parsed = DateTime::parse_from_rfc3339(raw)
669        .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
670    Ok(parsed.with_timezone(&Utc))
671}
672
673fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error {
674    rusqlite::Error::ToSqlConversionFailure(err.into())
675}
676
677fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
678    let expression: String = row.get(1)?;
679    let schedule_raw: Option<String> = row.get(3)?;
680    let schedule =
681        decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?;
682
683    let delivery_raw: Option<String> = row.get(10)?;
684    let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?;
685
686    let next_run_raw: String = row.get(13)?;
687    let last_run_raw: Option<String> = row.get(14)?;
688    let created_at_raw: String = row.get(12)?;
689    let allowed_tools_raw: Option<String> = row.get(17)?;
690    let source: Option<String> = row.get(18)?;
691    let uses_memory: Option<i64> = row.get(19)?;
692    let agent_alias: Option<String> = row.get(20)?;
693
694    Ok(CronJob {
695        id: row.get(0)?,
696        expression,
697        schedule,
698        command: row.get(2)?,
699        job_type: row.get(4)?,
700        prompt: row.get(5)?,
701        name: row.get(6)?,
702        session_target: SessionTarget::parse(&row.get::<_, String>(7)?),
703        model: row.get(8)?,
704        agent_alias: agent_alias
705            .map(|s| s.trim().to_string())
706            .unwrap_or_default(),
707        enabled: row.get::<_, i64>(9)? != 0,
708        delivery,
709        delete_after_run: row.get::<_, i64>(11)? != 0,
710        source: source.unwrap_or_else(|| "imperative".to_string()),
711        uses_memory: uses_memory != Some(0),
712        created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?,
713        next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?,
714        last_run: match last_run_raw {
715            Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?),
716            None => None,
717        },
718        last_status: row.get(15)?,
719        last_output: row.get(16)?,
720        allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
721            .map_err(sql_conversion_error)?,
722    })
723}
724
725fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result<Schedule> {
726    if let Some(raw) = schedule_raw {
727        let trimmed = raw.trim();
728        if !trimmed.is_empty() {
729            return serde_json::from_str(trimmed)
730                .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}"));
731        }
732    }
733
734    if expression.trim().is_empty() {
735        anyhow::bail!("Missing schedule and legacy expression for cron job")
736    }
737
738    Ok(Schedule::Cron {
739        expr: expression.to_string(),
740        tz: None,
741    })
742}
743
744fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
745    if let Some(raw) = delivery_raw {
746        let trimmed = raw.trim();
747        if !trimmed.is_empty() {
748            return serde_json::from_str(trimmed)
749                .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}"));
750        }
751    }
752    Ok(DeliveryConfig::default())
753}
754
755fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
756    allowed_tools
757        .map(serde_json::to_string)
758        .transpose()
759        .context("Failed to serialize cron allowed_tools")
760}
761
762fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
763    if let Some(raw) = raw {
764        let trimmed = raw.trim();
765        if !trimmed.is_empty() {
766            return serde_json::from_str(trimmed)
767                .map(Some)
768                .with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
769        }
770    }
771    Ok(None)
772}
773
774/// Synchronize declarative cron job definitions from config into the database.
775///
776/// For each declarative job (identified by `id`):
777/// - If the job exists in DB: update it to match the config definition.
778/// - If the job does not exist: insert it.
779///
780/// Jobs created imperatively (via CLI/API) are never modified or deleted.
781/// Declarative jobs that are no longer present in config are removed.
782pub fn sync_declarative_jobs(
783    config: &Config,
784    decls: &std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
785) -> Result<()> {
786    use zeroclaw_config::schema::CronScheduleDecl;
787
788    if decls.is_empty() {
789        // If no declarative jobs are defined, clean up previously synced
790        // declarative jobs only when cron storage already exists. A fresh
791        // workspace with nothing to sync should stay DB-free on daemon start.
792        let _ = with_existing_initialized_connection(config, |conn| {
793            let deleted = conn
794                .execute("DELETE FROM cron_jobs WHERE source = 'declarative'", [])
795                .context("Failed to remove stale declarative cron jobs")?;
796            if deleted > 0 {
797                ::zeroclaw_log::record!(
798                    INFO,
799                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
800                        .with_attrs(::serde_json::json!({"count": deleted})),
801                    "Removed declarative cron jobs no longer in config"
802                );
803            }
804            Ok(())
805        })?;
806        return Ok(());
807    }
808
809    // Validate declarations before touching the DB.
810    for (id, decl) in decls {
811        validate_decl(id, decl)?;
812    }
813
814    let now = Utc::now();
815
816    with_initialized_connection(config, |conn| {
817        // Collect IDs of all declarative jobs currently defined in config.
818        let config_ids: std::collections::HashSet<&str> =
819            decls.keys().map(String::as_str).collect();
820
821        // Remove declarative jobs no longer in config.
822        {
823            let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'declarative'")?;
824            let db_ids: Vec<String> = stmt
825                .query_map([], |row| row.get(0))?
826                .filter_map(|r| r.ok())
827                .collect();
828
829            for db_id in &db_ids {
830                if !config_ids.contains(db_id.as_str()) {
831                    conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![db_id])
832                        .with_context(|| {
833                            format!("Failed to remove stale declarative cron job '{db_id}'")
834                        })?;
835                    ::zeroclaw_log::record!(
836                        INFO,
837                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
838                            .with_attrs(::serde_json::json!({"job_id": db_id})),
839                        "Removed declarative cron job no longer in config"
840                    );
841                }
842            }
843        }
844
845        for (id, decl) in decls {
846            let schedule = convert_schedule_decl(&decl.schedule)?;
847            let expression = schedule_cron_expression(&schedule).unwrap_or_default();
848            let schedule_json = serde_json::to_string(&schedule)?;
849            let job_type = &decl.job_type;
850            let session_target = decl.session_target.as_deref().unwrap_or("isolated");
851            let delivery = match &decl.delivery {
852                Some(d) => convert_delivery_decl(d),
853                None => DeliveryConfig::default(),
854            };
855            let delivery_json = serde_json::to_string(&delivery)?;
856            let allowed_tools_json = encode_allowed_tools(decl.allowed_tools.as_ref())?;
857            let command = decl.command.as_deref().unwrap_or("");
858            let delete_after_run = matches!(decl.schedule, CronScheduleDecl::At { .. });
859
860            // Check if job already exists.
861            let exists: bool = conn
862                .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
863                .query_row(params![id], |row| row.get::<_, i64>(0))
864                .map(|c| c > 0)
865                .unwrap_or(false);
866
867            if exists {
868                // Update existing declarative job — preserve runtime state
869                // (next_run, last_run, last_status, last_output, created_at).
870                // Only update the schedule's next_run if the schedule itself changed.
871                let current_schedule_raw: Option<String> = conn
872                    .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
873                    .query_row(params![id], |row| row.get(0))
874                    .ok();
875
876                let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
877
878                if schedule_changed {
879                    let next_run = next_run_for_schedule(&schedule, now)?;
880                    conn.execute(
881                        "UPDATE cron_jobs
882                         SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
883                             prompt = ?5, name = ?6, session_target = ?7, model = ?8,
884                             enabled = ?9, delivery = ?10, delete_after_run = ?11,
885                             allowed_tools = ?12, source = 'declarative', next_run = ?13,
886                             uses_memory = ?14
887                         WHERE id = ?15",
888                        params![
889                            expression,
890                            command,
891                            schedule_json,
892                            job_type,
893                            decl.prompt,
894                            decl.name,
895                            session_target,
896                            decl.model,
897                            i32::from(decl.enabled),
898                            delivery_json,
899                            i32::from(delete_after_run),
900                            allowed_tools_json,
901                            next_run.to_rfc3339(),
902                            i32::from(decl.uses_memory),
903                            id,
904                        ],
905                    )
906                    .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
907                } else {
908                    conn.execute(
909                        "UPDATE cron_jobs
910                         SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
911                             prompt = ?5, name = ?6, session_target = ?7, model = ?8,
912                             enabled = ?9, delivery = ?10, delete_after_run = ?11,
913                             allowed_tools = ?12, source = 'declarative',
914                             uses_memory = ?13
915                         WHERE id = ?14",
916                        params![
917                            expression,
918                            command,
919                            schedule_json,
920                            job_type,
921                            decl.prompt,
922                            decl.name,
923                            session_target,
924                            decl.model,
925                            i32::from(decl.enabled),
926                            delivery_json,
927                            i32::from(delete_after_run),
928                            allowed_tools_json,
929                            i32::from(decl.uses_memory),
930                            id,
931                        ],
932                    )
933                    .with_context(|| format!("Failed to update declarative cron job '{id}'"))?;
934                }
935
936                ::zeroclaw_log::record!(
937                    DEBUG,
938                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
939                        .with_attrs(::serde_json::json!({"job_id": id})),
940                    "Updated declarative cron job"
941                );
942            } else {
943                // Reverse-resolve the owning agent from
944                // `[agents.<x>].cron_jobs` membership. Orphan declarative
945                // entries that no agent claims are skipped with a warning
946                // rather than silently bound to a magic alias.
947                let Some(agent_alias) = config.agent_for_cron_job(id) else {
948                    ::zeroclaw_log::record!(
949                        WARN,
950                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
951                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
952                            .with_attrs(::serde_json::json!({"job_id": id})),
953                        "Skipping declarative cron job: no [agents.<x>].cron_jobs entry claims this id"
954                    );
955                    continue;
956                };
957                let next_run = next_run_for_schedule(&schedule, now)?;
958                conn.execute(
959                    "INSERT INTO cron_jobs (
960                        id, expression, command, schedule, job_type, prompt, name,
961                        session_target, model, enabled, delivery, delete_after_run,
962                        allowed_tools, source, uses_memory, agent_alias, created_at, next_run
963                     ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, 'declarative', ?14, ?15, ?16, ?17)",
964                    params![
965                        id,
966                        expression,
967                        command,
968                        schedule_json,
969                        job_type,
970                        decl.prompt,
971                        decl.name,
972                        session_target,
973                        decl.model,
974                        i32::from(decl.enabled),
975                        delivery_json,
976                        i32::from(delete_after_run),
977                        allowed_tools_json,
978                        i32::from(decl.uses_memory),
979                        agent_alias,
980                        now.to_rfc3339(),
981                        next_run.to_rfc3339(),
982                    ],
983                )
984                .with_context(|| {
985                    format!("Failed to insert declarative cron job '{id}'")
986                })?;
987
988                ::zeroclaw_log::record!(
989                    INFO,
990                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
991                        .with_attrs(::serde_json::json!({"job_id": id})),
992                    "Inserted declarative cron job from config"
993                );
994            }
995        }
996
997        Ok(())
998    })
999}
1000
1001/// Validate a declarative cron job definition.
1002fn validate_decl(id: &str, decl: &zeroclaw_config::schema::CronJobDecl) -> Result<()> {
1003    if id.trim().is_empty() {
1004        anyhow::bail!("Declarative cron job has empty id");
1005    }
1006
1007    match decl.job_type.to_lowercase().as_str() {
1008        "shell" => {
1009            if decl.command.as_deref().is_none_or(|c| c.trim().is_empty()) {
1010                anyhow::bail!(
1011                    "Declarative cron job '{id}': shell job requires a non-empty 'command'"
1012                );
1013            }
1014        }
1015        "agent" => {
1016            if decl.prompt.as_deref().is_none_or(|p| p.trim().is_empty()) {
1017                anyhow::bail!(
1018                    "Declarative cron job '{id}': agent job requires a non-empty 'prompt'"
1019                );
1020            }
1021        }
1022        other => {
1023            anyhow::bail!(
1024                "Declarative cron job '{id}': invalid job_type '{other}', expected 'shell' or 'agent'"
1025            );
1026        }
1027    }
1028
1029    Ok(())
1030}
1031
1032/// Convert a `CronScheduleDecl` to the runtime `Schedule` type.
1033fn convert_schedule_decl(decl: &zeroclaw_config::schema::CronScheduleDecl) -> Result<Schedule> {
1034    use zeroclaw_config::schema::CronScheduleDecl;
1035    match decl {
1036        CronScheduleDecl::Cron { expr, tz } => Ok(Schedule::Cron {
1037            expr: expr.clone(),
1038            tz: tz.clone(),
1039        }),
1040        CronScheduleDecl::Every { every_ms } => Ok(Schedule::Every {
1041            every_ms: *every_ms,
1042        }),
1043        CronScheduleDecl::At { at } => {
1044            let parsed = DateTime::parse_from_rfc3339(at)
1045                .with_context(|| {
1046                    format!("Invalid RFC3339 timestamp in declarative cron 'at': {at}")
1047                })?
1048                .with_timezone(&Utc);
1049            Ok(Schedule::At { at: parsed })
1050        }
1051    }
1052}
1053
1054/// Convert a `DeliveryConfigDecl` to the runtime `DeliveryConfig`.
1055fn convert_delivery_decl(decl: &zeroclaw_config::schema::DeliveryConfigDecl) -> DeliveryConfig {
1056    DeliveryConfig {
1057        mode: decl.mode.clone(),
1058        channel: decl.channel.clone(),
1059        to: decl.to.clone(),
1060        thread_id: decl.thread_id.clone(),
1061        best_effort: decl.best_effort,
1062    }
1063}
1064
1065fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
1066    let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
1067    let mut rows = stmt.query([])?;
1068    while let Some(row) = rows.next()? {
1069        let col_name: String = row.get(1)?;
1070        if col_name == name {
1071            return Ok(());
1072        }
1073    }
1074    // Drop the statement/rows before executing ALTER to release any locks
1075    drop(rows);
1076    drop(stmt);
1077
1078    // Tolerate "duplicate column name" errors to handle the race where
1079    // another process adds the column between our PRAGMA check and ALTER.
1080    match conn.execute(
1081        &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"),
1082        [],
1083    ) {
1084        Ok(_) => Ok(()),
1085        Err(rusqlite::Error::SqliteFailure(err, Some(ref msg)))
1086            if msg.contains("duplicate column name") =>
1087        {
1088            ::zeroclaw_log::record!(
1089                DEBUG,
1090                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1091                    .with_attrs(::serde_json::json!({"error": format!("{}", err), "name": name})),
1092                "Column cron_jobs. already exists (concurrent migration)"
1093            );
1094            Ok(())
1095        }
1096        Err(e) => Err(e).with_context(|| format!("Failed to add cron_jobs.{name}")),
1097    }
1098}
1099
1100fn cron_db_path(config: &Config) -> std::path::PathBuf {
1101    config.data_dir.join("cron").join("jobs.db")
1102}
1103
/// Run a read-only closure against the cron DB, if one exists.
///
/// Read paths must not create the cron directory or `jobs.db`. If the DB
/// already exists, however, reads still need the lightweight schema/migration
/// ensure step before selecting columns added by newer releases.
///
/// Delegates to `with_existing_initialized_connection`, which yields
/// `Ok(None)` when the database file is absent and `Ok(Some(value))` with
/// the closure's result otherwise.
fn with_read_connection<T>(
    config: &Config,
    f: impl FnOnce(&Connection) -> Result<T>,
) -> Result<Option<T>> {
    with_existing_initialized_connection(config, f)
}
1113
1114fn with_existing_initialized_connection<T>(
1115    config: &Config,
1116    f: impl FnOnce(&Connection) -> Result<T>,
1117) -> Result<Option<T>> {
1118    let db_path = cron_db_path(config);
1119    if !db_path.exists() {
1120        return Ok(None);
1121    }
1122
1123    let conn = Connection::open_with_flags(
1124        &db_path,
1125        OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
1126    )
1127    .with_context(|| {
1128        format!(
1129            "Failed to open existing cron DB: {}",
1130            db_path.display().to_string()
1131        )
1132    })?;
1133
1134    initialize_schema(&conn)?;
1135
1136    f(&conn).map(Some)
1137}
1138
1139fn with_initialized_connection<T>(
1140    config: &Config,
1141    f: impl FnOnce(&Connection) -> Result<T>,
1142) -> Result<T> {
1143    let db_path = cron_db_path(config);
1144    #[cfg(test)]
1145    {
1146        let mut counts = WRITE_CONNECTION_COUNTS_FOR_TESTS
1147            .lock()
1148            .unwrap_or_else(|e| e.into_inner());
1149        if let Some(count) = counts.get_mut(&db_path) {
1150            *count += 1;
1151        }
1152    }
1153
1154    if let Some(parent) = db_path.parent() {
1155        std::fs::create_dir_all(parent).with_context(|| {
1156            format!(
1157                "Failed to create cron directory: {}",
1158                parent.display().to_string()
1159            )
1160        })?;
1161    }
1162
1163    let conn = Connection::open(&db_path)
1164        .with_context(|| format!("Failed to open cron DB: {}", db_path.display().to_string()))?;
1165
1166    initialize_schema(&conn)?;
1167
1168    f(&conn)
1169}
1170
1171/// Apply the completion state change for a cron job inside an existing connection.
1172///
1173/// This keeps the scheduler's normal path and the fallback path using the same
1174/// SQL mutation logic while allowing the caller to decide whether the
1175/// run-history write should be attempted first.
1176fn apply_run_completion_state(
1177    conn: &Connection,
1178    job: &CronJob,
1179    job_state_at: DateTime<Utc>,
1180    status: &str,
1181    output: Option<&str>,
1182    action: RunCompletionAction,
1183) -> Result<()> {
1184    let bounded_output = output.map(truncate_cron_output);
1185
1186    match action {
1187        RunCompletionAction::Reschedule => {
1188            let next_run = next_run_for_schedule(&job.schedule, job_state_at)?;
1189            let changed = conn
1190                .execute(
1191                    "UPDATE cron_jobs
1192                     SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
1193                     WHERE id = ?5",
1194                    params![
1195                        next_run.to_rfc3339(),
1196                        job_state_at.to_rfc3339(),
1197                        status,
1198                        bounded_output.as_deref(),
1199                        job.id,
1200                    ],
1201                )
1202                .context("Failed to update cron job run state")?;
1203            if changed == 0 {
1204                anyhow::bail!("Cron job '{}' not found", job.id);
1205            }
1206        }
1207        RunCompletionAction::Disable => {
1208            let changed = conn
1209                .execute(
1210                    "UPDATE cron_jobs
1211                     SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
1212                     WHERE id = ?4",
1213                    params![
1214                        job_state_at.to_rfc3339(),
1215                        status,
1216                        bounded_output.as_deref(),
1217                        job.id,
1218                    ],
1219                )
1220                .context("Failed to disable completed one-shot cron job")?;
1221            if changed == 0 {
1222                anyhow::bail!("Cron job '{}' not found", job.id);
1223            }
1224        }
1225        RunCompletionAction::Delete => {
1226            let changed = conn
1227                .execute("DELETE FROM cron_jobs WHERE id = ?1", params![job.id])
1228                .context("Failed to delete completed one-shot cron job")?;
1229            if changed == 0 {
1230                anyhow::bail!("Cron job '{}' not found", job.id);
1231            }
1232        }
1233    }
1234
1235    Ok(())
1236}
1237
1238fn initialize_schema(conn: &Connection) -> Result<()> {
1239    conn.execute_batch(
1240        "PRAGMA foreign_keys = ON;
1241         CREATE TABLE IF NOT EXISTS cron_jobs (
1242            id               TEXT PRIMARY KEY,
1243            expression       TEXT NOT NULL,
1244            command          TEXT NOT NULL,
1245            schedule         TEXT,
1246            job_type         TEXT NOT NULL DEFAULT 'shell',
1247            prompt           TEXT,
1248            name             TEXT,
1249            session_target   TEXT NOT NULL DEFAULT 'isolated',
1250            model            TEXT,
1251            enabled          INTEGER NOT NULL DEFAULT 1,
1252            delivery         TEXT,
1253            delete_after_run INTEGER NOT NULL DEFAULT 0,
1254            allowed_tools    TEXT,
1255            created_at       TEXT NOT NULL,
1256            next_run         TEXT NOT NULL,
1257            last_run         TEXT,
1258            last_status      TEXT,
1259            last_output      TEXT
1260        );
1261        CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);
1262
1263        CREATE TABLE IF NOT EXISTS cron_runs (
1264            id          INTEGER PRIMARY KEY AUTOINCREMENT,
1265            job_id      TEXT NOT NULL,
1266            started_at  TEXT NOT NULL,
1267            finished_at TEXT NOT NULL,
1268            status      TEXT NOT NULL,
1269            output      TEXT,
1270            duration_ms INTEGER,
1271            FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE
1272        );
1273        CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id);
1274        CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);
1275        CREATE INDEX IF NOT EXISTS idx_cron_runs_job_started ON cron_runs(job_id, started_at);",
1276    )
1277    .context("Failed to initialize cron schema")?;
1278
1279    add_column_if_missing(conn, "schedule", "TEXT")?;
1280    add_column_if_missing(conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?;
1281    add_column_if_missing(conn, "prompt", "TEXT")?;
1282    add_column_if_missing(conn, "name", "TEXT")?;
1283    add_column_if_missing(conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?;
1284    add_column_if_missing(conn, "model", "TEXT")?;
1285    add_column_if_missing(conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
1286    add_column_if_missing(conn, "delivery", "TEXT")?;
1287    add_column_if_missing(conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
1288    add_column_if_missing(conn, "allowed_tools", "TEXT")?;
1289    add_column_if_missing(conn, "source", "TEXT DEFAULT 'imperative'")?;
1290    add_column_if_missing(conn, "uses_memory", "INTEGER NOT NULL DEFAULT 1")?;
1291    // Rows written before the column existed get an empty alias; the
1292    // scheduler treats those as orphans (skip with warning) rather than
1293    // coercing them to a magic alias.
1294    add_column_if_missing(conn, "agent_alias", "TEXT NOT NULL DEFAULT ''")?;
1295
1296    Ok(())
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301    use super::*;
1302    use chrono::Duration as ChronoDuration;
1303    use tempfile::TempDir;
1304    use zeroclaw_config::schema::Config;
1305
1306    fn test_config(tmp: &TempDir) -> Config {
1307        let config = Config {
1308            data_dir: tmp.path().join("data"),
1309            config_path: tmp.path().join("config.toml"),
1310            ..Config::default()
1311        };
1312        std::fs::create_dir_all(&config.data_dir).unwrap();
1313        config
1314    }
1315
1316    fn cron_dir(config: &Config) -> std::path::PathBuf {
1317        config.data_dir.join("cron")
1318    }
1319
1320    fn cron_db(config: &Config) -> std::path::PathBuf {
1321        cron_dir(config).join("jobs.db")
1322    }
1323
1324    #[test]
1325    fn read_only_queries_on_empty_workspace_do_not_initialize_cron_db() {
1326        let tmp = TempDir::new().unwrap();
1327        let config = test_config(&tmp);
1328
1329        assert!(list_jobs(&config).unwrap().is_empty());
1330        assert!(due_jobs(&config, Utc::now()).unwrap().is_empty());
1331        assert!(all_overdue_jobs(&config, Utc::now()).unwrap().is_empty());
1332        assert!(list_runs(&config, "missing", 10).unwrap().is_empty());
1333
1334        let err = get_job(&config, "missing").unwrap_err();
1335        assert!(err.to_string().contains("not found"));
1336
1337        assert!(
1338            !cron_dir(&config).exists(),
1339            "read-only queries should not create the cron directory"
1340        );
1341        assert!(
1342            !cron_db(&config).exists(),
1343            "read-only queries should not create jobs.db"
1344        );
1345    }
1346
1347    #[test]
1348    fn first_write_initializes_schema_and_follow_up_reads_work() {
1349        let tmp = TempDir::new().unwrap();
1350        let config = test_config(&tmp);
1351
1352        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1353
1354        assert!(cron_db(&config).exists());
1355        assert_eq!(get_job(&config, &job.id).unwrap().id, job.id);
1356        assert_eq!(list_jobs(&config).unwrap().len(), 1);
1357    }
1358
1359    #[test]
1360    fn empty_declarative_sync_on_empty_workspace_does_not_initialize_cron_db() {
1361        let tmp = TempDir::new().unwrap();
1362        let config = test_config(&tmp);
1363
1364        sync_declarative_jobs(&config, &std::collections::HashMap::new()).unwrap();
1365
1366        assert!(
1367            !cron_dir(&config).exists(),
1368            "empty declarative sync should not create the cron directory"
1369        );
1370        assert!(
1371            !cron_db(&config).exists(),
1372            "empty declarative sync should not create jobs.db"
1373        );
1374    }
1375
1376    #[test]
1377    fn read_existing_old_schema_db_migrates_before_querying_new_columns() {
1378        let tmp = TempDir::new().unwrap();
1379        let config = test_config(&tmp);
1380        let cron_dir = cron_dir(&config);
1381        std::fs::create_dir_all(&cron_dir).unwrap();
1382        let db_path = cron_db(&config);
1383        let conn = Connection::open(&db_path).unwrap();
1384        conn.execute_batch(
1385            "CREATE TABLE cron_jobs (
1386                id               TEXT PRIMARY KEY,
1387                expression       TEXT NOT NULL,
1388                command          TEXT NOT NULL,
1389                schedule         TEXT,
1390                job_type         TEXT NOT NULL DEFAULT 'shell',
1391                prompt           TEXT,
1392                name             TEXT,
1393                session_target   TEXT NOT NULL DEFAULT 'isolated',
1394                model            TEXT,
1395                enabled          INTEGER NOT NULL DEFAULT 1,
1396                delivery         TEXT,
1397                delete_after_run INTEGER NOT NULL DEFAULT 0,
1398                allowed_tools    TEXT,
1399                created_at       TEXT NOT NULL,
1400                next_run         TEXT NOT NULL,
1401                last_run         TEXT,
1402                last_status      TEXT,
1403                last_output      TEXT
1404            );",
1405        )
1406        .unwrap();
1407        conn.execute(
1408            "INSERT INTO cron_jobs (
1409                id, expression, command, schedule, job_type, session_target,
1410                enabled, delete_after_run, created_at, next_run
1411             ) VALUES (?1, ?2, ?3, ?4, 'shell', 'isolated', 1, 0, ?5, ?6)",
1412            params![
1413                "legacy-schema",
1414                "*/5 * * * *",
1415                "echo legacy",
1416                Option::<String>::None,
1417                Utc::now().to_rfc3339(),
1418                (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1419            ],
1420        )
1421        .unwrap();
1422        drop(conn);
1423
1424        let jobs = list_jobs(&config).unwrap();
1425
1426        assert_eq!(jobs.len(), 1);
1427        assert_eq!(jobs[0].id, "legacy-schema");
1428        assert_eq!(jobs[0].source, "imperative");
1429        assert!(jobs[0].uses_memory);
1430
1431        let conn = Connection::open(&db_path).unwrap();
1432        let columns: Vec<String> = conn
1433            .prepare("PRAGMA table_info(cron_jobs)")
1434            .unwrap()
1435            .query_map([], |row| row.get(1))
1436            .unwrap()
1437            .collect::<Result<_, _>>()
1438            .unwrap();
1439        assert!(columns.iter().any(|name| name == "source"));
1440        assert!(columns.iter().any(|name| name == "uses_memory"));
1441    }
1442
1443    #[test]
1444    fn add_job_accepts_five_field_expression() {
1445        let tmp = TempDir::new().unwrap();
1446        let config = test_config(&tmp);
1447
1448        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1449        assert_eq!(job.expression, "*/5 * * * *");
1450        assert_eq!(job.command, "echo ok");
1451        assert!(matches!(job.schedule, Schedule::Cron { .. }));
1452    }
1453
1454    #[test]
1455    fn add_shell_job_marks_at_schedule_for_auto_delete() {
1456        let tmp = TempDir::new().unwrap();
1457        let config = test_config(&tmp);
1458
1459        let one_shot = add_shell_job(
1460            &config,
1461            "default",
1462            None,
1463            Schedule::At {
1464                at: Utc::now() + ChronoDuration::minutes(10),
1465            },
1466            "echo once",
1467            None,
1468        )
1469        .unwrap();
1470        assert!(one_shot.delete_after_run);
1471
1472        let recurring = add_shell_job(
1473            &config,
1474            "default",
1475            None,
1476            Schedule::Every { every_ms: 60_000 },
1477            "echo recurring",
1478            None,
1479        )
1480        .unwrap();
1481        assert!(!recurring.delete_after_run);
1482    }
1483
1484    #[test]
1485    fn add_shell_job_persists_delivery() {
1486        let tmp = TempDir::new().unwrap();
1487        let config = test_config(&tmp);
1488
1489        let job = add_shell_job(
1490            &config,
1491            "default",
1492            Some("deliver-shell".into()),
1493            Schedule::Cron {
1494                expr: "*/5 * * * *".into(),
1495                tz: None,
1496            },
1497            "echo delivered",
1498            Some(DeliveryConfig {
1499                mode: "announce".into(),
1500                channel: Some("discord".into()),
1501                to: Some("1234567890".into()),
1502                thread_id: None,
1503                best_effort: true,
1504            }),
1505        )
1506        .unwrap();
1507
1508        assert_eq!(job.delivery.mode, "announce");
1509        assert_eq!(job.delivery.channel.as_deref(), Some("discord"));
1510        assert_eq!(job.delivery.to.as_deref(), Some("1234567890"));
1511
1512        let stored = get_job(&config, &job.id).unwrap();
1513        assert_eq!(stored.delivery.mode, "announce");
1514        assert_eq!(stored.delivery.channel.as_deref(), Some("discord"));
1515        assert_eq!(stored.delivery.to.as_deref(), Some("1234567890"));
1516    }
1517
1518    #[test]
1519    fn add_agent_job_rejects_invalid_announce_delivery() {
1520        let tmp = TempDir::new().unwrap();
1521        let config = test_config(&tmp);
1522
1523        let err = add_agent_job(
1524            &config,
1525            "default",
1526            Some("deliver-agent".into()),
1527            Schedule::Cron {
1528                expr: "*/5 * * * *".into(),
1529                tz: None,
1530            },
1531            "summarize logs",
1532            SessionTarget::Isolated,
1533            None,
1534            Some(DeliveryConfig {
1535                mode: "announce".into(),
1536                channel: Some("discord".into()),
1537                to: None,
1538                thread_id: None,
1539                best_effort: true,
1540            }),
1541            false,
1542            None,
1543        )
1544        .unwrap_err();
1545
1546        assert!(err.to_string().contains("delivery.to is required"));
1547    }
1548
1549    #[test]
1550    fn add_shell_job_rejects_invalid_delivery_mode() {
1551        let tmp = TempDir::new().unwrap();
1552        let config = test_config(&tmp);
1553
1554        let err = add_shell_job(
1555            &config,
1556            "default",
1557            Some("deliver-shell".into()),
1558            Schedule::Cron {
1559                expr: "*/5 * * * *".into(),
1560                tz: None,
1561            },
1562            "echo delivered",
1563            Some(DeliveryConfig {
1564                mode: "annouce".into(),
1565                channel: Some("discord".into()),
1566                to: Some("1234567890".into()),
1567                thread_id: None,
1568                best_effort: true,
1569            }),
1570        )
1571        .unwrap_err();
1572
1573        assert!(err.to_string().contains("unsupported delivery mode"));
1574    }
1575
1576    #[test]
1577    fn add_list_remove_roundtrip() {
1578        let tmp = TempDir::new().unwrap();
1579        let config = test_config(&tmp);
1580
1581        let job = add_job(&config, "test-agent", "*/10 * * * *", "echo roundtrip").unwrap();
1582        let listed = list_jobs(&config).unwrap();
1583        assert_eq!(listed.len(), 1);
1584        assert_eq!(listed[0].id, job.id);
1585
1586        remove_job(&config, &job.id).unwrap();
1587        assert!(list_jobs(&config).unwrap().is_empty());
1588    }
1589
1590    #[test]
1591    fn due_jobs_filters_by_timestamp_and_enabled() {
1592        let tmp = TempDir::new().unwrap();
1593        let config = test_config(&tmp);
1594
1595        let job = add_job(&config, "test-agent", "* * * * *", "echo due").unwrap();
1596
1597        let due_now = due_jobs(&config, Utc::now()).unwrap();
1598        assert!(due_now.is_empty(), "new job should not be due immediately");
1599
1600        let far_future = Utc::now() + ChronoDuration::days(365);
1601        let due_future = due_jobs(&config, far_future).unwrap();
1602        assert_eq!(due_future.len(), 1, "job should be due in far future");
1603
1604        let _ = update_job(
1605            &config,
1606            &job.id,
1607            CronJobPatch {
1608                enabled: Some(false),
1609                ..CronJobPatch::default()
1610            },
1611        )
1612        .unwrap();
1613        let due_after_disable = due_jobs(&config, far_future).unwrap();
1614        assert!(due_after_disable.is_empty());
1615    }
1616
1617    #[test]
1618    fn due_jobs_respects_scheduler_max_tasks_limit() {
1619        let tmp = TempDir::new().unwrap();
1620        let mut config = test_config(&tmp);
1621        config.scheduler.max_tasks = 2;
1622
1623        let _ = add_job(&config, "test-agent", "* * * * *", "echo due-1").unwrap();
1624        let _ = add_job(&config, "test-agent", "* * * * *", "echo due-2").unwrap();
1625        let _ = add_job(&config, "test-agent", "* * * * *", "echo due-3").unwrap();
1626
1627        let far_future = Utc::now() + ChronoDuration::days(365);
1628        let due = due_jobs(&config, far_future).unwrap();
1629        assert_eq!(due.len(), 2);
1630    }
1631
1632    #[test]
1633    fn all_overdue_jobs_ignores_max_tasks_limit() {
1634        let tmp = TempDir::new().unwrap();
1635        let mut config = test_config(&tmp);
1636        config.scheduler.max_tasks = 2;
1637
1638        let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-1").unwrap();
1639        let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-2").unwrap();
1640        let _ = add_job(&config, "test-agent", "* * * * *", "echo ov-3").unwrap();
1641
1642        let far_future = Utc::now() + ChronoDuration::days(365);
1643        // due_jobs respects the limit
1644        let due = due_jobs(&config, far_future).unwrap();
1645        assert_eq!(due.len(), 2);
1646        // all_overdue_jobs returns everything
1647        let overdue = all_overdue_jobs(&config, far_future).unwrap();
1648        assert_eq!(overdue.len(), 3);
1649    }
1650
1651    #[test]
1652    fn all_overdue_jobs_excludes_disabled_jobs() {
1653        let tmp = TempDir::new().unwrap();
1654        let config = test_config(&tmp);
1655
1656        let job = add_job(&config, "test-agent", "* * * * *", "echo disabled").unwrap();
1657        let _ = update_job(
1658            &config,
1659            &job.id,
1660            CronJobPatch {
1661                enabled: Some(false),
1662                ..CronJobPatch::default()
1663            },
1664        )
1665        .unwrap();
1666
1667        let far_future = Utc::now() + ChronoDuration::days(365);
1668        let overdue = all_overdue_jobs(&config, far_future).unwrap();
1669        assert!(overdue.is_empty());
1670    }
1671
1672    #[test]
1673    fn add_agent_job_persists_allowed_tools() {
1674        let tmp = TempDir::new().unwrap();
1675        let config = test_config(&tmp);
1676
1677        let job = add_agent_job(
1678            &config,
1679            "default",
1680            Some("agent".into()),
1681            Schedule::Every { every_ms: 60_000 },
1682            "do work",
1683            SessionTarget::Isolated,
1684            None,
1685            None,
1686            false,
1687            Some(vec!["file_read".into(), "web_search".into()]),
1688        )
1689        .unwrap();
1690
1691        assert_eq!(
1692            job.allowed_tools,
1693            Some(vec!["file_read".into(), "web_search".into()])
1694        );
1695
1696        let stored = get_job(&config, &job.id).unwrap();
1697        assert_eq!(stored.allowed_tools, job.allowed_tools);
1698    }
1699
1700    #[test]
1701    fn update_job_persists_allowed_tools_patch() {
1702        let tmp = TempDir::new().unwrap();
1703        let config = test_config(&tmp);
1704
1705        let job = add_agent_job(
1706            &config,
1707            "default",
1708            Some("agent".into()),
1709            Schedule::Every { every_ms: 60_000 },
1710            "do work",
1711            SessionTarget::Isolated,
1712            None,
1713            None,
1714            false,
1715            None,
1716        )
1717        .unwrap();
1718
1719        let updated = update_job(
1720            &config,
1721            &job.id,
1722            CronJobPatch {
1723                allowed_tools: Some(vec!["shell".into()]),
1724                ..CronJobPatch::default()
1725            },
1726        )
1727        .unwrap();
1728
1729        assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
1730        assert_eq!(
1731            get_job(&config, &job.id).unwrap().allowed_tools,
1732            Some(vec!["shell".into()])
1733        );
1734    }
1735
1736    #[test]
1737    fn reschedule_after_run_persists_last_status_and_last_run() {
1738        let tmp = TempDir::new().unwrap();
1739        let config = test_config(&tmp);
1740
1741        let job = add_job(&config, "test-agent", "*/15 * * * *", "echo run").unwrap();
1742        reschedule_after_run(&config, &job, false, "failed output").unwrap();
1743
1744        let listed = list_jobs(&config).unwrap();
1745        let stored = listed.iter().find(|j| j.id == job.id).unwrap();
1746        assert_eq!(stored.last_status.as_deref(), Some("error"));
1747        assert!(stored.last_run.is_some());
1748        assert_eq!(stored.last_output.as_deref(), Some("failed output"));
1749    }
1750
1751    #[test]
1752    fn job_type_from_sql_reads_valid_value() {
1753        let tmp = TempDir::new().unwrap();
1754        let config = test_config(&tmp);
1755        let now = Utc::now();
1756
1757        with_initialized_connection(&config, |conn| {
1758            conn.execute(
1759                "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1760                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1761                params![
1762                    "job-type-valid",
1763                    "*/5 * * * *",
1764                    "echo ok",
1765                    Option::<String>::None,
1766                    "agent",
1767                    now.to_rfc3339(),
1768                    (now + ChronoDuration::minutes(5)).to_rfc3339(),
1769                ],
1770            )?;
1771            Ok(())
1772        })
1773        .unwrap();
1774
1775        let job = get_job(&config, "job-type-valid").unwrap();
1776        assert_eq!(job.job_type, JobType::Agent);
1777    }
1778
1779    #[test]
1780    fn job_type_from_sql_rejects_invalid_value() {
1781        let tmp = TempDir::new().unwrap();
1782        let config = test_config(&tmp);
1783        let now = Utc::now();
1784
1785        with_initialized_connection(&config, |conn| {
1786            conn.execute(
1787                "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1788                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1789                params![
1790                    "job-type-invalid",
1791                    "*/5 * * * *",
1792                    "echo ok",
1793                    Option::<String>::None,
1794                    "unknown",
1795                    now.to_rfc3339(),
1796                    (now + ChronoDuration::minutes(5)).to_rfc3339(),
1797                ],
1798            )?;
1799            Ok(())
1800        })
1801        .unwrap();
1802
1803        assert!(get_job(&config, "job-type-invalid").is_err());
1804    }
1805
1806    #[test]
1807    fn migration_falls_back_to_legacy_expression() {
1808        let tmp = TempDir::new().unwrap();
1809        let config = test_config(&tmp);
1810
1811        with_initialized_connection(&config, |conn| {
1812            conn.execute(
1813                "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
1814                 VALUES (?1, ?2, ?3, ?4, ?5)",
1815                params![
1816                    "legacy-id",
1817                    "*/5 * * * *",
1818                    "echo legacy",
1819                    Utc::now().to_rfc3339(),
1820                    (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1821                ],
1822            )?;
1823            conn.execute(
1824                "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'",
1825                [],
1826            )?;
1827            Ok(())
1828        })
1829        .unwrap();
1830
1831        let job = get_job(&config, "legacy-id").unwrap();
1832        assert!(matches!(job.schedule, Schedule::Cron { .. }));
1833    }
1834
1835    #[test]
1836    fn record_and_prune_runs() {
1837        let tmp = TempDir::new().unwrap();
1838        let mut config = test_config(&tmp);
1839        config.scheduler.max_run_history = 2;
1840        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1841        let base = Utc::now();
1842
1843        for idx in 0..3 {
1844            let start = base + ChronoDuration::seconds(idx);
1845            let end = start + ChronoDuration::milliseconds(100);
1846            record_run(&config, &job.id, start, end, "ok", Some("done"), 100).unwrap();
1847        }
1848
1849        let runs = list_runs(&config, &job.id, 10).unwrap();
1850        assert_eq!(runs.len(), 2);
1851    }
1852
1853    #[test]
1854    fn remove_job_cascades_run_history() {
1855        let tmp = TempDir::new().unwrap();
1856        let config = test_config(&tmp);
1857        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo ok").unwrap();
1858        let start = Utc::now();
1859        record_run(
1860            &config,
1861            &job.id,
1862            start,
1863            start + ChronoDuration::milliseconds(5),
1864            "ok",
1865            Some("ok"),
1866            5,
1867        )
1868        .unwrap();
1869
1870        remove_job(&config, &job.id).unwrap();
1871        let runs = list_runs(&config, &job.id, 10).unwrap();
1872        assert!(runs.is_empty());
1873    }
1874
1875    #[test]
1876    fn record_run_truncates_large_output() {
1877        let tmp = TempDir::new().unwrap();
1878        let config = test_config(&tmp);
1879        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
1880        let output = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);
1881
1882        record_run(
1883            &config,
1884            &job.id,
1885            Utc::now(),
1886            Utc::now(),
1887            "ok",
1888            Some(&output),
1889            1,
1890        )
1891        .unwrap();
1892
1893        let runs = list_runs(&config, &job.id, 1).unwrap();
1894        let stored = runs[0].output.as_deref().unwrap_or_default();
1895        assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER));
1896        assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES);
1897    }
1898
1899    #[test]
1900    fn reschedule_after_run_disables_at_schedule_job() {
1901        let tmp = TempDir::new().unwrap();
1902        let config = test_config(&tmp);
1903        let at = Utc::now() + ChronoDuration::minutes(10);
1904        let job = add_shell_job(
1905            &config,
1906            "test-agent",
1907            None,
1908            Schedule::At { at },
1909            "echo once",
1910            None,
1911        )
1912        .unwrap();
1913
1914        reschedule_after_run(&config, &job, true, "done").unwrap();
1915
1916        let stored = get_job(&config, &job.id).unwrap();
1917        assert!(
1918            !stored.enabled,
1919            "At schedule job should be disabled after reschedule"
1920        );
1921        assert_eq!(stored.last_status.as_deref(), Some("ok"));
1922    }
1923
1924    #[test]
1925    fn reschedule_after_run_disables_at_schedule_job_on_failure() {
1926        let tmp = TempDir::new().unwrap();
1927        let config = test_config(&tmp);
1928        let at = Utc::now() + ChronoDuration::minutes(10);
1929        let job = add_shell_job(
1930            &config,
1931            "test-agent",
1932            None,
1933            Schedule::At { at },
1934            "echo once",
1935            None,
1936        )
1937        .unwrap();
1938
1939        reschedule_after_run(&config, &job, false, "failed").unwrap();
1940
1941        let stored = get_job(&config, &job.id).unwrap();
1942        assert!(
1943            !stored.enabled,
1944            "At schedule job should be disabled after reschedule even on failure"
1945        );
1946        assert_eq!(stored.last_status.as_deref(), Some("error"));
1947        assert_eq!(stored.last_output.as_deref(), Some("failed"));
1948    }
1949
1950    #[test]
1951    fn reschedule_after_run_truncates_last_output() {
1952        let tmp = TempDir::new().unwrap();
1953        let config = test_config(&tmp);
1954        let job = add_job(&config, "test-agent", "*/5 * * * *", "echo trunc").unwrap();
1955        let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);
1956
1957        reschedule_after_run(&config, &job, false, &output).unwrap();
1958
1959        let stored = get_job(&config, &job.id).unwrap();
1960        let last_output = stored.last_output.as_deref().unwrap_or_default();
1961        assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER));
1962        assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES);
1963    }
1964
1965    // ── Declarative cron job sync tests ──────────────────────────
1966
1967    fn make_shell_decl(
1968        id: &str,
1969        expr: &str,
1970        cmd: &str,
1971    ) -> (String, zeroclaw_config::schema::CronJobDecl) {
1972        (
1973            id.to_string(),
1974            zeroclaw_config::schema::CronJobDecl {
1975                name: Some(format!("decl-{id}")),
1976                job_type: "shell".to_string(),
1977                schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
1978                    expr: expr.to_string(),
1979                    tz: None,
1980                },
1981                command: Some(cmd.to_string()),
1982                prompt: None,
1983                enabled: true,
1984                model: None,
1985                allowed_tools: None,
1986                uses_memory: true,
1987                session_target: None,
1988                delivery: None,
1989            },
1990        )
1991    }
1992
1993    fn make_agent_decl(
1994        id: &str,
1995        expr: &str,
1996        prompt: &str,
1997    ) -> (String, zeroclaw_config::schema::CronJobDecl) {
1998        (
1999            id.to_string(),
2000            zeroclaw_config::schema::CronJobDecl {
2001                name: Some(format!("decl-{id}")),
2002                job_type: "agent".to_string(),
2003                schedule: zeroclaw_config::schema::CronScheduleDecl::Cron {
2004                    expr: expr.to_string(),
2005                    tz: None,
2006                },
2007                command: None,
2008                prompt: Some(prompt.to_string()),
2009                enabled: true,
2010                model: None,
2011                allowed_tools: None,
2012                uses_memory: true,
2013                session_target: None,
2014                delivery: None,
2015            },
2016        )
2017    }
2018
2019    fn decls_map(
2020        items: Vec<(String, zeroclaw_config::schema::CronJobDecl)>,
2021    ) -> std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl> {
2022        items.into_iter().collect()
2023    }
2024
2025    /// Seed an enabled agent that claims `ids` via its `cron_jobs` list so
2026    /// `sync_declarative_jobs` can resolve an owning agent for each entry.
2027    fn seed_claiming_agent(config: &mut Config, ids: &[&str]) {
2028        config.agents.insert(
2029            "test-agent".to_string(),
2030            zeroclaw_config::schema::AliasedAgentConfig {
2031                enabled: true,
2032                cron_jobs: ids.iter().map(|s| (*s).to_string()).collect(),
2033                ..Default::default()
2034            },
2035        );
2036    }
2037
2038    #[test]
2039    fn sync_inserts_new_declarative_job() {
2040        let tmp = TempDir::new().unwrap();
2041        let mut config = test_config(&tmp);
2042        seed_claiming_agent(&mut config, &["daily-backup"]);
2043
2044        let decls = decls_map(vec![make_shell_decl(
2045            "daily-backup",
2046            "0 2 * * *",
2047            "echo backup",
2048        )]);
2049        sync_declarative_jobs(&config, &decls).unwrap();
2050
2051        let job = get_job(&config, "daily-backup").unwrap();
2052        assert_eq!(job.command, "echo backup");
2053        assert_eq!(job.source, "declarative");
2054        assert_eq!(job.name.as_deref(), Some("decl-daily-backup"));
2055    }
2056
2057    #[test]
2058    fn sync_updates_existing_declarative_job() {
2059        let tmp = TempDir::new().unwrap();
2060        let mut config = test_config(&tmp);
2061        seed_claiming_agent(&mut config, &["updatable"]);
2062
2063        let decls = decls_map(vec![make_shell_decl("updatable", "0 2 * * *", "echo v1")]);
2064        sync_declarative_jobs(&config, &decls).unwrap();
2065
2066        let job_v1 = get_job(&config, "updatable").unwrap();
2067        assert_eq!(job_v1.command, "echo v1");
2068
2069        let decls_v2 = decls_map(vec![make_shell_decl("updatable", "0 3 * * *", "echo v2")]);
2070        sync_declarative_jobs(&config, &decls_v2).unwrap();
2071
2072        let job_v2 = get_job(&config, "updatable").unwrap();
2073        assert_eq!(job_v2.command, "echo v2");
2074        assert_eq!(job_v2.expression, "0 3 * * *");
2075        assert_eq!(job_v2.source, "declarative");
2076    }
2077
2078    #[test]
2079    fn sync_does_not_delete_imperative_jobs() {
2080        let tmp = TempDir::new().unwrap();
2081        let mut config = test_config(&tmp);
2082        seed_claiming_agent(&mut config, &["my-decl"]);
2083
2084        // Create an imperative job via the normal API.
2085        let imperative = add_job(&config, "test-agent", "*/10 * * * *", "echo imperative").unwrap();
2086
2087        // Sync declarative jobs (none of which match the imperative job).
2088        let decls = decls_map(vec![make_shell_decl("my-decl", "0 2 * * *", "echo decl")]);
2089        sync_declarative_jobs(&config, &decls).unwrap();
2090
2091        // Imperative job should still exist.
2092        let still_there = get_job(&config, &imperative.id).unwrap();
2093        assert_eq!(still_there.command, "echo imperative");
2094        assert_eq!(still_there.source, "imperative");
2095
2096        // Declarative job should also exist.
2097        let decl_job = get_job(&config, "my-decl").unwrap();
2098        assert_eq!(decl_job.command, "echo decl");
2099    }
2100
2101    #[test]
2102    fn sync_removes_stale_declarative_jobs() {
2103        let tmp = TempDir::new().unwrap();
2104        let mut config = test_config(&tmp);
2105        seed_claiming_agent(&mut config, &["keeper", "stale"]);
2106
2107        // Insert two declarative jobs.
2108        let decls = decls_map(vec![
2109            make_shell_decl("keeper", "0 2 * * *", "echo keep"),
2110            make_shell_decl("stale", "0 3 * * *", "echo stale"),
2111        ]);
2112        sync_declarative_jobs(&config, &decls).unwrap();
2113
2114        // Now sync with only "keeper"; "stale" should be removed.
2115        let decls_v2 = decls_map(vec![make_shell_decl("keeper", "0 2 * * *", "echo keep")]);
2116        sync_declarative_jobs(&config, &decls_v2).unwrap();
2117
2118        assert!(get_job(&config, "stale").is_err());
2119        assert!(get_job(&config, "keeper").is_ok());
2120    }
2121
2122    #[test]
2123    fn sync_empty_removes_all_declarative_jobs() {
2124        let tmp = TempDir::new().unwrap();
2125        let mut config = test_config(&tmp);
2126        seed_claiming_agent(&mut config, &["to-remove"]);
2127
2128        let decls = decls_map(vec![make_shell_decl("to-remove", "0 2 * * *", "echo bye")]);
2129        sync_declarative_jobs(&config, &decls).unwrap();
2130        assert!(get_job(&config, "to-remove").is_ok());
2131
2132        // Sync with empty map.
2133        sync_declarative_jobs(&config, &std::collections::HashMap::new()).unwrap();
2134        assert!(get_job(&config, "to-remove").is_err());
2135    }
2136
2137    #[test]
2138    fn sync_validates_shell_job_requires_command() {
2139        let tmp = TempDir::new().unwrap();
2140        let config = test_config(&tmp);
2141
2142        let (id, mut decl) = make_shell_decl("bad", "0 2 * * *", "echo ok");
2143        decl.command = None;
2144
2145        let decls = decls_map(vec![(id, decl)]);
2146        let result = sync_declarative_jobs(&config, &decls);
2147        assert!(result.is_err());
2148        assert!(result.unwrap_err().to_string().contains("command"));
2149    }
2150
2151    #[test]
2152    fn sync_validates_agent_job_requires_prompt() {
2153        let tmp = TempDir::new().unwrap();
2154        let config = test_config(&tmp);
2155
2156        let (id, mut decl) = make_agent_decl("bad-agent", "0 2 * * *", "do stuff");
2157        decl.prompt = None;
2158
2159        let decls = decls_map(vec![(id, decl)]);
2160        let result = sync_declarative_jobs(&config, &decls);
2161        assert!(result.is_err());
2162        assert!(result.unwrap_err().to_string().contains("prompt"));
2163    }
2164
2165    #[test]
2166    fn sync_agent_job_inserts_correctly() {
2167        let tmp = TempDir::new().unwrap();
2168        let mut config = test_config(&tmp);
2169        seed_claiming_agent(&mut config, &["agent-check"]);
2170
2171        let decls = decls_map(vec![make_agent_decl(
2172            "agent-check",
2173            "*/15 * * * *",
2174            "check health",
2175        )]);
2176        sync_declarative_jobs(&config, &decls).unwrap();
2177
2178        let job = get_job(&config, "agent-check").unwrap();
2179        assert_eq!(job.job_type, JobType::Agent);
2180        assert_eq!(job.prompt.as_deref(), Some("check health"));
2181        assert_eq!(job.source, "declarative");
2182    }
2183
2184    #[test]
2185    fn sync_every_schedule_works() {
2186        let tmp = TempDir::new().unwrap();
2187        let mut config = test_config(&tmp);
2188        seed_claiming_agent(&mut config, &["interval-job"]);
2189
2190        let decl = zeroclaw_config::schema::CronJobDecl {
2191            name: None,
2192            job_type: "shell".to_string(),
2193            schedule: zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 60000 },
2194            command: Some("echo interval".to_string()),
2195            prompt: None,
2196            enabled: true,
2197            model: None,
2198            allowed_tools: None,
2199            uses_memory: true,
2200            session_target: None,
2201            delivery: None,
2202        };
2203
2204        let mut decls = std::collections::HashMap::new();
2205        decls.insert("interval-job".to_string(), decl);
2206        sync_declarative_jobs(&config, &decls).unwrap();
2207
2208        let job = get_job(&config, "interval-job").unwrap();
2209        assert!(matches!(job.schedule, Schedule::Every { every_ms: 60000 }));
2210        assert_eq!(job.command, "echo interval");
2211    }
2212
2213    #[test]
2214    fn declarative_config_parses_from_toml() {
2215        // Alias-keyed cron map: `[cron.<alias>]` syntax.
2216        let toml_str = r#"
2217[cron.daily-report]
2218name = "Daily Report"
2219job_type = "shell"
2220command = "echo report"
2221schedule = { kind = "cron", expr = "0 9 * * *" }
2222
2223[cron.health-check]
2224job_type = "agent"
2225prompt = "Check server health"
2226schedule = { kind = "every", every_ms = 300000 }
2227        "#;
2228
2229        #[derive(serde::Deserialize)]
2230        struct Wrap {
2231            cron: std::collections::HashMap<String, zeroclaw_config::schema::CronJobDecl>,
2232        }
2233        let parsed: Wrap = toml::from_str(toml_str).unwrap();
2234        assert_eq!(parsed.cron.len(), 2);
2235
2236        let report = parsed.cron.get("daily-report").unwrap();
2237        assert_eq!(report.command.as_deref(), Some("echo report"));
2238        assert!(matches!(
2239            report.schedule,
2240            zeroclaw_config::schema::CronScheduleDecl::Cron { ref expr, .. } if expr == "0 9 * * *"
2241        ));
2242
2243        let health = parsed.cron.get("health-check").unwrap();
2244        assert_eq!(health.job_type, "agent");
2245        assert_eq!(health.prompt.as_deref(), Some("Check server health"));
2246        assert!(matches!(
2247            health.schedule,
2248            zeroclaw_config::schema::CronScheduleDecl::Every { every_ms: 300_000 }
2249        ));
2250    }
2251}