1use std::fs::{self, File, OpenOptions};
9use std::io::{BufRead, BufReader, BufWriter, Write};
10use std::path::Path;
11
12use anyhow::{Context, Result};
13use serde_json::Value;
14
15use crate::event::LogEvent;
16
17pub fn migrate_legacy_jsonl_in_place(path: &Path) -> Result<()> {
19 if !path.exists() {
20 return Ok(());
21 }
22 if file_already_at_current_schema(path)? {
23 return Ok(());
24 }
25
26 let tmp = path.with_extension(format!(
27 "migrate.{}.{}",
28 std::process::id(),
29 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default()
30 ));
31
32 let mut opts = OpenOptions::new();
33 opts.create_new(true).write(true);
34 #[cfg(unix)]
35 {
36 use std::os::unix::fs::OpenOptionsExt;
37 opts.mode(0o600);
38 }
39 let out_file = opts
40 .open(&tmp)
41 .with_context(|| format!("creating migration temp {}", tmp.display()))?;
42 let mut out = BufWriter::new(out_file);
43
44 let in_file =
45 File::open(path).with_context(|| format!("opening log for migrate: {}", path.display()))?;
46 let reader = BufReader::new(in_file);
47 let mut migrated: u64 = 0;
48 let mut kept: u64 = 0;
49
50 for line in reader.lines() {
51 let line = line.context("reading log line during migrate")?;
52 let trimmed = line.trim();
53 if trimmed.is_empty() {
54 continue;
55 }
56 let value: Value = match serde_json::from_str(trimmed) {
57 Ok(v) => v,
58 Err(err) => {
59 tracing::warn!(
60 target: "zeroclaw_log",
61 error = ?err,
62 "log: skipping malformed line during migrate"
63 );
64 continue;
65 }
66 };
67 let migrated_value = if is_legacy_shape(&value) {
68 migrated += 1;
69 convert_legacy_to_current(value)
70 } else {
71 kept += 1;
72 value
73 };
74 serde_json::to_writer(&mut out, &migrated_value).context("writing migrated line")?;
75 out.write_all(b"\n").context("writing migrated newline")?;
76 }
77 out.flush().context("flushing migrated file")?;
78 out.into_inner()
79 .context("taking migrated file out of buf writer")?
80 .sync_data()
81 .context("fsync migrated file")?;
82
83 #[cfg(unix)]
84 {
85 use std::os::unix::fs::PermissionsExt;
86 let _ = fs::set_permissions(&tmp, fs::Permissions::from_mode(0o600));
87 }
88 fs::rename(&tmp, path).with_context(|| {
89 format!(
90 "renaming migration temp {} → {}",
91 tmp.display(),
92 path.display()
93 )
94 })?;
95
96 if migrated > 0 {
97 tracing::info!(
98 target: "zeroclaw_log",
99 migrated,
100 kept,
101 path = %path.display(),
102 "log: migrated legacy schema-1 rows to schema-2"
103 );
104 }
105 Ok(())
106}
107
108fn file_already_at_current_schema(path: &Path) -> Result<bool> {
109 let file = File::open(path)
114 .with_context(|| format!("opening log for schema check: {}", path.display()))?;
115 let reader = BufReader::new(file);
116 for line in reader.lines() {
117 let line = line.context("reading log line for schema check")?;
118 let trimmed = line.trim();
119 if trimmed.is_empty() {
120 continue;
121 }
122 if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
123 return Ok(!is_legacy_shape(&v));
124 }
125 return Ok(false);
127 }
128 Ok(true) }
130
131fn is_legacy_shape(v: &Value) -> bool {
132 let has_legacy = v.get("timestamp").is_some();
133 let has_new = v.get("@timestamp").is_some();
134 has_legacy && !has_new
135}
136
137fn convert_legacy_to_current(legacy: Value) -> Value {
138 let get_str = |key: &str| -> Option<String> {
139 legacy.get(key).and_then(Value::as_str).map(str::to_string)
140 };
141
142 let id = get_str("id").unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
143 let timestamp = get_str("timestamp")
144 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
145 let event_type = get_str("event_type").unwrap_or_else(|| "legacy".to_string());
146 let success = legacy.get("success").and_then(Value::as_bool);
147 let outcome = match success {
148 Some(true) => "success",
149 Some(false) => "failure",
150 None => "unknown",
151 };
152
153 let mut zeroclaw = serde_json::Map::new();
154 if let Some(agent) = get_str("agent_alias") {
155 zeroclaw.insert("agent_alias".into(), Value::String(agent));
156 }
157 use crate::event::{alias_field, type_field};
158 if let Some(channel) = get_str("channel") {
159 if let Some((ty, alias)) = channel.split_once('.') {
162 zeroclaw.insert("channel".into(), Value::String(channel.clone()));
163 zeroclaw.insert(type_field("channel"), Value::String(ty.to_string()));
164 zeroclaw.insert(alias_field("channel"), Value::String(alias.to_string()));
165 } else {
166 zeroclaw.insert("channel".into(), Value::String(channel.clone()));
167 zeroclaw.insert(type_field("channel"), Value::String(channel));
168 }
169 }
170 if let Some(mp) = get_str("model_provider") {
171 if let Some((ty, alias)) = mp.split_once('.') {
172 zeroclaw.insert("model_provider".into(), Value::String(mp.clone()));
173 zeroclaw.insert(type_field("model_provider"), Value::String(ty.to_string()));
174 zeroclaw.insert(
175 alias_field("model_provider"),
176 Value::String(alias.to_string()),
177 );
178 } else {
179 zeroclaw.insert("model_provider".into(), Value::String(mp.clone()));
180 zeroclaw.insert(type_field("model_provider"), Value::String(mp));
181 }
182 }
183 if let Some(model) = get_str("model") {
184 zeroclaw.insert("model".into(), Value::String(model));
185 }
186
187 let trace_id = get_str("turn_id");
188 let message = get_str("message");
189 let attributes = legacy.get("payload").cloned().unwrap_or(Value::Null);
190
191 let category = category_for_action(&event_type);
194 let severity = if matches!(success, Some(false)) {
195 ("WARN", 13u8)
196 } else {
197 ("INFO", 9u8)
198 };
199
200 serde_json::json!({
201 "id": id,
202 "@timestamp": timestamp,
203 "severity_number": severity.1,
204 "severity_text": severity.0,
205 "event": {
206 "category": category,
207 "action": event_type,
208 "outcome": outcome,
209 },
210 "service": { "name": "zeroclaw", "version": env!("CARGO_PKG_VERSION") },
211 "trace_id": trace_id,
212 "zeroclaw": Value::Object(zeroclaw),
213 "message": message,
214 "attributes": attributes,
215 "schema_version": LogEvent::SCHEMA_VERSION,
216 })
217}
218
/// Maps a legacy `event_type` / action name to its event category.
///
/// Any action not listed in the table (including `"error"`) is classed as
/// `"system"`.
fn category_for_action(action: &str) -> &'static str {
    const CATEGORIES: &[(&str, &[&str])] = &[
        ("agent", &["llm_request", "agent_start", "agent_end"]),
        ("tool", &["tool_call", "tool_call_start", "tool_call_result"]),
        ("channel", &["channel_message_inbound", "channel_send"]),
        ("cron", &["cron_run"]),
        ("memory", &["memory_store", "memory_recall", "memory_forget"]),
        ("session", &["session_open", "session_close", "gateway_ws_turn"]),
    ];

    CATEGORIES
        .iter()
        .find(|(_, actions)| actions.iter().any(|candidate| *candidate == action))
        .map_or("system", |&(category, _)| category)
}
232
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Writes `lines` to `path`, one per line, each newline-terminated.
    fn write_jsonl(path: &Path, lines: &[&str]) {
        let mut f = File::create(path).unwrap();
        for line in lines {
            f.write_all(line.as_bytes()).unwrap();
            f.write_all(b"\n").unwrap();
        }
    }

    /// Reads all non-blank lines of `path`.
    fn read_all_lines(path: &Path) -> Vec<String> {
        let f = File::open(path).unwrap();
        BufReader::new(f)
            .lines()
            .map(|l| l.unwrap())
            .filter(|l| !l.trim().is_empty())
            .collect()
    }

    /// Migrates a file containing the single line `line`, and returns the one
    /// resulting row.
    fn migrate_single(line: &str) -> Value {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(&path, &[line]);
        migrate_legacy_jsonl_in_place(&path).unwrap();
        let lines = read_all_lines(&path);
        assert_eq!(lines.len(), 1);
        serde_json::from_str(&lines[0]).unwrap()
    }

    #[test]
    fn migrates_legacy_to_current_shape() {
        let v = migrate_single(
            r#"{"id":"id-1","timestamp":"2026-05-15T19:00:00Z","event_type":"llm_request","channel":"discord.clamps","model_provider":"anthropic.clamps","model":"claude-sonnet-4-6","turn_id":"t1","success":true,"agent_alias":"clamps","message":"call","payload":{"tokens":10}}"#,
        );
        assert_eq!(v["@timestamp"], "2026-05-15T19:00:00Z");
        assert!(v.get("timestamp").is_none());
        assert_eq!(v["event"]["action"], "llm_request");
        assert_eq!(v["event"]["category"], "agent");
        assert_eq!(v["event"]["outcome"], "success");
        assert_eq!(v["severity_text"], "INFO");
        assert_eq!(v["severity_number"], 9);
        assert_eq!(v["zeroclaw"]["agent_alias"], "clamps");
        assert_eq!(v["zeroclaw"]["channel"], "discord.clamps");
        assert_eq!(v["zeroclaw"]["channel_type"], "discord");
        assert_eq!(v["zeroclaw"]["channel_alias"], "clamps");
        assert_eq!(v["zeroclaw"]["model_provider"], "anthropic.clamps");
        assert_eq!(v["zeroclaw"]["model"], "claude-sonnet-4-6");
        assert_eq!(v["trace_id"], "t1");
        assert_eq!(v["message"], "call");
        assert_eq!(v["attributes"]["tokens"], 10);
        assert_eq!(v["schema_version"], LogEvent::SCHEMA_VERSION);
    }

    #[test]
    fn failed_legacy_row_maps_to_failure_and_warn() {
        let v = migrate_single(
            r#"{"id":"id-2","timestamp":"2026-05-15T19:00:00Z","event_type":"tool_call","success":false}"#,
        );
        assert_eq!(v["event"]["outcome"], "failure");
        assert_eq!(v["event"]["category"], "tool");
        assert_eq!(v["severity_text"], "WARN");
        assert_eq!(v["severity_number"], 13);
    }

    #[test]
    fn missing_optional_fields_get_defaults() {
        let v = migrate_single(r#"{"timestamp":"2026-05-15T19:00:00Z"}"#);
        assert_eq!(v["@timestamp"], "2026-05-15T19:00:00Z");
        assert_eq!(v["event"]["action"], "legacy");
        assert_eq!(v["event"]["category"], "system");
        assert_eq!(v["event"]["outcome"], "unknown");
        assert_eq!(v["severity_text"], "INFO");
        assert!(!v["id"].as_str().unwrap().is_empty());
        assert!(v["trace_id"].is_null());
        assert!(v["message"].is_null());
        assert!(v["attributes"].is_null());
    }

    #[test]
    fn undotted_channel_is_treated_as_type_only() {
        let v = migrate_single(r#"{"timestamp":"2026-05-15T19:00:00Z","channel":"cli"}"#);
        assert_eq!(v["zeroclaw"]["channel"], "cli");
        assert_eq!(v["zeroclaw"]["channel_type"], "cli");
        assert!(v["zeroclaw"].get("channel_alias").is_none());
    }

    #[test]
    fn malformed_and_blank_lines_are_dropped_current_rows_kept() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        let legacy = r#"{"id":"id-3","timestamp":"2026-05-15T19:00:00Z","event_type":"cron_run"}"#;
        let current = r#"{"@timestamp":"2026-05-15T19:00:01Z","schema_version":2}"#;
        write_jsonl(&path, &["not json", "", legacy, current]);

        migrate_legacy_jsonl_in_place(&path).unwrap();

        let raw = fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = raw.lines().collect();
        assert_eq!(lines.len(), 2);
        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["id"], "id-3");
        assert_eq!(first["@timestamp"], "2026-05-15T19:00:00Z");
        assert_eq!(first["event"]["category"], "cron");
        let second: Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second, serde_json::from_str::<Value>(current).unwrap());
    }

    #[test]
    fn already_current_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        let line = r#"{"id":"id","@timestamp":"2026-05-15T19:00:00Z","severity_number":9,"severity_text":"INFO","event":{"category":"agent","action":"x","outcome":"success"},"service":{"name":"zeroclaw","version":"0.7.5"},"zeroclaw":{},"schema_version":2}"#;
        write_jsonl(&path, &[line]);
        migrate_legacy_jsonl_in_place(&path).unwrap();
        // The file must not have been rewritten at all.
        assert_eq!(fs::read_to_string(&path).unwrap(), format!("{}\n", line));
        let lines = read_all_lines(&path);
        let v: Value = serde_json::from_str(&lines[0]).unwrap();
        assert_eq!(v["schema_version"], 2);
    }

    #[test]
    fn empty_file_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        File::create(&path).unwrap();
        migrate_legacy_jsonl_in_place(&path).unwrap();
        let lines = read_all_lines(&path);
        assert!(lines.is_empty());
    }

    #[test]
    fn missing_file_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("absent.jsonl");
        migrate_legacy_jsonl_in_place(&path).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn successful_migration_leaves_no_temp_files() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(&path, &[r#"{"timestamp":"2026-05-15T19:00:00Z"}"#]);
        migrate_legacy_jsonl_in_place(&path).unwrap();
        let names: Vec<std::ffi::OsString> = fs::read_dir(tmp.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(names, vec![std::ffi::OsString::from("trace.jsonl")]);
    }

    #[cfg(unix)]
    #[test]
    fn migrated_file_is_owner_only() {
        use std::os::unix::fs::PermissionsExt;
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("trace.jsonl");
        write_jsonl(&path, &[r#"{"timestamp":"2026-05-15T19:00:00Z"}"#]);
        migrate_legacy_jsonl_in_place(&path).unwrap();
        let mode = fs::metadata(&path).unwrap().permissions().mode();
        assert_eq!(mode & 0o777, 0o600);
    }
}