1use std::fs::{self, OpenOptions};
9use std::io::{BufRead, BufReader, BufWriter, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, OnceLock};
12
13use crate::broadcast::current_broadcast_hook;
14use crate::config::{LogConfig, ResolvedPolicy, StoragePolicy};
15use crate::event::LogEvent;
16use crate::migrate;
17use crate::observer_bridge;
18use anyhow::{Context, Result};
19use parking_lot::Mutex;
20use serde_json::Value;
21
22struct WriterState {
23 policy: ResolvedPolicy,
24 write_lock: Mutex<()>,
25}
26
27static WRITER: OnceLock<parking_lot::RwLock<Option<Arc<WriterState>>>> = OnceLock::new();
28
29fn slot() -> &'static parking_lot::RwLock<Option<Arc<WriterState>>> {
30 WRITER.get_or_init(|| parking_lot::RwLock::new(None))
31}
32
33fn current_state() -> Option<Arc<WriterState>> {
34 slot().read().clone()
35}
36
37pub fn init_from_config(config: &LogConfig, workspace_dir: &Path) {
41 let policy = ResolvedPolicy::from_config(config, workspace_dir);
42
43 if policy.storage.is_enabled()
44 && policy.path.exists()
45 && let Err(err) = migrate::migrate_legacy_jsonl_in_place(&policy.path)
46 {
47 tracing::warn!(
48 target: "zeroclaw_log",
49 error = ?err,
50 path = %policy.path.display(),
51 "log: legacy JSONL migration failed; daemon continuing with mixed-shape file"
52 );
53 }
54
55 let state = Arc::new(WriterState {
56 policy,
57 write_lock: Mutex::new(()),
58 });
59 *slot().write() = Some(state);
60}
61
62pub fn runtime_trace_path() -> Option<PathBuf> {
65 current_state().map(|s| s.policy.path.clone())
66}
67
68pub fn record_event(event: LogEvent) {
76 let value = match serde_json::to_value(&event) {
77 Ok(v) => v,
78 Err(err) => {
79 tracing::warn!(
80 target: "zeroclaw_log_internal",
81 error = ?err,
82 "log: event serialization failed"
83 );
84 return;
85 }
86 };
87
88 observer_bridge::forward(&event);
89
90 if let Some(hook) = current_broadcast_hook() {
91 let _ = hook.send(value.clone());
92 }
93
94 let Some(state) = current_state() else {
95 return;
96 };
97 if !state.policy.storage.is_enabled() {
98 return;
99 }
100
101 if let Err(err) = append_line(&state, &value) {
102 tracing::warn!(
103 target: "zeroclaw_log_internal",
104 error = ?err,
105 path = %state.policy.path.display(),
106 "log: append failed",
107 );
108 }
109}
110
111fn append_line(state: &Arc<WriterState>, value: &Value) -> Result<()> {
112 let _guard = state.write_lock.lock();
113
114 if let Some(parent) = state.policy.path.parent() {
115 fs::create_dir_all(parent)
116 .with_context(|| format!("creating log directory {}", parent.display()))?;
117 }
118
119 let mut options = OpenOptions::new();
120 options.create(true).append(true);
121
122 #[cfg(unix)]
123 {
124 use std::os::unix::fs::OpenOptionsExt;
125 options.mode(0o600);
126 }
127
128 let file = options
129 .open(&state.policy.path)
130 .with_context(|| format!("opening log file {}", state.policy.path.display()))?;
131 let mut writer = BufWriter::new(file);
132 serde_json::to_writer(&mut writer, value).context("serializing log line")?;
133 writer.write_all(b"\n").context("writing newline")?;
134 writer.flush().context("flushing log line")?;
135 let file = writer
136 .into_inner()
137 .context("taking log file out of buf writer")?;
138 file.sync_data().context("fsync log line")?;
139
140 #[cfg(unix)]
141 {
142 use std::os::unix::fs::PermissionsExt;
143 let _ = fs::set_permissions(&state.policy.path, fs::Permissions::from_mode(0o600));
144 }
145
146 if state.policy.storage == StoragePolicy::Rolling {
147 trim_to_last_entries(state)?;
148 }
149
150 Ok(())
151}
152
153fn trim_to_last_entries(state: &Arc<WriterState>) -> Result<()> {
157 let total = count_nonempty_lines(&state.policy.path)?;
159 if total <= state.policy.max_entries {
160 return Ok(());
161 }
162 let skip = total - state.policy.max_entries;
163
164 let tmp = state.policy.path.with_extension(format!(
165 "tmp.{}.{}",
166 std::process::id(),
167 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
168 ));
169
170 {
171 let mut opts = OpenOptions::new();
172 opts.create_new(true).write(true);
173 #[cfg(unix)]
174 {
175 use std::os::unix::fs::OpenOptionsExt;
176 opts.mode(0o600);
177 }
178 let out_file = opts
179 .open(&tmp)
180 .with_context(|| format!("creating trim temp file {}", tmp.display()))?;
181 let mut out = BufWriter::new(out_file);
182
183 let in_file = fs::File::open(&state.policy.path)
184 .with_context(|| format!("opening log for trim: {}", state.policy.path.display()))?;
185 let reader = BufReader::new(in_file);
186
187 let mut index: usize = 0;
188 for line in reader.lines() {
189 let line = line.context("reading log line during trim")?;
190 if line.trim().is_empty() {
191 continue;
192 }
193 if index >= skip {
194 out.write_all(line.as_bytes())
195 .context("writing trim line")?;
196 out.write_all(b"\n").context("writing trim newline")?;
197 }
198 index += 1;
199 }
200 out.flush().context("flushing trim file")?;
201 out.into_inner()
202 .context("taking trim file out of buf writer")?
203 .sync_data()
204 .context("fsync trim file")?;
205 }
206
207 #[cfg(unix)]
208 {
209 use std::os::unix::fs::PermissionsExt;
210 let _ = fs::set_permissions(&tmp, fs::Permissions::from_mode(0o600));
211 }
212 fs::rename(&tmp, &state.policy.path).with_context(|| {
213 format!(
214 "renaming trim temp {} → {}",
215 tmp.display(),
216 state.policy.path.display()
217 )
218 })?;
219
220 Ok(())
221}
222
223fn count_nonempty_lines(path: &Path) -> Result<usize> {
224 let file = fs::File::open(path)
225 .with_context(|| format!("opening log to count lines: {}", path.display()))?;
226 let reader = BufReader::new(file);
227 let mut n = 0usize;
228 for line in reader.lines() {
229 let line = line.context("reading log line for count")?;
230 if !line.trim().is_empty() {
231 n += 1;
232 }
233 }
234 Ok(n)
235}
236
237#[cfg(test)]
241pub(crate) static WRITER_TEST_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246 use crate::event::{EventCategory, Severity};
247
248 fn install_writer(dir: &Path, max_entries: usize) {
249 let cfg = LogConfig {
250 log_persistence: "rolling".into(),
251 log_persistence_max_entries: max_entries,
252 ..LogConfig::default()
253 };
254 init_from_config(&cfg, dir);
255 }
256
257 #[test]
258 fn append_and_rolling_keeps_only_max_entries() {
259 let _guard = WRITER_TEST_LOCK.lock();
260 let tmp = tempfile::tempdir().unwrap();
261 install_writer(tmp.path(), 3);
262
263 for i in 0..10 {
264 let mut ev = LogEvent::new(Severity::Info, "test", EventCategory::Agent);
265 ev.message = Some(format!("event-{i}"));
266 record_event(ev);
267 }
268
269 let path = runtime_trace_path().unwrap();
270 let contents = fs::read_to_string(&path).unwrap();
271 let lines: Vec<&str> = contents.lines().filter(|l| !l.trim().is_empty()).collect();
272 assert_eq!(lines.len(), 3);
273 for (idx, &line) in lines.iter().enumerate() {
275 let v: Value = serde_json::from_str(line).unwrap();
276 assert_eq!(v["message"].as_str().unwrap(), format!("event-{}", idx + 7));
277 }
278 }
279
280 #[test]
281 fn disabled_storage_does_not_write_file() {
282 let _guard = WRITER_TEST_LOCK.lock();
283 let tmp = tempfile::tempdir().unwrap();
284 let cfg = LogConfig {
285 log_persistence: "none".into(),
286 ..LogConfig::default()
287 };
288 init_from_config(&cfg, tmp.path());
289
290 let event = LogEvent::new(Severity::Info, "test", EventCategory::Agent);
291 record_event(event);
292
293 let path = runtime_trace_path().unwrap();
294 assert!(
295 !path.exists(),
296 "no file should exist when storage is disabled"
297 );
298 }
299}