Skip to main content

zeroclaw_channels/orchestrator/
media_pipeline.rs

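//! Automatic media understanding pipeline for inbound channel messages.
//!
//! Pre-processes media attachments (audio, images, video) before the agent sees
//! the message, enriching the text with human-readable annotations that are
//! prepended to the original text:
//!
//! - **Audio**: transcribed via the existing [`super::super::transcription`]
//!   infrastructure and annotated as `[Audio transcription: ...]`. Falls back to
//!   `[Audio: attached]` when no transcription manager is supplied, and to
//!   `[Audio: transcription failed]` when transcription returns an error.
//! - **Images**: when a vision-capable model provider is active, annotated as
//!   `[Image: <file> attached, will be processed by vision model]` followed by an
//!   `[IMAGE:data:<mime>;base64,...]` marker that carries the image to the model.
//!   Falls back to `[Image: <file> attached]` when vision is unavailable.
//! - **Video**: annotated as `[Video: <file> attached]`. Video summarisation
//!   (`[Video summary: ...]`) needs an external API that is not integrated yet.
//!
//! The pipeline is **opt-in** via `[media_pipeline] enabled = true` in config.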
14
15use base64::{Engine as _, engine::general_purpose::STANDARD};
16use std::borrow::Cow;
17use zeroclaw_config::schema::MediaPipelineConfig;
18
19use super::super::transcription::TranscriptionManager;
20
// Re-export media types from `zeroclaw_api::media` for backwards compatibility.
22pub use zeroclaw_api::media::{MediaAttachment, MediaKind};
23
/// The media understanding pipeline.
///
/// Consumes a message's text and attachments, returning enriched text with
/// media annotations prepended.
///
/// The pipeline only borrows its collaborators for `'a`; it owns no state of
/// its own beyond the `vision_available` flag.
pub struct MediaPipeline<'a> {
    /// Pipeline settings: `enabled` is the master switch, while
    /// `transcribe_audio`, `describe_images` and `summarize_video` gate each
    /// media kind individually.
    config: &'a MediaPipelineConfig,
    /// Transcription backend used for audio attachments. `None` makes audio
    /// fall back to an `[Audio: attached]` annotation.
    transcription_manager: Option<&'a TranscriptionManager>,
    /// When `true`, image bytes are inlined as an `[IMAGE:data:...]` marker;
    /// when `false`, images only get a textual `[Image: ... attached]` note.
    vision_available: bool,
}
33
34impl<'a> MediaPipeline<'a> {
35    /// Create a new pipeline. `vision_available` indicates whether the current
36    /// model provider supports vision (image description). `transcription_manager`
37    /// is `None` when transcription is disabled at the channel level — audio
38    /// attachments fall back to `[Audio: attached]` annotations.
39    pub fn new(
40        config: &'a MediaPipelineConfig,
41        transcription_manager: Option<&'a TranscriptionManager>,
42        vision_available: bool,
43    ) -> Self {
44        Self {
45            config,
46            transcription_manager,
47            vision_available,
48        }
49    }
50
51    /// Process a message's attachments and return enriched text.
52    ///
53    /// If the pipeline is disabled via config, returns `original_text` unchanged.
54    pub async fn process(&self, original_text: &str, attachments: &[MediaAttachment]) -> String {
55        if !self.config.enabled || attachments.is_empty() {
56            return original_text.to_string();
57        }
58
59        let mut annotations = Vec::new();
60
61        for attachment in attachments {
62            match attachment.kind() {
63                MediaKind::Audio if self.config.transcribe_audio => {
64                    let annotation = self.process_audio(attachment).await;
65                    annotations.push(annotation);
66                }
67                MediaKind::Image if self.config.describe_images => {
68                    let annotation = self.process_image(attachment);
69                    annotations.push(annotation);
70                }
71                MediaKind::Video if self.config.summarize_video => {
72                    let annotation = self.process_video(attachment);
73                    annotations.push(annotation);
74                }
75                _ => {}
76            }
77        }
78
79        if annotations.is_empty() {
80            return original_text.to_string();
81        }
82
83        let mut enriched = String::with_capacity(
84            annotations.iter().map(|a| a.len() + 1).sum::<usize>() + original_text.len() + 2,
85        );
86
87        for annotation in &annotations {
88            enriched.push_str(annotation);
89            enriched.push('\n');
90        }
91
92        if !original_text.is_empty() {
93            enriched.push('\n');
94            enriched.push_str(original_text);
95        }
96
97        enriched.trim().to_string()
98    }
99
100    /// Transcribe an audio attachment using the existing transcription infra.
101    async fn process_audio(&self, attachment: &MediaAttachment) -> String {
102        let Some(manager) = self.transcription_manager else {
103            return "[Audio: attached]".to_string();
104        };
105
106        match manager
107            .transcribe(&attachment.data, &attachment.file_name)
108            .await
109        {
110            Ok(text) => {
111                let trimmed = text.trim();
112                if trimmed.is_empty() {
113                    "[Audio transcription: (empty)]".to_string()
114                } else {
115                    format!("[Audio transcription: {trimmed}]")
116                }
117            }
118            Err(err) => {
119                ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"file": attachment.file_name, "error": format!("{}", err)})), "Media pipeline: audio transcription failed");
120                "[Audio: transcription failed]".to_string()
121            }
122        }
123    }
124
125    /// Describe an image attachment.
126    ///
127    /// When vision is available, the image will be passed through to the
128    /// model_provider as an `[IMAGE:]` marker and described by the model in the
129    /// normal flow.
130    fn process_image(&self, attachment: &MediaAttachment) -> String {
131        if self.vision_available {
132            let (mime, data) = image_payload_for_vision(attachment);
133            let b64 = STANDARD.encode(data.as_ref());
134            format!(
135                "[Image: {} attached, will be processed by vision model]\n[IMAGE:data:{};base64,{}]",
136                attachment.file_name, mime, b64
137            )
138        } else {
139            format!("[Image: {} attached]", attachment.file_name)
140        }
141    }
142
143    /// Summarize a video attachment.
144    ///
145    /// Video analysis requires external APIs not currently integrated.
146    /// For now we add a placeholder annotation.
147    fn process_video(&self, attachment: &MediaAttachment) -> String {
148        format!("[Video: {} attached]", attachment.file_name)
149    }
150}
151
152fn image_payload_for_vision(attachment: &MediaAttachment) -> (String, Cow<'_, [u8]>) {
153    let mime = attachment.mime_type.as_deref().unwrap_or("image/jpeg");
154
155    #[cfg(feature = "image-normalization")]
156    if is_webp_attachment(attachment, mime) {
157        match webp_to_png(&attachment.data) {
158            Ok(png) => return ("image/png".to_string(), Cow::Owned(png)),
159            Err(err) => {
160                ::zeroclaw_log::record!(
161                    WARN,
162                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
163                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
164                        .with_attrs(::serde_json::json!({
165                            "file": attachment.file_name,
166                            "error": format!("{}", err),
167                            "error_key": "media_pipeline_webp_to_png_failed",
168                        })),
169                    "Media pipeline: failed to normalize WebP image for vision"
170                );
171            }
172        }
173    }
174
175    (mime.to_string(), Cow::Borrowed(&attachment.data))
176}
177
/// Decide whether an attachment should be treated as WebP.
///
/// True when `mime` is `image/webp`, or when the text after the last `.` in
/// the file name is `webp`. Both comparisons are ASCII case-insensitive.
#[cfg(feature = "image-normalization")]
fn is_webp_attachment(attachment: &MediaAttachment, mime: &str) -> bool {
    if mime.eq_ignore_ascii_case("image/webp") {
        return true;
    }

    match attachment.file_name.rsplit_once('.') {
        Some((_, extension)) => extension.eq_ignore_ascii_case("webp"),
        None => false,
    }
}
186
/// Re-encode WebP bytes as PNG.
///
/// Decodes `data` strictly as WebP and writes the decoded image to an
/// in-memory buffer in PNG format.
///
/// # Errors
///
/// Returns an error if decoding the WebP input or encoding the PNG output fails.
#[cfg(feature = "image-normalization")]
fn webp_to_png(data: &[u8]) -> anyhow::Result<Vec<u8>> {
    let decoded = image::load_from_memory_with_format(data, image::ImageFormat::WebP)?;

    // `write_to` needs a seekable writer, hence the cursor around the Vec.
    let mut png_buffer: std::io::Cursor<Vec<u8>> = std::io::Cursor::new(Vec::new());
    decoded.write_to(&mut png_buffer, image::ImageFormat::Png)?;

    Ok(png_buffer.into_inner())
}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Config with every per-kind toggle on; only the master switch varies.
    fn default_pipeline_config(enabled: bool) -> MediaPipelineConfig {
        MediaPipelineConfig {
            summarize_video: true,
            describe_images: true,
            transcribe_audio: true,
            enabled,
        }
    }

    /// Zero-filled attachment of `size` bytes with the given name and MIME type.
    fn attachment(file_name: &str, mime_type: Option<&str>, size: usize) -> MediaAttachment {
        MediaAttachment {
            file_name: file_name.to_owned(),
            data: vec![0u8; size],
            mime_type: mime_type.map(str::to_owned),
        }
    }

    fn sample_audio() -> MediaAttachment {
        attachment("voice.ogg", Some("audio/ogg"), 100)
    }

    fn sample_image() -> MediaAttachment {
        attachment("photo.jpg", Some("image/jpeg"), 50)
    }

    fn sample_video() -> MediaAttachment {
        attachment("clip.mp4", Some("video/mp4"), 200)
    }

    // The file name has no extension, so the kind can only come from the MIME type.
    #[test]
    fn media_kind_from_mime() {
        let cases = [
            ("audio/ogg", MediaKind::Audio),
            ("image/png", MediaKind::Image),
            ("video/mp4", MediaKind::Video),
        ];

        for (mime, expected) in cases {
            assert_eq!(attachment("file", Some(mime), 0).kind(), expected);
        }
    }

    // No MIME type is given, so the kind can only come from the file extension.
    #[test]
    fn media_kind_from_extension() {
        let cases = [
            ("voice.ogg", MediaKind::Audio),
            ("photo.png", MediaKind::Image),
            ("clip.mp4", MediaKind::Video),
            ("data.bin", MediaKind::Unknown),
        ];

        for (file_name, expected) in cases {
            assert_eq!(attachment(file_name, None, 0).kind(), expected);
        }
    }

    #[tokio::test]
    async fn disabled_pipeline_returns_original_text() {
        let config = default_pipeline_config(false);
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = [sample_audio()];

        let result = pipeline.process("hello", &attachments).await;

        assert_eq!(result, "hello");
    }

    #[tokio::test]
    async fn empty_attachments_returns_original_text() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);
        let no_attachments: [MediaAttachment; 0] = [];

        let result = pipeline.process("hello", &no_attachments).await;

        assert_eq!(result, "hello");
    }

    #[tokio::test]
    async fn image_annotation_with_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, true);
        let attachments = [sample_image()];

        let result = pipeline.process("check this", &attachments).await;

        assert!(
            result.contains("[Image: photo.jpg attached, will be processed by vision model]"),
            "expected vision annotation, got: {result}"
        );
        assert!(
            result.contains("[IMAGE:data:image/jpeg;base64,"),
            "expected image data marker, got: {result}"
        );
        assert!(result.contains("check this"));
    }

    #[cfg(feature = "image-normalization")]
    #[tokio::test]
    async fn webp_image_is_normalized_to_png_for_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, true);

        // Encode a single red pixel as a real WebP payload.
        let red_pixel = image::RgbaImage::from_pixel(1, 1, image::Rgba([255, 0, 0, 255]));
        let mut encoded = std::io::Cursor::new(Vec::new());
        image::DynamicImage::ImageRgba8(red_pixel)
            .write_to(&mut encoded, image::ImageFormat::WebP)
            .expect("test WebP should encode");

        let sticker = MediaAttachment {
            file_name: "sticker.webp".to_string(),
            data: encoded.into_inner(),
            mime_type: Some("image/webp".to_string()),
        };

        let result = pipeline.process("what is this?", &[sticker]).await;

        assert!(result.contains("[IMAGE:data:image/png;base64,"));
        assert!(!result.contains("[IMAGE:data:image/webp;base64,"));
        assert!(result.contains("what is this?"));
    }

    #[tokio::test]
    async fn image_annotation_without_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = [sample_image()];

        let result = pipeline.process("check this", &attachments).await;

        assert!(
            result.contains("[Image: photo.jpg attached]"),
            "expected basic image annotation, got: {result}"
        );
        assert!(
            !result.contains("[IMAGE:data:"),
            "non-vision path must not inline image data, got: {result}"
        );
    }

    #[tokio::test]
    async fn video_annotation() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = [sample_video()];

        let result = pipeline.process("watch", &attachments).await;

        assert!(
            result.contains("[Video: clip.mp4 attached]"),
            "expected video annotation, got: {result}"
        );
    }

    // No transcription manager is supplied, so audio gets the plain fallback note.
    #[tokio::test]
    async fn audio_without_transcription_enabled() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = [sample_audio()];

        let result = pipeline.process("", &attachments).await;

        assert_eq!(result, "[Audio: attached]");
    }

    #[tokio::test]
    async fn multiple_attachments_produce_multiple_annotations() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = vec![sample_audio(), sample_image(), sample_video()];

        let result = pipeline.process("context", &attachments).await;

        let expectations = [
            ("[Audio: attached]", "missing audio annotation"),
            ("[Image: photo.jpg attached]", "missing image annotation"),
            ("[Video: clip.mp4 attached]", "missing video annotation"),
            ("context", "missing original text"),
        ];
        for (needle, failure) in expectations {
            assert!(result.contains(needle), "{failure}");
        }
    }

    #[tokio::test]
    async fn disabled_sub_features_skip_processing() {
        // Master switch on, every per-kind toggle off.
        let config = MediaPipelineConfig {
            transcribe_audio: false,
            describe_images: false,
            summarize_video: false,
            ..default_pipeline_config(true)
        };
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = vec![sample_audio(), sample_image(), sample_video()];

        let result = pipeline.process("hello", &attachments).await;

        assert_eq!(result, "hello");
    }
}
423}