Skip to main content

zeroclaw_channels/orchestrator/
media_pipeline.rs

//! Automatic media understanding pipeline for inbound channel messages.
//!
//! Pre-processes media attachments (audio, images, video) before the agent sees
//! the message, enriching the text with human-readable annotations:
//!
//! - **Audio**: transcribed via the existing [`super::super::transcription`]
//!   infrastructure and prepended as `[Audio transcription: ...]`. Falls back to
//!   `[Audio: attached]` when no transcription manager is configured.
//! - **Images**: when a vision-capable model provider is active, annotated as
//!   `[Image: <file> attached, will be processed by vision model]` followed by an
//!   inline `[IMAGE:data:<mime>;base64,...]` marker. Falls back to
//!   `[Image: <file> attached]` when vision is unavailable.
//! - **Video**: annotated as `[Video: <file> attached]`; video summarisation is
//!   not integrated yet.
//!
//! The pipeline is **opt-in** via `[media_pipeline] enabled = true` in config.
14
15use base64::{Engine as _, engine::general_purpose::STANDARD};
16use zeroclaw_config::schema::MediaPipelineConfig;
17
18use super::super::transcription::TranscriptionManager;
19
// Re-export media types from `zeroclaw_api::media` for backwards compatibility.
21pub use zeroclaw_api::media::{MediaAttachment, MediaKind};
22
23/// The media understanding pipeline.
24///
25/// Consumes a message's text and attachments, returning enriched text with
26/// media annotations prepended.
27pub struct MediaPipeline<'a> {
28    config: &'a MediaPipelineConfig,
29    transcription_manager: Option<&'a TranscriptionManager>,
30    vision_available: bool,
31}
32
33impl<'a> MediaPipeline<'a> {
34    /// Create a new pipeline. `vision_available` indicates whether the current
35    /// model provider supports vision (image description). `transcription_manager`
36    /// is `None` when transcription is disabled at the channel level — audio
37    /// attachments fall back to `[Audio: attached]` annotations.
38    pub fn new(
39        config: &'a MediaPipelineConfig,
40        transcription_manager: Option<&'a TranscriptionManager>,
41        vision_available: bool,
42    ) -> Self {
43        Self {
44            config,
45            transcription_manager,
46            vision_available,
47        }
48    }
49
50    /// Process a message's attachments and return enriched text.
51    ///
52    /// If the pipeline is disabled via config, returns `original_text` unchanged.
53    pub async fn process(&self, original_text: &str, attachments: &[MediaAttachment]) -> String {
54        if !self.config.enabled || attachments.is_empty() {
55            return original_text.to_string();
56        }
57
58        let mut annotations = Vec::new();
59
60        for attachment in attachments {
61            match attachment.kind() {
62                MediaKind::Audio if self.config.transcribe_audio => {
63                    let annotation = self.process_audio(attachment).await;
64                    annotations.push(annotation);
65                }
66                MediaKind::Image if self.config.describe_images => {
67                    let annotation = self.process_image(attachment);
68                    annotations.push(annotation);
69                }
70                MediaKind::Video if self.config.summarize_video => {
71                    let annotation = self.process_video(attachment);
72                    annotations.push(annotation);
73                }
74                _ => {}
75            }
76        }
77
78        if annotations.is_empty() {
79            return original_text.to_string();
80        }
81
82        let mut enriched = String::with_capacity(
83            annotations.iter().map(|a| a.len() + 1).sum::<usize>() + original_text.len() + 2,
84        );
85
86        for annotation in &annotations {
87            enriched.push_str(annotation);
88            enriched.push('\n');
89        }
90
91        if !original_text.is_empty() {
92            enriched.push('\n');
93            enriched.push_str(original_text);
94        }
95
96        enriched.trim().to_string()
97    }
98
99    /// Transcribe an audio attachment using the existing transcription infra.
100    async fn process_audio(&self, attachment: &MediaAttachment) -> String {
101        let Some(manager) = self.transcription_manager else {
102            return "[Audio: attached]".to_string();
103        };
104
105        match manager
106            .transcribe(&attachment.data, &attachment.file_name)
107            .await
108        {
109            Ok(text) => {
110                let trimmed = text.trim();
111                if trimmed.is_empty() {
112                    "[Audio transcription: (empty)]".to_string()
113                } else {
114                    format!("[Audio transcription: {trimmed}]")
115                }
116            }
117            Err(err) => {
118                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"file": attachment.file_name, "error": format!("{}", err)})), "Media pipeline: audio transcription failed");
119                "[Audio: transcription failed]".to_string()
120            }
121        }
122    }
123
124    /// Describe an image attachment.
125    ///
126    /// When vision is available, the image will be passed through to the
127    /// model_provider as an `[IMAGE:]` marker and described by the model in the
128    /// normal flow.
129    fn process_image(&self, attachment: &MediaAttachment) -> String {
130        if self.vision_available {
131            let mime = attachment.mime_type.as_deref().unwrap_or("image/jpeg");
132            let b64 = STANDARD.encode(&attachment.data);
133            format!(
134                "[Image: {} attached, will be processed by vision model]\n[IMAGE:data:{};base64,{}]",
135                attachment.file_name, mime, b64
136            )
137        } else {
138            format!("[Image: {} attached]", attachment.file_name)
139        }
140    }
141
142    /// Summarize a video attachment.
143    ///
144    /// Video analysis requires external APIs not currently integrated.
145    /// For now we add a placeholder annotation.
146    fn process_video(&self, attachment: &MediaAttachment) -> String {
147        format!("[Video: {} attached]", attachment.file_name)
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config whose master switch is `enabled` and whose three
    /// per-media switches are all set to `features`.
    fn config(enabled: bool, features: bool) -> MediaPipelineConfig {
        MediaPipelineConfig {
            enabled,
            transcribe_audio: features,
            describe_images: features,
            summarize_video: features,
        }
    }

    /// Builds an attachment carrying `size` zero bytes of payload.
    fn attachment(file_name: &str, mime_type: Option<&str>, size: usize) -> MediaAttachment {
        MediaAttachment {
            file_name: file_name.to_owned(),
            data: vec![0u8; size],
            mime_type: mime_type.map(str::to_owned),
        }
    }

    fn voice_note() -> MediaAttachment {
        attachment("voice.ogg", Some("audio/ogg"), 100)
    }

    fn photo() -> MediaAttachment {
        attachment("photo.jpg", Some("image/jpeg"), 50)
    }

    fn clip() -> MediaAttachment {
        attachment("clip.mp4", Some("video/mp4"), 200)
    }

    // The MIME type alone is enough to classify an attachment.
    #[test]
    fn media_kind_from_mime() {
        assert_eq!(
            attachment("file", Some("audio/ogg"), 0).kind(),
            MediaKind::Audio
        );
        assert_eq!(
            attachment("file", Some("image/png"), 0).kind(),
            MediaKind::Image
        );
        assert_eq!(
            attachment("file", Some("video/mp4"), 0).kind(),
            MediaKind::Video
        );
    }

    // Without a MIME type the file extension decides the kind.
    #[test]
    fn media_kind_from_extension() {
        assert_eq!(attachment("voice.ogg", None, 0).kind(), MediaKind::Audio);
        assert_eq!(attachment("photo.png", None, 0).kind(), MediaKind::Image);
        assert_eq!(attachment("clip.mp4", None, 0).kind(), MediaKind::Video);
        assert_eq!(attachment("data.bin", None, 0).kind(), MediaKind::Unknown);
    }

    #[tokio::test]
    async fn disabled_pipeline_returns_original_text() {
        let cfg = config(false, true);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let enriched = pipeline.process("hello", &[voice_note()]).await;
        assert_eq!(enriched, "hello");
    }

    #[tokio::test]
    async fn empty_attachments_returns_original_text() {
        let cfg = config(true, true);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let enriched = pipeline.process("hello", &[]).await;
        assert_eq!(enriched, "hello");
    }

    #[tokio::test]
    async fn image_annotation_with_vision() {
        let cfg = config(true, true);
        let pipeline = MediaPipeline::new(&cfg, None, true);

        let result = pipeline.process("check this", &[photo()]).await;
        assert!(
            result.contains("[Image: photo.jpg attached, will be processed by vision model]"),
            "expected vision annotation, got: {result}"
        );
        assert!(
            result.contains("[IMAGE:data:image/jpeg;base64,"),
            "expected image data marker, got: {result}"
        );
        assert!(result.contains("check this"));
    }

    #[tokio::test]
    async fn image_annotation_without_vision() {
        let cfg = config(true, true);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let result = pipeline.process("check this", &[photo()]).await;
        assert!(
            result.contains("[Image: photo.jpg attached]"),
            "expected basic image annotation, got: {result}"
        );
        assert!(
            !result.contains("[IMAGE:data:"),
            "non-vision path must not inline image data, got: {result}"
        );
    }

    #[tokio::test]
    async fn video_annotation() {
        let cfg = config(true, true);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let result = pipeline.process("watch", &[clip()]).await;
        assert!(
            result.contains("[Video: clip.mp4 attached]"),
            "expected video annotation, got: {result}"
        );
    }

    // No transcription manager: audio degrades to the placeholder annotation.
    #[tokio::test]
    async fn audio_without_transcription_enabled() {
        let cfg = config(true, true);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let enriched = pipeline.process("", &[voice_note()]).await;
        assert_eq!(enriched, "[Audio: attached]");
    }

    #[tokio::test]
    async fn multiple_attachments_produce_multiple_annotations() {
        let cfg = config(true, true);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let media = [voice_note(), photo(), clip()];
        let result = pipeline.process("context", &media).await;

        assert!(
            result.contains("[Audio: attached]"),
            "missing audio annotation"
        );
        assert!(
            result.contains("[Image: photo.jpg attached]"),
            "missing image annotation"
        );
        assert!(
            result.contains("[Video: clip.mp4 attached]"),
            "missing video annotation"
        );
        assert!(result.contains("context"), "missing original text");
    }

    // Master switch on, every per-media switch off: nothing is annotated.
    #[tokio::test]
    async fn disabled_sub_features_skip_processing() {
        let cfg = config(true, false);
        let pipeline = MediaPipeline::new(&cfg, None, false);

        let media = [voice_note(), photo(), clip()];
        let enriched = pipeline.process("hello", &media).await;
        assert_eq!(enriched, "hello");
    }
}