zeroclaw_channels/orchestrator/
media_pipeline.rs1use base64::{Engine as _, engine::general_purpose::STANDARD};
16use std::borrow::Cow;
17use zeroclaw_config::schema::MediaPipelineConfig;
18
19use super::super::transcription::TranscriptionManager;
20
21pub use zeroclaw_api::media::{MediaAttachment, MediaKind};
23
24pub struct MediaPipeline<'a> {
29 config: &'a MediaPipelineConfig,
30 transcription_manager: Option<&'a TranscriptionManager>,
31 vision_available: bool,
32}
33
34impl<'a> MediaPipeline<'a> {
35 pub fn new(
40 config: &'a MediaPipelineConfig,
41 transcription_manager: Option<&'a TranscriptionManager>,
42 vision_available: bool,
43 ) -> Self {
44 Self {
45 config,
46 transcription_manager,
47 vision_available,
48 }
49 }
50
51 pub async fn process(&self, original_text: &str, attachments: &[MediaAttachment]) -> String {
55 if !self.config.enabled || attachments.is_empty() {
56 return original_text.to_string();
57 }
58
59 let mut annotations = Vec::new();
60
61 for attachment in attachments {
62 match attachment.kind() {
63 MediaKind::Audio if self.config.transcribe_audio => {
64 let annotation = self.process_audio(attachment).await;
65 annotations.push(annotation);
66 }
67 MediaKind::Image if self.config.describe_images => {
68 let annotation = self.process_image(attachment);
69 annotations.push(annotation);
70 }
71 MediaKind::Video if self.config.summarize_video => {
72 let annotation = self.process_video(attachment);
73 annotations.push(annotation);
74 }
75 _ => {}
76 }
77 }
78
79 if annotations.is_empty() {
80 return original_text.to_string();
81 }
82
83 let mut enriched = String::with_capacity(
84 annotations.iter().map(|a| a.len() + 1).sum::<usize>() + original_text.len() + 2,
85 );
86
87 for annotation in &annotations {
88 enriched.push_str(annotation);
89 enriched.push('\n');
90 }
91
92 if !original_text.is_empty() {
93 enriched.push('\n');
94 enriched.push_str(original_text);
95 }
96
97 enriched.trim().to_string()
98 }
99
100 async fn process_audio(&self, attachment: &MediaAttachment) -> String {
102 let Some(manager) = self.transcription_manager else {
103 return "[Audio: attached]".to_string();
104 };
105
106 match manager
107 .transcribe(&attachment.data, &attachment.file_name)
108 .await
109 {
110 Ok(text) => {
111 let trimmed = text.trim();
112 if trimmed.is_empty() {
113 "[Audio transcription: (empty)]".to_string()
114 } else {
115 format!("[Audio transcription: {trimmed}]")
116 }
117 }
118 Err(err) => {
119 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"file": attachment.file_name, "error": format!("{}", err)})), "Media pipeline: audio transcription failed");
120 "[Audio: transcription failed]".to_string()
121 }
122 }
123 }
124
125 fn process_image(&self, attachment: &MediaAttachment) -> String {
131 if self.vision_available {
132 let (mime, data) = image_payload_for_vision(attachment);
133 let b64 = STANDARD.encode(data.as_ref());
134 format!(
135 "[Image: {} attached, will be processed by vision model]\n[IMAGE:data:{};base64,{}]",
136 attachment.file_name, mime, b64
137 )
138 } else {
139 format!("[Image: {} attached]", attachment.file_name)
140 }
141 }
142
143 fn process_video(&self, attachment: &MediaAttachment) -> String {
148 format!("[Video: {} attached]", attachment.file_name)
149 }
150}
151
152fn image_payload_for_vision(attachment: &MediaAttachment) -> (String, Cow<'_, [u8]>) {
153 let mime = attachment.mime_type.as_deref().unwrap_or("image/jpeg");
154
155 #[cfg(feature = "image-normalization")]
156 if is_webp_attachment(attachment, mime) {
157 match webp_to_png(&attachment.data) {
158 Ok(png) => return ("image/png".to_string(), Cow::Owned(png)),
159 Err(err) => {
160 ::zeroclaw_log::record!(
161 WARN,
162 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
163 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
164 .with_attrs(::serde_json::json!({
165 "file": attachment.file_name,
166 "error": format!("{}", err),
167 "error_key": "media_pipeline_webp_to_png_failed",
168 })),
169 "Media pipeline: failed to normalize WebP image for vision"
170 );
171 }
172 }
173 }
174
175 (mime.to_string(), Cow::Borrowed(&attachment.data))
176}
177
/// Reports whether an attachment should be treated as WebP.
///
/// An attachment qualifies when the resolved `mime` equals `image/webp` or
/// when the text after the last `.` in its file name equals `webp`; both
/// comparisons ignore ASCII case.
#[cfg(feature = "image-normalization")]
fn is_webp_attachment(attachment: &MediaAttachment, mime: &str) -> bool {
    if mime.eq_ignore_ascii_case("image/webp") {
        return true;
    }

    match attachment.file_name.rsplit_once('.') {
        Some((_, extension)) => extension.eq_ignore_ascii_case("webp"),
        None => false,
    }
}
186
/// Decodes `data` as WebP and re-encodes the image as PNG.
///
/// # Errors
///
/// Returns an error when the bytes cannot be decoded as WebP or when PNG
/// encoding fails.
#[cfg(feature = "image-normalization")]
fn webp_to_png(data: &[u8]) -> anyhow::Result<Vec<u8>> {
    let mut png_bytes = std::io::Cursor::new(Vec::new());
    image::load_from_memory_with_format(data, image::ImageFormat::WebP)?
        .write_to(&mut png_bytes, image::ImageFormat::Png)?;
    Ok(png_bytes.into_inner())
}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an attachment whose payload is `len` zero bytes.
    fn attachment(file_name: &str, mime_type: Option<&str>, len: usize) -> MediaAttachment {
        MediaAttachment {
            file_name: file_name.to_string(),
            data: vec![0u8; len],
            mime_type: mime_type.map(str::to_string),
        }
    }

    /// Config with every sub-feature on; only the master switch varies.
    fn default_pipeline_config(enabled: bool) -> MediaPipelineConfig {
        MediaPipelineConfig {
            enabled,
            transcribe_audio: true,
            describe_images: true,
            summarize_video: true,
        }
    }

    fn sample_audio() -> MediaAttachment {
        attachment("voice.ogg", Some("audio/ogg"), 100)
    }

    fn sample_image() -> MediaAttachment {
        attachment("photo.jpg", Some("image/jpeg"), 50)
    }

    fn sample_video() -> MediaAttachment {
        attachment("clip.mp4", Some("video/mp4"), 200)
    }

    #[test]
    fn media_kind_from_mime() {
        let cases = [
            ("audio/ogg", MediaKind::Audio),
            ("image/png", MediaKind::Image),
            ("video/mp4", MediaKind::Video),
        ];

        for (mime, expected) in cases {
            let subject = attachment("file", Some(mime), 0);
            assert_eq!(subject.kind(), expected);
        }
    }

    #[test]
    fn media_kind_from_extension() {
        let cases = [
            ("voice.ogg", MediaKind::Audio),
            ("photo.png", MediaKind::Image),
            ("clip.mp4", MediaKind::Video),
            ("data.bin", MediaKind::Unknown),
        ];

        for (file_name, expected) in cases {
            let subject = attachment(file_name, None, 0);
            assert_eq!(subject.kind(), expected);
        }
    }

    #[tokio::test]
    async fn disabled_pipeline_returns_original_text() {
        let config = default_pipeline_config(false);
        let pipeline = MediaPipeline::new(&config, None, false);

        let output = pipeline.process("hello", &[sample_audio()]).await;

        assert_eq!(output, "hello");
    }

    #[tokio::test]
    async fn empty_attachments_returns_original_text() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let output = pipeline.process("hello", &[]).await;

        assert_eq!(output, "hello");
    }

    #[tokio::test]
    async fn image_annotation_with_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, true);

        let result = pipeline.process("check this", &[sample_image()]).await;

        assert!(
            result.contains("[Image: photo.jpg attached, will be processed by vision model]"),
            "expected vision annotation, got: {result}"
        );
        assert!(
            result.contains("[IMAGE:data:image/jpeg;base64,"),
            "expected image data marker, got: {result}"
        );
        assert!(result.contains("check this"));
    }

    #[cfg(feature = "image-normalization")]
    #[tokio::test]
    async fn webp_image_is_normalized_to_png_for_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, true);

        // Encode a single red pixel as WebP to act as the incoming sticker.
        let pixel = image::RgbaImage::from_pixel(1, 1, image::Rgba([255, 0, 0, 255]));
        let mut encoded = std::io::Cursor::new(Vec::new());
        image::DynamicImage::ImageRgba8(pixel)
            .write_to(&mut encoded, image::ImageFormat::WebP)
            .expect("test WebP should encode");

        let sticker = MediaAttachment {
            file_name: "sticker.webp".to_string(),
            data: encoded.into_inner(),
            mime_type: Some("image/webp".to_string()),
        };

        let result = pipeline.process("what is this?", &[sticker]).await;

        assert!(result.contains("[IMAGE:data:image/png;base64,"));
        assert!(!result.contains("[IMAGE:data:image/webp;base64,"));
        assert!(result.contains("what is this?"));
    }

    #[tokio::test]
    async fn image_annotation_without_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let result = pipeline.process("check this", &[sample_image()]).await;

        assert!(
            result.contains("[Image: photo.jpg attached]"),
            "expected basic image annotation, got: {result}"
        );
        assert!(
            !result.contains("[IMAGE:data:"),
            "non-vision path must not inline image data, got: {result}"
        );
    }

    #[tokio::test]
    async fn video_annotation() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let result = pipeline.process("watch", &[sample_video()]).await;

        assert!(
            result.contains("[Video: clip.mp4 attached]"),
            "expected video annotation, got: {result}"
        );
    }

    #[tokio::test]
    async fn audio_without_transcription_enabled() {
        // No transcription manager is supplied, so audio is only acknowledged.
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let output = pipeline.process("", &[sample_audio()]).await;

        assert_eq!(output, "[Audio: attached]");
    }

    #[tokio::test]
    async fn multiple_attachments_produce_multiple_annotations() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let batch = [sample_audio(), sample_image(), sample_video()];
        let result = pipeline.process("context", &batch).await;

        assert!(
            result.contains("[Audio: attached]"),
            "missing audio annotation"
        );
        assert!(
            result.contains("[Image: photo.jpg attached]"),
            "missing image annotation"
        );
        assert!(
            result.contains("[Video: clip.mp4 attached]"),
            "missing video annotation"
        );
        assert!(result.contains("context"), "missing original text");
    }

    #[tokio::test]
    async fn disabled_sub_features_skip_processing() {
        let config = MediaPipelineConfig {
            enabled: true,
            transcribe_audio: false,
            describe_images: false,
            summarize_video: false,
        };
        let pipeline = MediaPipeline::new(&config, None, false);

        let batch = [sample_audio(), sample_image(), sample_video()];
        let output = pipeline.process("hello", &batch).await;

        assert_eq!(output, "hello");
    }
}