zeroclaw_runtime/heartbeat/
store.rs1use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use rusqlite::{Connection, params};
9use std::path::{Path, PathBuf};
10
/// Upper bound, in bytes, on the task output text stored per run (16 KiB).
/// Longer output is shortened by `truncate_output` before it is inserted.
const MAX_OUTPUT_BYTES: usize = 16 * 1024;
/// Suffix appended to output that had to be shortened; its bytes count
/// towards the `MAX_OUTPUT_BYTES` budget.
const TRUNCATED_MARKER: &str = "\n...[truncated]";
13
/// One recorded heartbeat task execution, as read back from the
/// `heartbeat_runs` table by `list_runs`.
#[derive(Debug, Clone)]
pub struct HeartbeatRun {
    /// Row id assigned by SQLite (`INTEGER PRIMARY KEY AUTOINCREMENT`).
    pub id: i64,
    /// Text of the task that was executed.
    pub task_text: String,
    /// Priority label stored with the run. Free-form text as far as this
    /// module is concerned (the tests use "high", "medium" and "low").
    pub task_priority: String,
    /// When the run started; stored as RFC 3339 text and parsed back to UTC.
    pub started_at: DateTime<Utc>,
    /// When the run finished; stored as RFC 3339 text and parsed back to UTC.
    pub finished_at: DateTime<Utc>,
    /// Outcome label. `run_stats` counts the values `'ok'` and `'error'`;
    /// any other value only contributes to the total.
    pub status: String,
    /// Captured task output, if any. Bounded by `MAX_OUTPUT_BYTES` when
    /// written through `record_run`.
    pub output: Option<String>,
    /// Run duration in milliseconds, as supplied by the caller of `record_run`.
    pub duration_ms: i64,
}
26
27pub fn record_run(
29 workspace_dir: &Path,
30 task_text: &str,
31 task_priority: &str,
32 started_at: DateTime<Utc>,
33 finished_at: DateTime<Utc>,
34 status: &str,
35 output: Option<&str>,
36 duration_ms: i64,
37 max_history: u32,
38) -> Result<()> {
39 let bounded_output = output.map(truncate_output);
40 with_connection(workspace_dir, |conn| {
41 let tx = conn.unchecked_transaction()?;
42
43 tx.execute(
44 "INSERT INTO heartbeat_runs
45 (task_text, task_priority, started_at, finished_at, status, output, duration_ms)
46 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
47 params![
48 task_text,
49 task_priority,
50 started_at.to_rfc3339(),
51 finished_at.to_rfc3339(),
52 status,
53 bounded_output.as_deref(),
54 duration_ms,
55 ],
56 )
57 .context("Failed to insert heartbeat run")?;
58
59 let keep = i64::from(max_history.max(1));
60 tx.execute(
61 "DELETE FROM heartbeat_runs
62 WHERE id NOT IN (
63 SELECT id FROM heartbeat_runs
64 ORDER BY started_at DESC, id DESC
65 LIMIT ?1
66 )",
67 params![keep],
68 )
69 .context("Failed to prune heartbeat run history")?;
70
71 tx.commit()
72 .context("Failed to commit heartbeat run transaction")?;
73 Ok(())
74 })
75}
76
77pub fn list_runs(workspace_dir: &Path, limit: usize) -> Result<Vec<HeartbeatRun>> {
79 with_connection(workspace_dir, |conn| {
80 let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
81 let mut stmt = conn.prepare(
82 "SELECT id, task_text, task_priority, started_at, finished_at, status, output, duration_ms
83 FROM heartbeat_runs
84 ORDER BY started_at DESC, id DESC
85 LIMIT ?1",
86 )?;
87
88 let rows = stmt.query_map(params![lim], |row| {
89 Ok(HeartbeatRun {
90 id: row.get(0)?,
91 task_text: row.get(1)?,
92 task_priority: row.get(2)?,
93 started_at: parse_rfc3339(&row.get::<_, String>(3)?).map_err(sql_err)?,
94 finished_at: parse_rfc3339(&row.get::<_, String>(4)?).map_err(sql_err)?,
95 status: row.get(5)?,
96 output: row.get(6)?,
97 duration_ms: row.get(7)?,
98 })
99 })?;
100
101 let mut runs = Vec::new();
102 for row in rows {
103 runs.push(row?);
104 }
105 Ok(runs)
106 })
107}
108
109pub fn run_stats(workspace_dir: &Path) -> Result<(u64, u64, u64)> {
111 with_connection(workspace_dir, |conn| {
112 let total: i64 = conn.query_row("SELECT COUNT(*) FROM heartbeat_runs", [], |r| r.get(0))?;
113 let ok: i64 = conn.query_row(
114 "SELECT COUNT(*) FROM heartbeat_runs WHERE status = 'ok'",
115 [],
116 |r| r.get(0),
117 )?;
118 let err: i64 = conn.query_row(
119 "SELECT COUNT(*) FROM heartbeat_runs WHERE status = 'error'",
120 [],
121 |r| r.get(0),
122 )?;
123 #[allow(clippy::cast_sign_loss)]
124 Ok((total as u64, ok as u64, err as u64))
125 })
126}
127
/// Builds the location of the history database:
/// `<workspace_dir>/heartbeat/history.db`.
fn db_path(workspace_dir: &Path) -> PathBuf {
    let mut location = workspace_dir.to_path_buf();
    location.push("heartbeat");
    location.push("history.db");
    location
}
131
132fn with_connection<T>(workspace_dir: &Path, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> {
133 let path = db_path(workspace_dir);
134 if let Some(parent) = path.parent() {
135 std::fs::create_dir_all(parent).with_context(|| {
136 format!(
137 "Failed to create heartbeat directory: {}",
138 parent.display().to_string()
139 )
140 })?;
141 }
142
143 let conn = Connection::open(&path).with_context(|| {
144 format!(
145 "Failed to open heartbeat history DB: {}",
146 path.display().to_string()
147 )
148 })?;
149
150 conn.execute_batch(
151 "PRAGMA journal_mode = WAL;
152 PRAGMA synchronous = NORMAL;
153 PRAGMA temp_store = MEMORY;
154
155 CREATE TABLE IF NOT EXISTS heartbeat_runs (
156 id INTEGER PRIMARY KEY AUTOINCREMENT,
157 task_text TEXT NOT NULL,
158 task_priority TEXT NOT NULL,
159 started_at TEXT NOT NULL,
160 finished_at TEXT NOT NULL,
161 status TEXT NOT NULL,
162 output TEXT,
163 duration_ms INTEGER
164 );
165 CREATE INDEX IF NOT EXISTS idx_hb_runs_started ON heartbeat_runs(started_at);
166 CREATE INDEX IF NOT EXISTS idx_hb_runs_task ON heartbeat_runs(task_text);",
167 )
168 .context("Failed to initialize heartbeat history schema")?;
169
170 f(&conn)
171}
172
173fn truncate_output(output: &str) -> String {
174 if output.len() <= MAX_OUTPUT_BYTES {
175 return output.to_string();
176 }
177
178 if MAX_OUTPUT_BYTES <= TRUNCATED_MARKER.len() {
179 return TRUNCATED_MARKER.to_string();
180 }
181
182 let mut cutoff = MAX_OUTPUT_BYTES - TRUNCATED_MARKER.len();
183 while cutoff > 0 && !output.is_char_boundary(cutoff) {
184 cutoff -= 1;
185 }
186
187 let mut truncated = output[..cutoff].to_string();
188 truncated.push_str(TRUNCATED_MARKER);
189 truncated
190}
191
192fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
193 let parsed = DateTime::parse_from_rfc3339(raw)
194 .with_context(|| format!("Invalid RFC3339 timestamp in heartbeat DB: {raw}"))?;
195 Ok(parsed.with_timezone(&Utc))
196}
197
198fn sql_err(err: anyhow::Error) -> rusqlite::Error {
199 rusqlite::Error::ToSqlConversionFailure(err.into())
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration as ChronoDuration;
    use tempfile::TempDir;

    /// Three runs recorded one second apart are all listed, newest first.
    #[test]
    fn record_and_list_runs() {
        let workspace = TempDir::new().unwrap();
        let origin = Utc::now();

        for offset in 0..3_i64 {
            let started = origin + ChronoDuration::seconds(offset);
            let finished = started + ChronoDuration::milliseconds(100);
            let label = format!("Task {offset}");
            record_run(
                workspace.path(),
                &label,
                "medium",
                started,
                finished,
                "ok",
                Some("done"),
                100,
                50,
            )
            .unwrap();
        }

        let listed = list_runs(workspace.path(), 10).unwrap();
        assert_eq!(listed.len(), 3);
        assert!(listed[0].task_text.contains('2'));
    }

    /// With `max_history` of 2, only two of five recorded runs survive.
    #[test]
    fn prunes_old_runs() {
        let workspace = TempDir::new().unwrap();
        let origin = Utc::now();

        for offset in 0..5_i64 {
            let started = origin + ChronoDuration::seconds(offset);
            let finished = started + ChronoDuration::milliseconds(50);
            record_run(
                workspace.path(),
                "Task",
                "high",
                started,
                finished,
                "ok",
                None,
                50,
                2,
            )
            .unwrap();
        }

        let listed = list_runs(workspace.path(), 10).unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// Two "ok" runs and one "error" run are tallied as (3, 2, 1).
    #[test]
    fn run_stats_counts_correctly() {
        let workspace = TempDir::new().unwrap();
        let instant = Utc::now();

        // (task, priority, status, output, duration_ms)
        let fixtures: [(&str, &str, &str, Option<&str>, i64); 3] = [
            ("A", "high", "ok", None, 10),
            ("B", "low", "error", Some("fail"), 20),
            ("C", "medium", "ok", None, 15),
        ];
        for (task, priority, status, output, duration_ms) in fixtures {
            record_run(
                workspace.path(),
                task,
                priority,
                instant,
                instant,
                status,
                output,
                duration_ms,
                50,
            )
            .unwrap();
        }

        let (total, ok, err) = run_stats(workspace.path()).unwrap();
        assert_eq!(total, 3);
        assert_eq!(ok, 2);
        assert_eq!(err, 1);
    }

    /// Output larger than the byte budget is stored truncated with the marker.
    #[test]
    fn truncates_large_output() {
        let workspace = TempDir::new().unwrap();
        let instant = Utc::now();
        let oversized = "x".repeat(MAX_OUTPUT_BYTES + 512);

        record_run(
            workspace.path(),
            "T",
            "medium",
            instant,
            instant,
            "ok",
            Some(&oversized),
            10,
            50,
        )
        .unwrap();

        let listed = list_runs(workspace.path(), 1).unwrap();
        let stored = listed[0].output.as_deref().unwrap_or_default();
        assert!(stored.ends_with(TRUNCATED_MARKER));
        assert!(stored.len() <= MAX_OUTPUT_BYTES);
    }
}