Skip to main content

zeroclaw_log/
migrate.rs

1//! One-shot, streaming, in-place migration from schema_version 1 rows
2//! to schema_version 2.
3//!
4//! RAM contract: pure streaming. Read one line, parse, convert, write
5//! one line to a temp file. Bounded by a single line's allocation
6//! regardless of file size. Atomic rename at the end.
7
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufRead, BufReader, BufWriter, Write};
10use std::path::Path;
11
12use anyhow::{Context, Result};
13use serde_json::Value;
14
15use crate::event::LogEvent;
16
17/// Detect-and-migrate. No-op when the file is already at schema_version 2.
18pub fn migrate_legacy_jsonl_in_place(path: &Path) -> Result<()> {
19    if !path.exists() {
20        return Ok(());
21    }
22    if file_already_at_current_schema(path)? {
23        return Ok(());
24    }
25
26    let tmp = path.with_extension(format!(
27        "migrate.{}.{}",
28        std::process::id(),
29        chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default()
30    ));
31
32    let mut opts = OpenOptions::new();
33    opts.create_new(true).write(true);
34    #[cfg(unix)]
35    {
36        use std::os::unix::fs::OpenOptionsExt;
37        opts.mode(0o600);
38    }
39    let out_file = opts
40        .open(&tmp)
41        .with_context(|| format!("creating migration temp {}", tmp.display()))?;
42    let mut out = BufWriter::new(out_file);
43
44    let in_file =
45        File::open(path).with_context(|| format!("opening log for migrate: {}", path.display()))?;
46    let reader = BufReader::new(in_file);
47    let mut migrated: u64 = 0;
48    let mut kept: u64 = 0;
49
50    for line in reader.lines() {
51        let line = line.context("reading log line during migrate")?;
52        let trimmed = line.trim();
53        if trimmed.is_empty() {
54            continue;
55        }
56        let value: Value = match serde_json::from_str(trimmed) {
57            Ok(v) => v,
58            Err(err) => {
59                tracing::warn!(
60                    target: "zeroclaw_log",
61                    error = ?err,
62                    "log: skipping malformed line during migrate"
63                );
64                continue;
65            }
66        };
67        let migrated_value = if is_legacy_shape(&value) {
68            migrated += 1;
69            convert_legacy_to_current(value)
70        } else {
71            kept += 1;
72            value
73        };
74        serde_json::to_writer(&mut out, &migrated_value).context("writing migrated line")?;
75        out.write_all(b"\n").context("writing migrated newline")?;
76    }
77    out.flush().context("flushing migrated file")?;
78    out.into_inner()
79        .context("taking migrated file out of buf writer")?
80        .sync_data()
81        .context("fsync migrated file")?;
82
83    #[cfg(unix)]
84    {
85        use std::os::unix::fs::PermissionsExt;
86        let _ = fs::set_permissions(&tmp, fs::Permissions::from_mode(0o600));
87    }
88    fs::rename(&tmp, path).with_context(|| {
89        format!(
90            "renaming migration temp {} → {}",
91            tmp.display(),
92            path.display()
93        )
94    })?;
95
96    if migrated > 0 {
97        tracing::info!(
98            target: "zeroclaw_log",
99            migrated,
100            kept,
101            path = %path.display(),
102            "log: migrated legacy schema-1 rows to schema-2"
103        );
104    }
105    Ok(())
106}
107
108fn file_already_at_current_schema(path: &Path) -> Result<bool> {
109    // Sample the LAST few lines: a file that's been written by the new
110    // writer (or migrated previously) will have current-shape rows at
111    // the tail. Streaming the tail (without rev-iterating) is annoying;
112    // a forward scan of just the first non-empty line is a cheap heuristic.
113    let file = File::open(path)
114        .with_context(|| format!("opening log for schema check: {}", path.display()))?;
115    let reader = BufReader::new(file);
116    for line in reader.lines() {
117        let line = line.context("reading log line for schema check")?;
118        let trimmed = line.trim();
119        if trimmed.is_empty() {
120            continue;
121        }
122        if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
123            return Ok(!is_legacy_shape(&v));
124        }
125        // Malformed first line: assume legacy (migration is best-effort).
126        return Ok(false);
127    }
128    Ok(true) // empty file — nothing to migrate
129}
130
131fn is_legacy_shape(v: &Value) -> bool {
132    let has_legacy = v.get("timestamp").is_some();
133    let has_new = v.get("@timestamp").is_some();
134    has_legacy && !has_new
135}
136
137fn convert_legacy_to_current(legacy: Value) -> Value {
138    let get_str = |key: &str| -> Option<String> {
139        legacy.get(key).and_then(Value::as_str).map(str::to_string)
140    };
141
142    let id = get_str("id").unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
143    let timestamp = get_str("timestamp")
144        .unwrap_or_else(|| chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
145    let event_type = get_str("event_type").unwrap_or_else(|| "legacy".to_string());
146    let success = legacy.get("success").and_then(Value::as_bool);
147    let outcome = match success {
148        Some(true) => "success",
149        Some(false) => "failure",
150        None => "unknown",
151    };
152
153    let mut zeroclaw = serde_json::Map::new();
154    if let Some(agent) = get_str("agent_alias") {
155        zeroclaw.insert("agent_alias".into(), Value::String(agent));
156    }
157    use crate::event::{alias_field, type_field};
158    if let Some(channel) = get_str("channel") {
159        // Legacy "channel" might be bare type or composite. If it
160        // contains `.`, treat as composite and split.
161        if let Some((ty, alias)) = channel.split_once('.') {
162            zeroclaw.insert("channel".into(), Value::String(channel.clone()));
163            zeroclaw.insert(type_field("channel"), Value::String(ty.to_string()));
164            zeroclaw.insert(alias_field("channel"), Value::String(alias.to_string()));
165        } else {
166            zeroclaw.insert("channel".into(), Value::String(channel.clone()));
167            zeroclaw.insert(type_field("channel"), Value::String(channel));
168        }
169    }
170    if let Some(mp) = get_str("model_provider") {
171        if let Some((ty, alias)) = mp.split_once('.') {
172            zeroclaw.insert("model_provider".into(), Value::String(mp.clone()));
173            zeroclaw.insert(type_field("model_provider"), Value::String(ty.to_string()));
174            zeroclaw.insert(
175                alias_field("model_provider"),
176                Value::String(alias.to_string()),
177            );
178        } else {
179            zeroclaw.insert("model_provider".into(), Value::String(mp.clone()));
180            zeroclaw.insert(type_field("model_provider"), Value::String(mp));
181        }
182    }
183    if let Some(model) = get_str("model") {
184        zeroclaw.insert("model".into(), Value::String(model));
185    }
186
187    let trace_id = get_str("turn_id");
188    let message = get_str("message");
189    let attributes = legacy.get("payload").cloned().unwrap_or(Value::Null);
190
191    // Map event_type → category heuristically. Unknown types fall under
192    // "system".
193    let category = category_for_action(&event_type);
194    let severity = if matches!(success, Some(false)) {
195        ("WARN", 13u8)
196    } else {
197        ("INFO", 9u8)
198    };
199
200    serde_json::json!({
201        "id": id,
202        "@timestamp": timestamp,
203        "severity_number": severity.1,
204        "severity_text": severity.0,
205        "event": {
206            "category": category,
207            "action": event_type,
208            "outcome": outcome,
209        },
210        "service": { "name": "zeroclaw", "version": env!("CARGO_PKG_VERSION") },
211        "trace_id": trace_id,
212        "zeroclaw": Value::Object(zeroclaw),
213        "message": message,
214        "attributes": attributes,
215        "schema_version": LogEvent::SCHEMA_VERSION,
216    })
217}
218
/// Map a legacy `event_type` onto a schema-2 `event.category`.
///
/// Anything not listed in the table falls back to `"system"`. That includes
/// `"error"` and any unrecognized action.
fn category_for_action(action: &str) -> &'static str {
    const CATEGORIES: &[(&str, &[&str])] = &[
        ("agent", &["llm_request", "agent_start", "agent_end"]),
        ("tool", &["tool_call", "tool_call_start", "tool_call_result"]),
        ("channel", &["channel_message_inbound", "channel_send"]),
        ("cron", &["cron_run"]),
        ("memory", &["memory_store", "memory_recall", "memory_forget"]),
        ("session", &["session_open", "session_close", "gateway_ws_turn"]),
    ];

    CATEGORIES
        .iter()
        .find(|(_, actions)| actions.contains(&action))
        .map_or("system", |&(category, _)| category)
}
232
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Create (or truncate) `path` and write each entry of `lines` followed by `\n`.
    fn write_jsonl(path: &Path, lines: &[&str]) {
        let mut f = File::create(path).unwrap();
        for line in lines {
            f.write_all(line.as_bytes()).unwrap();
            f.write_all(b"\n").unwrap();
        }
    }

    /// Return the non-blank lines of `path`.
    fn read_all_lines(path: &Path) -> Vec<String> {
        let f = File::open(path).unwrap();
        BufReader::new(f)
            .lines()
            .map(|l| l.unwrap())
            .filter(|l| !l.trim().is_empty())
            .collect()
    }

    /// Parse every non-blank line of `path` as JSON.
    fn read_all_values(path: &Path) -> Vec<Value> {
        read_all_lines(path)
            .iter()
            .map(|l| serde_json::from_str(l).unwrap())
            .collect()
    }

    #[test]
    fn migrates_legacy_to_current_shape() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(
            &path,
            &[
                r#"{"id":"id-1","timestamp":"2026-05-15T19:00:00Z","event_type":"llm_request","channel":"discord.clamps","model_provider":"anthropic.clamps","model":"claude-sonnet-4-6","turn_id":"t1","success":true,"agent_alias":"clamps","message":"call","payload":{"tokens":10}}"#,
            ],
        );

        migrate_legacy_jsonl_in_place(&path).unwrap();

        let lines = read_all_lines(&path);
        let v: Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(v["@timestamp"], "2026-05-15T19:00:00Z");
        assert!(v.get("timestamp").is_none());
        assert_eq!(v["event"]["action"], "llm_request");
        assert_eq!(v["event"]["category"], "agent");
        assert_eq!(v["event"]["outcome"], "success");
        assert_eq!(v["zeroclaw"]["agent_alias"], "clamps");
        assert_eq!(v["zeroclaw"]["channel"], "discord.clamps");
        assert_eq!(v["zeroclaw"]["channel_type"], "discord");
        assert_eq!(v["zeroclaw"]["channel_alias"], "clamps");
        assert_eq!(v["zeroclaw"]["model_provider"], "anthropic.clamps");
        assert_eq!(v["trace_id"], "t1");
        assert_eq!(v["attributes"]["tokens"], 10);
        assert_eq!(v["schema_version"], LogEvent::SCHEMA_VERSION);
    }

    #[test]
    fn already_current_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        let line = r#"{"id":"id","@timestamp":"2026-05-15T19:00:00Z","severity_number":9,"severity_text":"INFO","event":{"category":"agent","action":"x","outcome":"success"},"service":{"name":"zeroclaw","version":"0.7.5"},"zeroclaw":{},"schema_version":2}"#;
        write_jsonl(&path, &[line]);
        migrate_legacy_jsonl_in_place(&path).unwrap();
        let lines = read_all_lines(&path);
        let v: Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(v["schema_version"], 2);
    }

    #[test]
    fn empty_file_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        File::create(&path).unwrap();
        migrate_legacy_jsonl_in_place(&path).unwrap();
        let lines = read_all_lines(&path);
        assert!(lines.is_empty());
    }

    #[test]
    fn missing_file_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("absent.jsonl");
        migrate_legacy_jsonl_in_place(&path).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn mixed_file_migrates_legacy_keeps_current_and_drops_malformed() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(
            &path,
            &[
                r#"{"id":"legacy-1","timestamp":"2026-05-15T19:00:00Z","event_type":"tool_call","success":false}"#,
                "{not json",
                "",
                r#"{"id":"current-1","@timestamp":"2026-05-15T19:00:01Z","event":{"category":"agent","action":"x","outcome":"success"},"schema_version":2}"#,
            ],
        );

        migrate_legacy_jsonl_in_place(&path).unwrap();

        let values = read_all_values(&path);
        assert_eq!(values.len(), 2);

        assert_eq!(values[0]["id"], "legacy-1");
        assert_eq!(values[0]["@timestamp"], "2026-05-15T19:00:00Z");
        assert_eq!(values[0]["event"]["action"], "tool_call");
        assert_eq!(values[0]["event"]["category"], "tool");
        assert_eq!(values[0]["event"]["outcome"], "failure");
        assert_eq!(values[0]["severity_text"], "WARN");
        assert_eq!(values[0]["severity_number"], 13);

        assert_eq!(values[1]["id"], "current-1");
        assert_eq!(values[1]["@timestamp"], "2026-05-15T19:00:01Z");
        assert_eq!(values[1]["schema_version"], 2);
    }

    #[test]
    fn missing_legacy_fields_fall_back_to_defaults() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(
            &path,
            &[r#"{"timestamp":"2026-05-15T19:00:00Z","channel":"cli"}"#],
        );

        migrate_legacy_jsonl_in_place(&path).unwrap();

        let values = read_all_values(&path);
        let v = &values[0];
        assert!(!v["id"].as_str().unwrap().is_empty());
        assert_eq!(v["event"]["action"], "legacy");
        assert_eq!(v["event"]["category"], "system");
        assert_eq!(v["event"]["outcome"], "unknown");
        assert_eq!(v["severity_text"], "INFO");
        assert_eq!(v["severity_number"], 9);
        assert_eq!(v["zeroclaw"]["channel"], "cli");
        assert_eq!(v["zeroclaw"]["channel_type"], "cli");
        assert!(v["zeroclaw"].get("channel_alias").is_none());
        assert!(v["trace_id"].is_null());
        assert!(v["message"].is_null());
        assert!(v["attributes"].is_null());
    }

    #[test]
    fn leaves_no_temp_files_behind() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(
            &path,
            &[r#"{"id":"id-1","timestamp":"2026-05-15T19:00:00Z","event_type":"cron_run"}"#],
        );

        migrate_legacy_jsonl_in_place(&path).unwrap();

        let names: Vec<std::ffi::OsString> = fs::read_dir(tmp.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(names, vec![std::ffi::OsString::from("trace.jsonl")]);
    }

    #[cfg(unix)]
    #[test]
    fn migrated_file_is_owner_read_write_only() {
        use std::os::unix::fs::PermissionsExt;

        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(
            &path,
            &[r#"{"id":"id-1","timestamp":"2026-05-15T19:00:00Z","event_type":"cron_run"}"#],
        );

        migrate_legacy_jsonl_in_place(&path).unwrap();

        let mode = fs::metadata(&path).unwrap().permissions().mode();
        assert_eq!(mode & 0o777, 0o600);
    }
}