Skip to main content

zeroclaw_log/
writer.rs

1//! JSONL append-only writer + rolling rotation.
2//!
3//! RAM contract: a single event lands in two allocations (the JSON line
4//! that goes to disk + the `serde_json::Value` clone that goes to the
5//! broadcast hook). Rolling rotation streams through `BufReader::lines`
6//! into a temp file rather than slurping the whole file into a `String`.
7
8use std::fs::{self, OpenOptions};
9use std::io::{BufRead, BufReader, BufWriter, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, OnceLock};
12
13use crate::broadcast::current_broadcast_hook;
14use crate::config::{LogConfig, ResolvedPolicy, StoragePolicy};
15use crate::event::LogEvent;
16use crate::migrate;
17use crate::observer_bridge;
18use anyhow::{Context, Result};
19use parking_lot::Mutex;
20use serde_json::Value;
21
22struct WriterState {
23    policy: ResolvedPolicy,
24    write_lock: Mutex<()>,
25}
26
27static WRITER: OnceLock<parking_lot::RwLock<Option<Arc<WriterState>>>> = OnceLock::new();
28
29fn slot() -> &'static parking_lot::RwLock<Option<Arc<WriterState>>> {
30    WRITER.get_or_init(|| parking_lot::RwLock::new(None))
31}
32
33fn current_state() -> Option<Arc<WriterState>> {
34    slot().read().clone()
35}
36
37/// Initialize (or disable) the persistence writer from config. Idempotent.
38/// When enabled, runs a streaming in-place migration of any schema-1 rows
39/// in the existing file before resuming appends.
40pub fn init_from_config(config: &LogConfig, workspace_dir: &Path) {
41    let policy = ResolvedPolicy::from_config(config, workspace_dir);
42
43    if policy.storage.is_enabled()
44        && policy.path.exists()
45        && let Err(err) = migrate::migrate_legacy_jsonl_in_place(&policy.path)
46    {
47        tracing::warn!(
48            target: "zeroclaw_log",
49            error = ?err,
50            path = %policy.path.display(),
51            "log: legacy JSONL migration failed; daemon continuing with mixed-shape file"
52        );
53    }
54
55    let state = Arc::new(WriterState {
56        policy,
57        write_lock: Mutex::new(()),
58    });
59    *slot().write() = Some(state);
60}
61
62/// Public accessor for the canonical log file path. Used by the gateway's
63/// `/api/logs` endpoint to know which file to stream.
64pub fn runtime_trace_path() -> Option<PathBuf> {
65    current_state().map(|s| s.policy.path.clone())
66}
67
68/// Emit one event. Always fans out to the broadcast hook + tracing event.
69/// If persistence is enabled, also appends a JSON line to disk.
70///
71/// This is the function the `record!` macro expands into. Direct callers
72/// (the schema migration tool, tests) can invoke it too, but production
73/// code should go through the macro so the `tracing::event!` carries the
74/// correct `file:line` source info.
75pub fn record_event(event: LogEvent) {
76    let value = match serde_json::to_value(&event) {
77        Ok(v) => v,
78        Err(err) => {
79            tracing::warn!(
80                target: "zeroclaw_log_internal",
81                error = ?err,
82                "log: event serialization failed"
83            );
84            return;
85        }
86    };
87
88    observer_bridge::forward(&event);
89
90    if let Some(hook) = current_broadcast_hook() {
91        let _ = hook.send(value.clone());
92    }
93
94    let Some(state) = current_state() else {
95        return;
96    };
97    if !state.policy.storage.is_enabled() {
98        return;
99    }
100
101    if let Err(err) = append_line(&state, &value) {
102        tracing::warn!(
103            target: "zeroclaw_log_internal",
104            error = ?err,
105            path = %state.policy.path.display(),
106            "log: append failed",
107        );
108    }
109}
110
111fn append_line(state: &Arc<WriterState>, value: &Value) -> Result<()> {
112    let _guard = state.write_lock.lock();
113
114    if let Some(parent) = state.policy.path.parent() {
115        fs::create_dir_all(parent)
116            .with_context(|| format!("creating log directory {}", parent.display()))?;
117    }
118
119    let mut options = OpenOptions::new();
120    options.create(true).append(true);
121
122    #[cfg(unix)]
123    {
124        use std::os::unix::fs::OpenOptionsExt;
125        options.mode(0o600);
126    }
127
128    let file = options
129        .open(&state.policy.path)
130        .with_context(|| format!("opening log file {}", state.policy.path.display()))?;
131    let mut writer = BufWriter::new(file);
132    serde_json::to_writer(&mut writer, value).context("serializing log line")?;
133    writer.write_all(b"\n").context("writing newline")?;
134    writer.flush().context("flushing log line")?;
135    let file = writer
136        .into_inner()
137        .context("taking log file out of buf writer")?;
138    file.sync_data().context("fsync log line")?;
139
140    #[cfg(unix)]
141    {
142        use std::os::unix::fs::PermissionsExt;
143        let _ = fs::set_permissions(&state.policy.path, fs::Permissions::from_mode(0o600));
144    }
145
146    if state.policy.storage == StoragePolicy::Rolling {
147        trim_to_last_entries(state)?;
148    }
149
150    Ok(())
151}
152
153/// Rolling trim. Streams the file line-by-line into a temp file, keeping
154/// the last `max_entries` lines, then atomically renames. Never loads the
155/// whole file into memory.
156fn trim_to_last_entries(state: &Arc<WriterState>) -> Result<()> {
157    // Count lines first (cheap pass).
158    let total = count_nonempty_lines(&state.policy.path)?;
159    if total <= state.policy.max_entries {
160        return Ok(());
161    }
162    let skip = total - state.policy.max_entries;
163
164    let tmp = state.policy.path.with_extension(format!(
165        "tmp.{}.{}",
166        std::process::id(),
167        chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
168    ));
169
170    {
171        let mut opts = OpenOptions::new();
172        opts.create_new(true).write(true);
173        #[cfg(unix)]
174        {
175            use std::os::unix::fs::OpenOptionsExt;
176            opts.mode(0o600);
177        }
178        let out_file = opts
179            .open(&tmp)
180            .with_context(|| format!("creating trim temp file {}", tmp.display()))?;
181        let mut out = BufWriter::new(out_file);
182
183        let in_file = fs::File::open(&state.policy.path)
184            .with_context(|| format!("opening log for trim: {}", state.policy.path.display()))?;
185        let reader = BufReader::new(in_file);
186
187        let mut index: usize = 0;
188        for line in reader.lines() {
189            let line = line.context("reading log line during trim")?;
190            if line.trim().is_empty() {
191                continue;
192            }
193            if index >= skip {
194                out.write_all(line.as_bytes())
195                    .context("writing trim line")?;
196                out.write_all(b"\n").context("writing trim newline")?;
197            }
198            index += 1;
199        }
200        out.flush().context("flushing trim file")?;
201        out.into_inner()
202            .context("taking trim file out of buf writer")?
203            .sync_data()
204            .context("fsync trim file")?;
205    }
206
207    #[cfg(unix)]
208    {
209        use std::os::unix::fs::PermissionsExt;
210        let _ = fs::set_permissions(&tmp, fs::Permissions::from_mode(0o600));
211    }
212    fs::rename(&tmp, &state.policy.path).with_context(|| {
213        format!(
214            "renaming trim temp {} → {}",
215            tmp.display(),
216            state.policy.path.display()
217        )
218    })?;
219
220    Ok(())
221}
222
223fn count_nonempty_lines(path: &Path) -> Result<usize> {
224    let file = fs::File::open(path)
225        .with_context(|| format!("opening log to count lines: {}", path.display()))?;
226    let reader = BufReader::new(file);
227    let mut n = 0usize;
228    for line in reader.lines() {
229        let line = line.context("reading log line for count")?;
230        if !line.trim().is_empty() {
231            n += 1;
232        }
233    }
234    Ok(n)
235}
236
/// Test-only mutex that serializes tests touching the global writer state.
/// It is `pub(crate)` so tests in other modules of this crate (e.g.
/// `macro::tests`) can take the same lock as `writer::tests`.
#[cfg(test)]
pub(crate) static WRITER_TEST_LOCK: Mutex<()> = Mutex::new(());
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventCategory, Severity};

    /// Installs a rolling-persistence writer rooted at `dir`, capped at
    /// `max_entries` lines.
    fn install_writer(dir: &Path, max_entries: usize) {
        init_from_config(
            &LogConfig {
                log_persistence: "rolling".into(),
                log_persistence_max_entries: max_entries,
                ..LogConfig::default()
            },
            dir,
        );
    }

    /// Writing ten events with a cap of three must leave exactly the three
    /// newest on disk, oldest first.
    #[test]
    fn append_and_rolling_keeps_only_max_entries() {
        let _serial = WRITER_TEST_LOCK.lock();
        let workspace = tempfile::tempdir().unwrap();
        install_writer(workspace.path(), 3);

        (0..10).for_each(|i| {
            let mut event = LogEvent::new(Severity::Info, "test", EventCategory::Agent);
            event.message = Some(format!("event-{i}"));
            record_event(event);
        });

        let log_path = runtime_trace_path().unwrap();
        let contents = fs::read_to_string(&log_path).unwrap();
        let messages: Vec<String> = contents
            .lines()
            .filter(|raw| !raw.trim().is_empty())
            .map(|raw| {
                let parsed: Value = serde_json::from_str(raw).unwrap();
                parsed["message"].as_str().unwrap().to_owned()
            })
            .collect();

        // Last three should be 7, 8, 9 (oldest to newest order preserved).
        assert_eq!(messages, ["event-7", "event-8", "event-9"]);
    }

    /// With persistence set to "none", recording an event must not create
    /// the log file.
    #[test]
    fn disabled_storage_does_not_write_file() {
        let _serial = WRITER_TEST_LOCK.lock();
        let workspace = tempfile::tempdir().unwrap();
        init_from_config(
            &LogConfig {
                log_persistence: "none".into(),
                ..LogConfig::default()
            },
            workspace.path(),
        );

        record_event(LogEvent::new(Severity::Info, "test", EventCategory::Agent));

        let log_path = runtime_trace_path().unwrap();
        assert!(
            !log_path.exists(),
            "no file should exist when storage is disabled"
        );
    }
}