Skip to main content

zeroclaw_log/
reader.rs

1//! Paginated stream reader for the JSONL log file.
2//!
3//! RAM contract: at any moment, in-memory state is bounded by `limit`
4//! (the number of events the caller asked for) plus a single-line read
5//! buffer. We do NOT slurp the whole file into a `String`.
6//!
//! The pagination model is cursor-by-timestamp + cursor-by-id. Callers
//! pass `until_ts` (together with `until_id`) to ask for "events strictly
//! older than this timestamp, or with the same timestamp and a smaller
//! id". The returned page includes `next_cursor`, the oldest event's
//! `(timestamp, id)` pair — callers pass it back to request the next page.
12//!
//! Filters are applied line by line during the scan: the reader walks the
//! file forward from the start, decodes each line, applies the filter
//! predicate, and keeps only the newest `limit` matches in a sliding
//! window. Every call therefore decodes the whole file, with or without a
//! filter; only the retained window is bounded by `limit`.
18
19use std::collections::{BTreeMap, VecDeque};
20use std::fs::File;
21use std::io::{BufRead, BufReader};
22use std::path::{Path, PathBuf};
23
24use anyhow::{Context, Result};
25use serde::{Deserialize, Serialize};
26
27use crate::event::LogEvent;
28
/// Constraints accepted by [`load_page`].
///
/// Every constraint that is set must hold for an event to be returned;
/// unset (`None` / empty / `false`) constraints are ignored, so
/// `LogFilter::default()` matches everything.
///
/// Timestamps are compared as plain strings, so the bounds only behave
/// chronologically when all timestamps share one textual layout.
///
/// Attribution filters are open-ended: [`Self::field_eq`] is keyed by any
/// `zeroclaw.*` attribution name (e.g. `"agent_alias"`, `"channel"`,
/// `"channel_type"`, `"risk_profile"`, `"model_provider"`) and looked up
/// dynamically, so new attribution fields need no change here.
#[derive(Debug, Clone, Default)]
pub struct LogFilter {
    /// Inclusive lower bound on the event timestamp (RFC 3339 text).
    pub since_ts: Option<String>,
    /// Upper bound on the event timestamp (RFC 3339 text); events with a
    /// later timestamp are rejected. Normally the timestamp half of a
    /// pagination cursor, supplied together with [`Self::until_id`].
    pub until_ts: Option<String>,
    /// Id half of the pagination cursor: among events whose timestamp
    /// equals `until_ts`, only those whose id sorts before this one pass.
    pub until_id: Option<String>,
    /// Required `event.action`, compared ASCII-case-insensitively.
    pub action: Option<String>,
    /// Required `event.category`, compared ASCII-case-insensitively.
    pub category: Option<String>,
    /// Required `event.outcome`, compared ASCII-case-insensitively.
    pub outcome: Option<String>,
    /// Lowest `severity_number` that is still accepted.
    pub severity_min: Option<u8>,
    /// Required `trace_id`, compared exactly.
    pub trace_id: Option<String>,
    /// Free-text query: an ASCII-case-insensitive substring looked for in
    /// the message and in the serialised attributes.
    pub q: Option<String>,
    /// When `true`, events whose `event.category` is exactly `"internal"`
    /// are dropped. The derived default is `false`.
    pub hide_internal: bool,
    /// Exact-match requirements on attribution fields, keyed by
    /// `zeroclaw.*` attribution name. An empty map imposes no constraint.
    pub field_eq: BTreeMap<String, String>,
}
63
64/// One page returned by [`load_page`].
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct LogPage {
67    pub events: Vec<LogEvent>,
68    /// `Some((timestamp, id))` when more older events may exist; pass to
69    /// the next call as `(until_ts, until_id)`.
70    pub next_cursor: Option<(String, String)>,
71    /// True when the file was fully scanned. UI uses this to disable
72    /// "load older" affordances.
73    pub at_end: bool,
74}
75
76/// Load one page of events. Newest first.
77///
78/// Implementation: we open the file, read it line by line into a fixed
79/// in-memory buffer (capped at `limit` matched events). To preserve the
80/// "newest first" order without reading from the tail, we accumulate
81/// matched events into a `VecDeque`, keeping the cap = `limit`, popping
82/// the front when overflowed. Final result is reversed in place. That
83/// gives us a one-pass, single-allocation-bounded reader without needing
84/// `mmap` or reverse-byte-stream gymnastics.
85pub fn load_page(path: &Path, filter: &LogFilter, limit: usize) -> Result<LogPage> {
86    let limit = limit.clamp(1, 10_000);
87
88    if !path.exists() {
89        return Ok(LogPage {
90            events: Vec::new(),
91            next_cursor: None,
92            at_end: true,
93        });
94    }
95
96    let file = File::open(path).with_context(|| format!("opening log: {}", path.display()))?;
97    let reader = BufReader::new(file);
98
99    let mut window: VecDeque<LogEvent> = VecDeque::with_capacity(limit + 1);
100    let needle = filter.q.as_deref().map(|s| s.to_ascii_lowercase());
101    // `dropped_older` records whether we ever pushed past `limit` and
102    // had to evict the oldest matching event. If false at the end, every
103    // matching event in the file is in `window` — meaning there are no
104    // older results the caller could page back to.
105    let mut dropped_older = false;
106
107    for line in reader.lines() {
108        let line = line.context("reading log line")?;
109        let trimmed = line.trim();
110        if trimmed.is_empty() {
111            continue;
112        }
113
114        let event: LogEvent = match serde_json::from_str(trimmed) {
115            Ok(event) => event,
116            Err(err) => {
117                tracing::trace!(
118                    target: "zeroclaw_log",
119                    error = ?err,
120                    "log: skipping malformed JSONL line"
121                );
122                continue;
123            }
124        };
125
126        if !matches_filter(&event, filter, needle.as_deref()) {
127            continue;
128        }
129
130        window.push_back(event);
131        if window.len() > limit {
132            window.pop_front();
133            dropped_older = true;
134        }
135    }
136
137    let mut events: Vec<LogEvent> = window.into_iter().collect();
138    // Reverse so newest is first.
139    events.reverse();
140
141    // next_cursor is the OLDEST event in the page (the last one in
142    // newest-first ordering = events.last()). Caller uses it as
143    // `until_ts` / `until_id` for the next "load older" request.
144    let next_cursor = events.last().map(|e| (e.timestamp.clone(), e.id.clone()));
145
146    // We've reached the tail of the matched set when no older matching
147    // events were ever discarded during the scan.
148    let at_end = !dropped_older;
149
150    Ok(LogPage {
151        events,
152        next_cursor,
153        at_end,
154    })
155}
156
157fn matches_filter(event: &LogEvent, filter: &LogFilter, needle: Option<&str>) -> bool {
158    if filter.hide_internal && event.event.category == "internal" {
159        return false;
160    }
161    if let Some(ref since) = filter.since_ts
162        && event.timestamp.as_str() < since.as_str()
163    {
164        return false;
165    }
166    if let Some(ref until) = filter.until_ts {
167        // Cursor pagination: include events strictly older than the
168        // cursor. If the timestamps tie, fall back to id ordering for
169        // deterministic pagination.
170        match event.timestamp.as_str().cmp(until.as_str()) {
171            std::cmp::Ordering::Greater => return false,
172            std::cmp::Ordering::Equal => {
173                if let Some(ref until_id) = filter.until_id
174                    && event.id.as_str() >= until_id.as_str()
175                {
176                    return false;
177                }
178            }
179            std::cmp::Ordering::Less => {}
180        }
181    }
182    if let Some(ref action) = filter.action
183        && !event.event.action.eq_ignore_ascii_case(action)
184    {
185        return false;
186    }
187    if let Some(ref category) = filter.category
188        && !event.event.category.eq_ignore_ascii_case(category)
189    {
190        return false;
191    }
192    if let Some(ref outcome) = filter.outcome
193        && !event.event.outcome.eq_ignore_ascii_case(outcome)
194    {
195        return false;
196    }
197    if let Some(min) = filter.severity_min
198        && event.severity_number < min
199    {
200        return false;
201    }
202    for (key, want) in &filter.field_eq {
203        if event.zeroclaw.get(key) != Some(want.as_str()) {
204            return false;
205        }
206    }
207    if let Some(ref tid) = filter.trace_id
208        && event.trace_id.as_deref() != Some(tid.as_str())
209    {
210        return false;
211    }
212    if let Some(n) = needle {
213        let hay_msg = event.message.as_deref().unwrap_or("").to_ascii_lowercase();
214        let hay_attrs = event.attributes.to_string().to_ascii_lowercase();
215        if !hay_msg.contains(n) && !hay_attrs.contains(n) {
216            return false;
217        }
218    }
219    true
220}
221
222/// Find a single event by id. Scans the file backwards from the end.
223pub fn find_event_by_id(path: &Path, id: &str) -> Result<Option<LogEvent>> {
224    if !path.exists() {
225        return Ok(None);
226    }
227    let file = File::open(path).with_context(|| format!("opening log: {}", path.display()))?;
228    let reader = BufReader::new(file);
229    let mut found: Option<LogEvent> = None;
230    for line in reader.lines() {
231        let line = line.context("reading log line")?;
232        let trimmed = line.trim();
233        if trimmed.is_empty() {
234            continue;
235        }
236        if let Ok(event) = serde_json::from_str::<LogEvent>(trimmed)
237            && event.id == id
238        {
239            found = Some(event); // Don't break — last write wins for duplicate ids.
240        }
241    }
242    Ok(found)
243}
244
245/// Helper for the gateway: the path the writer is configured to use.
246#[must_use]
247pub fn current_log_path() -> Option<PathBuf> {
248    crate::writer::runtime_trace_path()
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventCategory, Severity};
    use std::io::Write;

    /// Create a scratch directory and the path of a (not yet written) log
    /// file inside it. The directory guard must be kept alive by the caller.
    fn scratch_log() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("trace.jsonl");
        (dir, path)
    }

    /// Write `events` to `path`, one JSON document per line.
    fn write_jsonl(path: &Path, events: &[LogEvent]) {
        let mut out = std::fs::File::create(path).unwrap();
        for event in events {
            let json = serde_json::to_string(event).unwrap();
            writeln!(out, "{json}").unwrap();
        }
    }

    /// Build an info-level agent event, optionally tagged with an agent alias.
    fn make_event(action: &str, agent: Option<&str>) -> LogEvent {
        let mut event = LogEvent::new(Severity::Info, action, EventCategory::Agent);
        if let Some(alias) = agent {
            event.zeroclaw.set("agent_alias", alias);
        }
        event
    }

    /// `count` events named `event-0..` with strictly increasing timestamps.
    fn numbered_events(count: usize) -> Vec<LogEvent> {
        (0..count)
            .map(|index| {
                let mut event = make_event("test", None);
                event.timestamp = format!("2026-05-15T19:00:0{index}.000Z");
                event.message = Some(format!("event-{index}"));
                event
            })
            .collect()
    }

    /// The messages of a page, in page order.
    fn messages(page: &LogPage) -> Vec<Option<&str>> {
        page.events
            .iter()
            .map(|event| event.message.as_deref())
            .collect()
    }

    #[test]
    fn empty_file_returns_at_end() {
        // The file is never created, so the reader sees a missing path.
        let (_dir, path) = scratch_log();

        let page = load_page(&path, &LogFilter::default(), 10).unwrap();

        assert!(page.events.is_empty());
        assert!(page.at_end);
    }

    #[test]
    fn returns_newest_first_within_limit() {
        let (_dir, path) = scratch_log();
        write_jsonl(&path, &numbered_events(5));

        let page = load_page(&path, &LogFilter::default(), 3).unwrap();

        assert_eq!(
            messages(&page),
            [Some("event-4"), Some("event-3"), Some("event-2")]
        );
        assert!(!page.at_end);
    }

    #[test]
    fn filter_by_agent() {
        let (_dir, path) = scratch_log();
        write_jsonl(
            &path,
            &[
                make_event("a", Some("clamps")),
                make_event("b", Some("glados")),
                make_event("c", Some("clamps")),
            ],
        );

        let filter = LogFilter {
            field_eq: BTreeMap::from([("agent_alias".to_string(), "clamps".to_string())]),
            ..Default::default()
        };
        let page = load_page(&path, &filter, 10).unwrap();

        assert_eq!(page.events.len(), 2);
    }

    #[test]
    fn hide_internal_drops_internal_category() {
        let (_dir, path) = scratch_log();
        let mut agent_event = make_event("a", None);
        agent_event.event.category = "agent".into();
        let mut internal_event = make_event("b", None);
        internal_event.event.category = "internal".into();
        write_jsonl(&path, &[agent_event, internal_event]);

        let filter = LogFilter {
            hide_internal: true,
            ..Default::default()
        };
        let page = load_page(&path, &filter, 10).unwrap();

        assert_eq!(page.events.len(), 1);
        assert_eq!(page.events[0].event.action, "a");
    }

    #[test]
    fn substring_query_matches_message_and_attributes() {
        let (_dir, path) = scratch_log();
        let mut with_alpha_message = make_event("a", None);
        with_alpha_message.message = Some("alpha bravo".into());
        let mut with_attr_payload = make_event("b", None);
        with_attr_payload.attributes = serde_json::json!({ "k": "delta echo" });
        let mut with_foxtrot_message = make_event("c", None);
        with_foxtrot_message.message = Some("foxtrot".into());
        write_jsonl(
            &path,
            &[with_alpha_message, with_attr_payload, with_foxtrot_message],
        );

        // Query that only occurs in a message.
        let message_filter = LogFilter {
            q: Some("bravo".into()),
            ..Default::default()
        };
        let message_page = load_page(&path, &message_filter, 10).unwrap();
        assert_eq!(message_page.events.len(), 1);
        assert_eq!(message_page.events[0].event.action, "a");

        // Query that only occurs in an attribute payload.
        let attr_filter = LogFilter {
            q: Some("delta".into()),
            ..Default::default()
        };
        let attr_page = load_page(&path, &attr_filter, 10).unwrap();
        assert_eq!(attr_page.events.len(), 1);
        assert_eq!(attr_page.events[0].event.action, "b");
    }

    #[test]
    fn cursor_pagination_returns_older_pages() {
        let (_dir, path) = scratch_log();
        write_jsonl(&path, &numbered_events(6));

        let first_page = load_page(&path, &LogFilter::default(), 3).unwrap();
        assert_eq!(first_page.events[0].message.as_deref(), Some("event-5"));

        // Hand the cursor of the first page back to fetch the older half.
        let (cursor_ts, cursor_id) = first_page.next_cursor.unwrap();
        let older_filter = LogFilter {
            until_ts: Some(cursor_ts),
            until_id: Some(cursor_id),
            ..Default::default()
        };
        let older_page = load_page(&path, &older_filter, 3).unwrap();

        assert_eq!(
            messages(&older_page),
            [Some("event-2"), Some("event-1"), Some("event-0")]
        );
        assert!(older_page.at_end);
    }
}
405}