Skip to main content

zeroclaw_runtime/heartbeat/
store.rs

1//! SQLite persistence for heartbeat task execution history.
2//!
3//! Mirrors the `cron/store.rs` pattern: fresh connection per call, schema
4//! auto-created, output truncated, history pruned to a configurable limit.
5
6use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use rusqlite::{Connection, params};
9use std::path::{Path, PathBuf};
10
/// Upper bound, in bytes, on the `output` text stored for a single run (16 KiB).
const MAX_OUTPUT_BYTES: usize = 16 * 1024;
/// Suffix appended to stored output that had to be cut down to fit `MAX_OUTPUT_BYTES`.
const TRUNCATED_MARKER: &str = "\n...[truncated]";
13
14/// A single heartbeat task execution record.
15#[derive(Debug, Clone)]
16pub struct HeartbeatRun {
17    pub id: i64,
18    pub task_text: String,
19    pub task_priority: String,
20    pub started_at: DateTime<Utc>,
21    pub finished_at: DateTime<Utc>,
22    pub status: String, // "ok" or "error"
23    pub output: Option<String>,
24    pub duration_ms: i64,
25}
26
27/// Record a heartbeat task execution and prune old entries.
28pub fn record_run(
29    workspace_dir: &Path,
30    task_text: &str,
31    task_priority: &str,
32    started_at: DateTime<Utc>,
33    finished_at: DateTime<Utc>,
34    status: &str,
35    output: Option<&str>,
36    duration_ms: i64,
37    max_history: u32,
38) -> Result<()> {
39    let bounded_output = output.map(truncate_output);
40    with_connection(workspace_dir, |conn| {
41        let tx = conn.unchecked_transaction()?;
42
43        tx.execute(
44            "INSERT INTO heartbeat_runs
45                (task_text, task_priority, started_at, finished_at, status, output, duration_ms)
46             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
47            params![
48                task_text,
49                task_priority,
50                started_at.to_rfc3339(),
51                finished_at.to_rfc3339(),
52                status,
53                bounded_output.as_deref(),
54                duration_ms,
55            ],
56        )
57        .context("Failed to insert heartbeat run")?;
58
59        let keep = i64::from(max_history.max(1));
60        tx.execute(
61            "DELETE FROM heartbeat_runs
62             WHERE id NOT IN (
63                 SELECT id FROM heartbeat_runs
64                 ORDER BY started_at DESC, id DESC
65                 LIMIT ?1
66             )",
67            params![keep],
68        )
69        .context("Failed to prune heartbeat run history")?;
70
71        tx.commit()
72            .context("Failed to commit heartbeat run transaction")?;
73        Ok(())
74    })
75}
76
77/// List the most recent heartbeat runs.
78pub fn list_runs(workspace_dir: &Path, limit: usize) -> Result<Vec<HeartbeatRun>> {
79    with_connection(workspace_dir, |conn| {
80        let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
81        let mut stmt = conn.prepare(
82            "SELECT id, task_text, task_priority, started_at, finished_at, status, output, duration_ms
83             FROM heartbeat_runs
84             ORDER BY started_at DESC, id DESC
85             LIMIT ?1",
86        )?;
87
88        let rows = stmt.query_map(params![lim], |row| {
89            Ok(HeartbeatRun {
90                id: row.get(0)?,
91                task_text: row.get(1)?,
92                task_priority: row.get(2)?,
93                started_at: parse_rfc3339(&row.get::<_, String>(3)?).map_err(sql_err)?,
94                finished_at: parse_rfc3339(&row.get::<_, String>(4)?).map_err(sql_err)?,
95                status: row.get(5)?,
96                output: row.get(6)?,
97                duration_ms: row.get(7)?,
98            })
99        })?;
100
101        let mut runs = Vec::new();
102        for row in rows {
103            runs.push(row?);
104        }
105        Ok(runs)
106    })
107}
108
109/// Get aggregate stats: (total_runs, total_ok, total_error).
110pub fn run_stats(workspace_dir: &Path) -> Result<(u64, u64, u64)> {
111    with_connection(workspace_dir, |conn| {
112        let total: i64 = conn.query_row("SELECT COUNT(*) FROM heartbeat_runs", [], |r| r.get(0))?;
113        let ok: i64 = conn.query_row(
114            "SELECT COUNT(*) FROM heartbeat_runs WHERE status = 'ok'",
115            [],
116            |r| r.get(0),
117        )?;
118        let err: i64 = conn.query_row(
119            "SELECT COUNT(*) FROM heartbeat_runs WHERE status = 'error'",
120            [],
121            |r| r.get(0),
122        )?;
123        #[allow(clippy::cast_sign_loss)]
124        Ok((total as u64, ok as u64, err as u64))
125    })
126}
127
/// Location of the heartbeat history database: `<workspace_dir>/heartbeat/history.db`.
fn db_path(workspace_dir: &Path) -> PathBuf {
    let mut location = workspace_dir.to_path_buf();
    location.push("heartbeat");
    location.push("history.db");
    location
}
131
132fn with_connection<T>(workspace_dir: &Path, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> {
133    let path = db_path(workspace_dir);
134    if let Some(parent) = path.parent() {
135        std::fs::create_dir_all(parent).with_context(|| {
136            format!(
137                "Failed to create heartbeat directory: {}",
138                parent.display().to_string()
139            )
140        })?;
141    }
142
143    let conn = Connection::open(&path).with_context(|| {
144        format!(
145            "Failed to open heartbeat history DB: {}",
146            path.display().to_string()
147        )
148    })?;
149
150    conn.execute_batch(
151        "PRAGMA journal_mode = WAL;
152         PRAGMA synchronous = NORMAL;
153         PRAGMA temp_store = MEMORY;
154
155         CREATE TABLE IF NOT EXISTS heartbeat_runs (
156            id             INTEGER PRIMARY KEY AUTOINCREMENT,
157            task_text      TEXT NOT NULL,
158            task_priority  TEXT NOT NULL,
159            started_at     TEXT NOT NULL,
160            finished_at    TEXT NOT NULL,
161            status         TEXT NOT NULL,
162            output         TEXT,
163            duration_ms    INTEGER
164         );
165         CREATE INDEX IF NOT EXISTS idx_hb_runs_started ON heartbeat_runs(started_at);
166         CREATE INDEX IF NOT EXISTS idx_hb_runs_task ON heartbeat_runs(task_text);",
167    )
168    .context("Failed to initialize heartbeat history schema")?;
169
170    f(&conn)
171}
172
173fn truncate_output(output: &str) -> String {
174    if output.len() <= MAX_OUTPUT_BYTES {
175        return output.to_string();
176    }
177
178    if MAX_OUTPUT_BYTES <= TRUNCATED_MARKER.len() {
179        return TRUNCATED_MARKER.to_string();
180    }
181
182    let mut cutoff = MAX_OUTPUT_BYTES - TRUNCATED_MARKER.len();
183    while cutoff > 0 && !output.is_char_boundary(cutoff) {
184        cutoff -= 1;
185    }
186
187    let mut truncated = output[..cutoff].to_string();
188    truncated.push_str(TRUNCATED_MARKER);
189    truncated
190}
191
192fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
193    let parsed = DateTime::parse_from_rfc3339(raw)
194        .with_context(|| format!("Invalid RFC3339 timestamp in heartbeat DB: {raw}"))?;
195    Ok(parsed.with_timezone(&Utc))
196}
197
198fn sql_err(err: anyhow::Error) -> rusqlite::Error {
199    rusqlite::Error::ToSqlConversionFailure(err.into())
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration as ChronoDuration;
    use tempfile::TempDir;

    /// Three runs recorded one second apart are all listed, newest first.
    #[test]
    fn record_and_list_runs() {
        let workspace = TempDir::new().unwrap();
        let origin = Utc::now();

        for n in 0..3 {
            let started = origin + ChronoDuration::seconds(n);
            let finished = started + ChronoDuration::milliseconds(100);
            let label = format!("Task {n}");
            record_run(
                workspace.path(),
                &label,
                "medium",
                started,
                finished,
                "ok",
                Some("done"),
                100,
                50,
            )
            .unwrap();
        }

        let listed = list_runs(workspace.path(), 10).unwrap();
        assert_eq!(listed.len(), 3);
        // Most recent first
        assert!(listed[0].task_text.contains('2'));
    }

    /// With a history limit of two, only two of five recorded runs survive.
    #[test]
    fn prunes_old_runs() {
        let workspace = TempDir::new().unwrap();
        let origin = Utc::now();
        let keep_only = 2;

        for n in 0..5 {
            let started = origin + ChronoDuration::seconds(n);
            let finished = started + ChronoDuration::milliseconds(50);
            record_run(
                workspace.path(),
                "Task",
                "high",
                started,
                finished,
                "ok",
                None,
                50,
                keep_only,
            )
            .unwrap();
        }

        let listed = list_runs(workspace.path(), 10).unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// Totals and per-status counts match what was recorded.
    #[test]
    fn run_stats_counts_correctly() {
        let workspace = TempDir::new().unwrap();
        let stamp = Utc::now();

        // (task, priority, status, output, duration_ms)
        let fixtures: [(&str, &str, &str, Option<&str>, i64); 3] = [
            ("A", "high", "ok", None, 10),
            ("B", "low", "error", Some("fail"), 20),
            ("C", "medium", "ok", None, 15),
        ];
        for (task, priority, status, output, duration_ms) in fixtures {
            record_run(
                workspace.path(),
                task,
                priority,
                stamp,
                stamp,
                status,
                output,
                duration_ms,
                50,
            )
            .unwrap();
        }

        let (total, ok, err) = run_stats(workspace.path()).unwrap();
        assert_eq!(total, 3);
        assert_eq!(ok, 2);
        assert_eq!(err, 1);
    }

    /// Output larger than the cap is stored truncated and marked.
    #[test]
    fn truncates_large_output() {
        let workspace = TempDir::new().unwrap();
        let stamp = Utc::now();
        let oversized = "x".repeat(MAX_OUTPUT_BYTES + 512);

        record_run(
            workspace.path(),
            "T",
            "medium",
            stamp,
            stamp,
            "ok",
            Some(&oversized),
            10,
            50,
        )
        .unwrap();

        let listed = list_runs(workspace.path(), 1).unwrap();
        let stored = listed[0].output.as_deref().unwrap_or_default();
        assert!(stored.ends_with(TRUNCATED_MARKER));
        assert!(stored.len() <= MAX_OUTPUT_BYTES);
    }
}