Skip to main content

zeroclaw_log/
layer.rs

1//! `tracing-subscriber` Layer that captures `record!` emissions and
2//! `attribution_span!` spans, assembling alias-bound `LogEvent`s and
3//! routing them to JSONL persistence, the broadcast hook, and the
4//! Observer bridge.
5//!
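//! Recognized span/event shapes:
//!
//! 1. `attribution_span!(thing)` — opens a span with `target =
//!    "zeroclaw_log_internal_attribution"` carrying `zc_role_family`,
//!    `zc_role_type`, `zc_attribution_field`, `zc_composite_prefix`,
//!    `zc_default_category`, and `zc_alias`. The layer stashes a
//!    `ZeroclawAttribution` snapshot in the span's extensions; no
//!    LogEvent is emitted for the span itself.
//! 2. `record!(LEVEL, Event::new(...), "msg")` — emits an event with
//!    `target = "zeroclaw_log_event"` carrying `zc_name`, `zc_action`,
//!    `zc_outcome`, `zc_category`, `zc_attrs`, `zc_has_duration`,
//!    `zc_duration_ms`, `message`, and optionally `zc_file` /
//!    `zc_line` (surfaced as the `_file` / `_line` attributes). The
//!    layer walks the span scope leaf→root, merges every attribution
//!    snapshot it finds, and writes a fully populated `LogEvent`.
//! 3. Scope spans — spans with `target = "zeroclaw_log_internal_scope"`.
//!    A `category` field sets the default category for events inside
//!    the span, recognized attribution fields are stored as
//!    attribution, and every other field is merged into the
//!    attributes of descendant events. No LogEvent is emitted for the
//!    span itself.
//!
//! Events from any other target are captured as well: the message
//! becomes the `LogEvent` message and the remaining fields become
//! attributes. Events whose target starts with `zeroclaw_log_internal`
//! are dropped.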
20
21use std::fmt::Write;
22
23use serde_json::{Map as JsonMap, Value};
24use tracing::field::{Field, Visit};
25use tracing::span::{Attributes, Record};
26use tracing::{Event, Id, Subscriber};
27use tracing_subscriber::layer::Context;
28use tracing_subscriber::registry::LookupSpan;
29
30use crate::event::{
31    ATTRIBUTION_FIELDS, COMPOSITE_PREFIXES, EventCategory, EventOutcome, LogEvent, Severity,
32    ZeroclawAttribution,
33};
34use crate::writer::record_event;
35
// ── Targets the layer dispatches on ─────────────────────────────────

/// Target of events emitted through `record!`.
const TARGET_EVENT: &str = "zeroclaw_log_event";
/// Target of spans opened by `attribution_span!`.
const TARGET_ATTRIBUTION_SPAN: &str = "zeroclaw_log_internal_attribution";
/// Target of scope spans carrying ad-hoc per-scope context.
const TARGET_SCOPE_SPAN: &str = "zeroclaw_log_internal_scope";
/// Events whose target starts with this prefix are dropped by `on_event`.
const TARGET_SUPPRESS_PREFIX: &str = "zeroclaw_log_internal";

// ── Field names read from events ────────────────────────────────────

/// Event name.
const F_NAME: &str = "zc_name";
/// Event action.
const F_ACTION: &str = "zc_action";
/// Event outcome, parsed with `EventOutcome::parse`.
const F_OUTCOME: &str = "zc_outcome";
/// Event category, parsed with `EventCategory::parse`.
const F_CATEGORY: &str = "zc_category";
/// JSON-encoded attributes.
const F_ATTRS: &str = "zc_attrs";
/// Whether `zc_duration_ms` should be honored.
const F_HAS_DURATION: &str = "zc_has_duration";
/// Duration in milliseconds.
const F_DURATION_MS: &str = "zc_duration_ms";
/// Source file of the emission, surfaced as the `_file` attribute.
const F_FILE: &str = "zc_file";
/// Source line of the emission, surfaced as the `_line` attribute.
const F_LINE: &str = "zc_line";
/// The standard `tracing` message field.
const F_MESSAGE: &str = "message";

// ── Field names read from attribution spans ─────────────────────────

/// Role family; stored under the `zc_role` attribution key.
const F_ROLE_FAMILY: &str = "zc_role_family";
/// Role type; combined with the alias as `<type>.<alias>` for composites.
const F_ROLE_TYPE: &str = "zc_role_type";
/// Attribution key the alias is stored under for non-composite roles.
const F_ATTRIB_FIELD: &str = "zc_attribution_field";
/// Composite prefix; takes precedence over `zc_attribution_field`.
const F_COMPOSITE_PREFIX: &str = "zc_composite_prefix";
/// Default category for events emitted inside the span.
const F_DEFAULT_CATEGORY: &str = "zc_default_category";
/// Alias of the attributed thing; nothing is attributed without it.
const F_ALIAS: &str = "zc_alias";
58
59pub struct LogCaptureLayer;
60
61impl<S> tracing_subscriber::Layer<S> for LogCaptureLayer
62where
63    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
64{
65    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
66        let target = attrs.metadata().target();
67        let Some(span) = ctx.span(id) else { return };
68        if target == TARGET_ATTRIBUTION_SPAN {
69            let mut v = AttributionSpanCollector::default();
70            attrs.record(&mut v);
71            let mut attribution = ZeroclawAttribution::default();
72            let default_category = v.default_category.as_deref().and_then(EventCategory::parse);
73            v.apply_into(&mut attribution);
74            let mut exts = span.extensions_mut();
75            exts.insert(attribution);
76            if let Some(cat) = default_category {
77                exts.insert(SpanCategory(cat));
78            }
79            return;
80        }
81        if target == TARGET_SCOPE_SPAN {
82            let mut v = ScopeSpanCollector::default();
83            attrs.record(&mut v);
84            v.install(span);
85        }
86    }
87
88    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
89        let Some(span) = ctx.span(id) else { return };
90        let target = span.metadata().target();
91        if target == TARGET_ATTRIBUTION_SPAN {
92            let mut v = AttributionSpanCollector::default();
93            values.record(&mut v);
94            let mut attribution = ZeroclawAttribution::default();
95            v.apply_into(&mut attribution);
96            let mut exts = span.extensions_mut();
97            if let Some(existing) = exts.get_mut::<ZeroclawAttribution>() {
98                existing.merge_from(&attribution);
99            } else {
100                exts.insert(attribution);
101            }
102            return;
103        }
104        if target == TARGET_SCOPE_SPAN {
105            let mut v = ScopeSpanCollector::default();
106            values.record(&mut v);
107            v.install(span);
108        }
109    }
110
111    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
112        let metadata = event.metadata();
113        let target = metadata.target();
114
115        if target.starts_with(TARGET_SUPPRESS_PREFIX) {
116            return;
117        }
118
119        let severity = Severity::from_tracing_level(*metadata.level());
120
121        // Two emission paths:
122        //   1. `record!` → target == TARGET_EVENT; structured fields.
123        //   2. raw `tracing::*!` outside `record!` → arbitrary fields;
124        //      we treat the message as the entire payload.
125        let mut visitor = EventCollector::default();
126        event.record(&mut visitor);
127
128        let action_str = visitor
129            .action
130            .as_deref()
131            .map(str::to_string)
132            .unwrap_or_else(|| metadata.name().to_string());
133
134        let category = visitor
135            .category
136            .as_deref()
137            .filter(|s| !s.is_empty())
138            .and_then(EventCategory::parse)
139            .or_else(|| {
140                ctx.lookup_current()
141                    .into_iter()
142                    .flat_map(|span| span.scope())
143                    .find_map(|span| span.extensions().get::<SpanCategory>().map(|c| c.0))
144            })
145            .unwrap_or(EventCategory::Internal);
146
147        let name_for_action = visitor
148            .name
149            .as_deref()
150            .unwrap_or(action_str.as_str())
151            .to_string();
152
153        let mut log_event = LogEvent::new(severity, &name_for_action, category);
154
155        if target == TARGET_EVENT {
156            log_event.event.action = action_str;
157        }
158
159        if let Some(outcome) = visitor.outcome.as_deref().and_then(EventOutcome::parse) {
160            log_event.set_outcome(outcome);
161        }
162
163        log_event.message = Some(visitor.message.unwrap_or_default());
164
165        if visitor.has_duration.unwrap_or(false) {
166            log_event.zeroclaw.duration_ms = visitor.duration_ms;
167        }
168
169        if let Some(attrs_json) = visitor.attrs_json
170            && !attrs_json.is_empty()
171            && let Ok(v) = serde_json::from_str::<Value>(&attrs_json)
172        {
173            log_event.attributes = v;
174        }
175        if !visitor.extra.is_empty() {
176            if log_event.attributes.is_null() {
177                log_event.attributes = Value::Object(visitor.extra);
178            } else if let Value::Object(map) = &mut log_event.attributes {
179                for (k, v) in visitor.extra {
180                    map.entry(k).or_insert(v);
181                }
182            }
183        }
184
185        // Attach source location for jump-to-source from log viewers.
186        if visitor.file.is_some() || visitor.line.is_some() {
187            let map = match &mut log_event.attributes {
188                Value::Object(m) => m,
189                _ => {
190                    log_event.attributes = Value::Object(JsonMap::new());
191                    match &mut log_event.attributes {
192                        Value::Object(m) => m,
193                        _ => unreachable!(),
194                    }
195                }
196            };
197            if let Some(f) = visitor.file {
198                map.entry("_file".to_string()).or_insert(Value::String(f));
199            }
200            if let Some(l) = visitor.line {
201                map.entry("_line".to_string()).or_insert(Value::from(l));
202            }
203        }
204
205        // Walk span scope leaf→root, merging every attribution snapshot
206        // and every ScopeExtra stash along the way. Inner spans win
207        // because we merge_from() / entry().or_insert() which fills only
208        // absent keys.
209        if let Some(span_ref) = ctx.lookup_current() {
210            let mut current = Some(span_ref);
211            while let Some(span) = current {
212                let exts = span.extensions();
213                if let Some(parent) = exts.get::<ZeroclawAttribution>() {
214                    log_event.zeroclaw.merge_from(parent);
215                }
216                if let Some(scope_extra) = exts.get::<ScopeExtra>() {
217                    if log_event.attributes.is_null() {
218                        log_event.attributes = Value::Object(scope_extra.extra.clone());
219                    } else if let Value::Object(map) = &mut log_event.attributes {
220                        for (k, v) in &scope_extra.extra {
221                            map.entry(k.clone()).or_insert_with(|| v.clone());
222                        }
223                    }
224                }
225                drop(exts);
226                current = span.parent();
227            }
228        }
229
230        record_event(log_event);
231    }
232}
233
/// Default [`EventCategory`] stashed in a span's extensions. `on_event`
/// falls back to the nearest one in scope when the event itself carries
/// no parseable category.
#[derive(Clone, Copy)]
struct SpanCategory(EventCategory);
236
237#[derive(Default)]
238struct EventCollector {
239    name: Option<String>,
240    action: Option<String>,
241    outcome: Option<String>,
242    category: Option<String>,
243    attrs_json: Option<String>,
244    has_duration: Option<bool>,
245    duration_ms: Option<u64>,
246    file: Option<String>,
247    line: Option<u64>,
248    message: Option<String>,
249    extra: JsonMap<String, Value>,
250}
251
252impl Visit for EventCollector {
253    fn record_str(&mut self, field: &Field, value: &str) {
254        self.put(field.name(), Value::String(value.to_string()));
255    }
256
257    fn record_bool(&mut self, field: &Field, value: bool) {
258        if field.name() == F_HAS_DURATION {
259            self.has_duration = Some(value);
260            return;
261        }
262        self.put(field.name(), Value::Bool(value));
263    }
264
265    fn record_i64(&mut self, field: &Field, value: i64) {
266        self.put(field.name(), Value::from(value));
267    }
268
269    fn record_u64(&mut self, field: &Field, value: u64) {
270        if field.name() == F_DURATION_MS {
271            self.duration_ms = Some(value);
272            return;
273        }
274        self.put(field.name(), Value::from(value));
275    }
276
277    fn record_f64(&mut self, field: &Field, value: f64) {
278        self.put(
279            field.name(),
280            serde_json::Number::from_f64(value)
281                .map(Value::Number)
282                .unwrap_or(Value::Null),
283        );
284    }
285
286    fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
287        let mut buf = String::new();
288        let _ = write!(&mut buf, "{value}");
289        let mut current = value.source();
290        while let Some(src) = current {
291            let _ = write!(&mut buf, ": {src}");
292            current = src.source();
293        }
294        self.put(field.name(), Value::String(buf));
295    }
296
297    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
298        let mut buf = String::new();
299        let _ = write!(&mut buf, "{value:?}");
300        if field.name() == F_MESSAGE {
301            self.message = Some(strip_outer_quotes(&buf));
302            return;
303        }
304        if field.name() == F_HAS_DURATION {
305            // `%bool` on tracing comes through Display, not Debug, but
306            // guard anyway.
307            self.has_duration = Some(buf == "true");
308            return;
309        }
310        self.put(field.name(), Value::String(buf));
311    }
312}
313
314impl EventCollector {
315    fn put(&mut self, name: &str, value: Value) {
316        match name {
317            F_NAME => {
318                if let Value::String(s) = value {
319                    self.name = Some(s);
320                }
321            }
322            F_ACTION => {
323                if let Value::String(s) = value {
324                    self.action = Some(s);
325                }
326            }
327            F_OUTCOME => {
328                if let Value::String(s) = value {
329                    self.outcome = Some(s);
330                }
331            }
332            F_CATEGORY => {
333                if let Value::String(s) = value {
334                    self.category = Some(s);
335                }
336            }
337            F_ATTRS => {
338                if let Value::String(s) = value {
339                    self.attrs_json = Some(s);
340                }
341            }
342            F_DURATION_MS => {
343                if let Value::Number(n) = &value
344                    && let Some(u) = n.as_u64()
345                {
346                    self.duration_ms = Some(u);
347                } else if let Value::String(s) = &value
348                    && let Ok(u) = s.parse::<u64>()
349                {
350                    self.duration_ms = Some(u);
351                }
352            }
353            F_HAS_DURATION => {
354                if let Value::Bool(b) = value {
355                    self.has_duration = Some(b);
356                } else if let Value::String(s) = value {
357                    self.has_duration = Some(s == "true");
358                }
359            }
360            F_MESSAGE => {
361                if let Value::String(s) = value {
362                    self.message = Some(s);
363                }
364            }
365            F_FILE => {
366                if let Value::String(s) = value {
367                    self.file = Some(s);
368                }
369            }
370            F_LINE => {
371                if let Value::Number(n) = &value
372                    && let Some(u) = n.as_u64()
373                {
374                    self.line = Some(u);
375                } else if let Value::String(s) = &value
376                    && let Ok(u) = s.parse::<u64>()
377                {
378                    self.line = Some(u);
379                }
380            }
381            _ => {
382                self.extra.insert(name.to_string(), value);
383            }
384        }
385    }
386}
387
388#[derive(Default)]
389struct AttributionSpanCollector {
390    role_family: Option<String>,
391    role_type: Option<String>,
392    attribution_field: Option<String>,
393    composite_prefix: Option<String>,
394    default_category: Option<String>,
395    alias: Option<String>,
396}
397
398impl AttributionSpanCollector {
399    fn apply_into(self, attr: &mut ZeroclawAttribution) {
400        let Some(alias) = self.alias.as_deref().filter(|s| !s.is_empty()) else {
401            return;
402        };
403
404        if let Some(prefix) = self.composite_prefix.as_deref().filter(|s| !s.is_empty()) {
405            // Composite role: build `<type>.<alias>` if we have type;
406            // otherwise just the alias as the bare composite value.
407            let ty = self.role_type.as_deref().unwrap_or("");
408            if !ty.is_empty() {
409                attr.set_composite(prefix, &format!("{ty}.{alias}"));
410            } else {
411                attr.set_composite(prefix, alias);
412            }
413        } else if let Some(field) = self.attribution_field.as_deref().filter(|s| !s.is_empty()) {
414            attr.set(field, alias);
415        }
416
417        if let Some(family) = self.role_family.as_deref().filter(|s| !s.is_empty()) {
418            attr.set("zc_role", family);
419        }
420    }
421}
422
423impl Visit for AttributionSpanCollector {
424    fn record_str(&mut self, field: &Field, value: &str) {
425        self.put(field.name(), value);
426    }
427
428    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
429        let mut buf = String::new();
430        let _ = write!(&mut buf, "{value:?}");
431        let trimmed = strip_outer_quotes(&buf);
432        self.put(field.name(), &trimmed);
433    }
434}
435
436impl AttributionSpanCollector {
437    fn put(&mut self, name: &str, value: &str) {
438        match name {
439            F_ROLE_FAMILY => self.role_family = Some(value.to_string()),
440            F_ROLE_TYPE => self.role_type = Some(value.to_string()),
441            F_ATTRIB_FIELD => self.attribution_field = Some(value.to_string()),
442            F_COMPOSITE_PREFIX => self.composite_prefix = Some(value.to_string()),
443            F_DEFAULT_CATEGORY => self.default_category = Some(value.to_string()),
444            F_ALIAS => self.alias = Some(value.to_string()),
445            _ => {}
446        }
447    }
448}
449
/// Free-form per-scope context (sender id, message id, turn id, etc.)
/// emitted via [`crate::scope!`] and stashed in the span's extensions.
///
/// Only keys that are not recognized as attribution fields end up
/// here; recognized ones are stored in a separate `ZeroclawAttribution`
/// extension on the same span. The layer's scope walk merges `extra`
/// into the attributes of every descendant event, filling only keys
/// the event does not already have.
#[derive(Default)]
struct ScopeExtra {
    /// Free-form fields, keyed by field name.
    extra: JsonMap<String, Value>,
}
459
460#[derive(Default)]
461struct ScopeSpanCollector {
462    category: Option<String>,
463    attribution: ZeroclawAttribution,
464    extra: JsonMap<String, Value>,
465}
466
467impl ScopeSpanCollector {
468    fn install<'a>(
469        self,
470        span: tracing_subscriber::registry::SpanRef<
471            'a,
472            impl Subscriber + for<'lookup> LookupSpan<'lookup>,
473        >,
474    ) {
475        if !self.attribution.fields.is_empty() || self.attribution.duration_ms.is_some() {
476            let mut exts = span.extensions_mut();
477            if let Some(existing) = exts.get_mut::<ZeroclawAttribution>() {
478                existing.merge_from(&self.attribution);
479            } else {
480                exts.insert(self.attribution);
481            }
482        }
483        if !self.extra.is_empty() {
484            let mut exts = span.extensions_mut();
485            if let Some(existing) = exts.get_mut::<ScopeExtra>() {
486                for (k, v) in self.extra {
487                    existing.extra.entry(k).or_insert(v);
488                }
489            } else {
490                exts.insert(ScopeExtra { extra: self.extra });
491            }
492        }
493        if let Some(cat) = self.category.as_deref().and_then(EventCategory::parse) {
494            span.extensions_mut().insert(SpanCategory(cat));
495        }
496    }
497
498    fn put(&mut self, name: &str, value: Value) {
499        if name == "category" {
500            if let Value::String(s) = value {
501                self.category = Some(s);
502            }
503            return;
504        }
505
506        for prefix in COMPOSITE_PREFIXES {
507            if name == *prefix
508                && let Value::String(s) = &value
509            {
510                if s.contains('.') {
511                    self.attribution.set_composite(prefix, s);
512                } else {
513                    self.attribution.set(format!("{prefix}_type"), s.clone());
514                }
515                return;
516            }
517        }
518
519        if ATTRIBUTION_FIELDS.contains(&name)
520            && let Value::String(s) = value
521        {
522            self.attribution.set(name, s);
523            return;
524        }
525
526        self.extra.insert(name.to_string(), value);
527    }
528}
529
530impl Visit for ScopeSpanCollector {
531    fn record_str(&mut self, field: &Field, value: &str) {
532        self.put(field.name(), Value::String(value.to_string()));
533    }
534
535    fn record_bool(&mut self, field: &Field, value: bool) {
536        self.put(field.name(), Value::Bool(value));
537    }
538
539    fn record_i64(&mut self, field: &Field, value: i64) {
540        self.put(field.name(), Value::from(value));
541    }
542
543    fn record_u64(&mut self, field: &Field, value: u64) {
544        if field.name() == "duration_ms" {
545            self.attribution.duration_ms = Some(value);
546            return;
547        }
548        self.put(field.name(), Value::from(value));
549    }
550
551    fn record_f64(&mut self, field: &Field, value: f64) {
552        self.put(
553            field.name(),
554            serde_json::Number::from_f64(value)
555                .map(Value::Number)
556                .unwrap_or(Value::Null),
557        );
558    }
559
560    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
561        let mut buf = String::new();
562        let _ = write!(&mut buf, "{value:?}");
563        self.put(field.name(), Value::String(strip_outer_quotes(&buf)));
564    }
565}
566
/// Trims surrounding whitespace and, if what remains is wrapped in a
/// pair of double quotes, removes that one outer pair.
///
/// A lone `"` is returned unchanged, and escapes inside the quotes are
/// left as they are.
fn strip_outer_quotes(s: &str) -> String {
    let trimmed = s.trim();
    trimmed
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
        .unwrap_or(trimmed)
        .to_string()
}
574
#[cfg(test)]
mod e2e_tests {
    use std::time::{Duration, Instant};

    use ::zeroclaw_api::attribution::{Attributable, ChannelKind, Role};
    use tokio::sync::broadcast::error::RecvError;

    use crate as zeroclaw_log;
    use crate::{
        Action, Event, EventOutcome, subscribe_or_install, try_install_capture_subscriber,
    };

    /// Text used to recognize this test's event among everything else
    /// arriving on the shared broadcast hook. Must match the message
    /// literal passed to `record!` below.
    const MARKER: &str = "attribution-span e2e test";

    /// Minimal `Attributable` standing in for a real channel impl, so
    /// the test does not pull in any channel's transitive deps.
    struct FakeTelegramChannel {
        alias: String,
    }

    impl Attributable for FakeTelegramChannel {
        fn role(&self) -> Role {
            Role::Channel(ChannelKind::Telegram)
        }
        fn alias(&self) -> &str {
            &self.alias
        }
    }

    static TEST_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());

    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn attribution_span_populates_alias_bound_fields() {
        // Both locks are held for the whole test, across awaits: the
        // `record!` below goes through the global LogCaptureLayer into
        // writer::record_event, and without the writer lock a
        // concurrent writer::tests run would see this test's event land
        // in its tempdir.
        let _subscriber_guard = TEST_LOCK.lock();
        let _writer_guard = crate::writer::WRITER_TEST_LOCK.lock();

        try_install_capture_subscriber();
        let mut rx = subscribe_or_install();
        // Discard whatever earlier tests left in the buffer.
        while rx.try_recv().is_ok() {}

        let thing = FakeTelegramChannel {
            alias: "clamps".into(),
        };

        {
            use zeroclaw_log::Instrument;
            let span = zeroclaw_log::attribution_span!(&thing);
            async {
                zeroclaw_log::record!(
                    INFO,
                    Event::new(module_path!(), Action::Note).with_outcome(EventOutcome::Success),
                    "attribution-span e2e test"
                );
            }
            .instrument(span)
            .await;
        }

        // Poll in short steps until the deadline. Other workspace tests
        // fire `record!` into the same global broadcast hook in
        // parallel, so a `Lagged` error must be retried instead of
        // ending the search.
        let deadline = Instant::now() + Duration::from_secs(2);
        let mut found = false;
        while !found && Instant::now() < deadline {
            let step = deadline
                .saturating_duration_since(Instant::now())
                .min(Duration::from_millis(50));
            let value = match tokio::time::timeout(step, rx.recv()).await {
                Ok(Ok(value)) => value,
                Ok(Err(RecvError::Closed)) => break,
                // Lagged receiver or an elapsed step: try again.
                Ok(Err(RecvError::Lagged(_))) | Err(_) => continue,
            };

            let is_ours = value
                .get("message")
                .and_then(|v| v.as_str())
                .is_some_and(|s| s.contains(MARKER));
            if !is_ours {
                continue;
            }

            let zc = value.get("zeroclaw").expect("zeroclaw block present");
            assert_eq!(
                zc.get("channel").and_then(|v| v.as_str()),
                Some("telegram.clamps"),
                "expected channel composite, got: {zc:?}"
            );
            for (key, expected) in [("channel_type", "telegram"), ("channel_alias", "clamps")] {
                assert_eq!(zc.get(key).and_then(|v| v.as_str()), Some(expected));
            }
            found = true;
        }
        assert!(
            found,
            "did not find the test event with attribution-span fields",
        );

        // Clean up so subsequent parallel tests aren't affected.
        crate::clear_broadcast_hook();
    }
}