zeroclaw_channels/orchestrator/
media_pipeline.rs1use base64::{Engine as _, engine::general_purpose::STANDARD};
16use zeroclaw_config::schema::MediaPipelineConfig;
17
18use super::super::transcription::TranscriptionManager;
19
20pub use zeroclaw_api::media::{MediaAttachment, MediaKind};
22
23pub struct MediaPipeline<'a> {
28 config: &'a MediaPipelineConfig,
29 transcription_manager: Option<&'a TranscriptionManager>,
30 vision_available: bool,
31}
32
33impl<'a> MediaPipeline<'a> {
34 pub fn new(
39 config: &'a MediaPipelineConfig,
40 transcription_manager: Option<&'a TranscriptionManager>,
41 vision_available: bool,
42 ) -> Self {
43 Self {
44 config,
45 transcription_manager,
46 vision_available,
47 }
48 }
49
50 pub async fn process(&self, original_text: &str, attachments: &[MediaAttachment]) -> String {
54 if !self.config.enabled || attachments.is_empty() {
55 return original_text.to_string();
56 }
57
58 let mut annotations = Vec::new();
59
60 for attachment in attachments {
61 match attachment.kind() {
62 MediaKind::Audio if self.config.transcribe_audio => {
63 let annotation = self.process_audio(attachment).await;
64 annotations.push(annotation);
65 }
66 MediaKind::Image if self.config.describe_images => {
67 let annotation = self.process_image(attachment);
68 annotations.push(annotation);
69 }
70 MediaKind::Video if self.config.summarize_video => {
71 let annotation = self.process_video(attachment);
72 annotations.push(annotation);
73 }
74 _ => {}
75 }
76 }
77
78 if annotations.is_empty() {
79 return original_text.to_string();
80 }
81
82 let mut enriched = String::with_capacity(
83 annotations.iter().map(|a| a.len() + 1).sum::<usize>() + original_text.len() + 2,
84 );
85
86 for annotation in &annotations {
87 enriched.push_str(annotation);
88 enriched.push('\n');
89 }
90
91 if !original_text.is_empty() {
92 enriched.push('\n');
93 enriched.push_str(original_text);
94 }
95
96 enriched.trim().to_string()
97 }
98
99 async fn process_audio(&self, attachment: &MediaAttachment) -> String {
101 let Some(manager) = self.transcription_manager else {
102 return "[Audio: attached]".to_string();
103 };
104
105 match manager
106 .transcribe(&attachment.data, &attachment.file_name)
107 .await
108 {
109 Ok(text) => {
110 let trimmed = text.trim();
111 if trimmed.is_empty() {
112 "[Audio transcription: (empty)]".to_string()
113 } else {
114 format!("[Audio transcription: {trimmed}]")
115 }
116 }
117 Err(err) => {
118 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"file": attachment.file_name, "error": format!("{}", err)})), "Media pipeline: audio transcription failed");
119 "[Audio: transcription failed]".to_string()
120 }
121 }
122 }
123
124 fn process_image(&self, attachment: &MediaAttachment) -> String {
130 if self.vision_available {
131 let mime = attachment.mime_type.as_deref().unwrap_or("image/jpeg");
132 let b64 = STANDARD.encode(&attachment.data);
133 format!(
134 "[Image: {} attached, will be processed by vision model]\n[IMAGE:data:{};base64,{}]",
135 attachment.file_name, mime, b64
136 )
137 } else {
138 format!("[Image: {} attached]", attachment.file_name)
139 }
140 }
141
142 fn process_video(&self, attachment: &MediaAttachment) -> String {
147 format!("[Video: {} attached]", attachment.file_name)
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Config with every sub-feature on; only the master switch varies.
    fn default_pipeline_config(enabled: bool) -> MediaPipelineConfig {
        MediaPipelineConfig {
            enabled,
            transcribe_audio: true,
            describe_images: true,
            summarize_video: true,
        }
    }

    /// Single constructor shared by all fixtures below.
    fn attachment(file_name: &str, data: Vec<u8>, mime_type: Option<&str>) -> MediaAttachment {
        MediaAttachment {
            file_name: String::from(file_name),
            data,
            mime_type: mime_type.map(String::from),
        }
    }

    fn sample_audio() -> MediaAttachment {
        attachment("voice.ogg", vec![0u8; 100], Some("audio/ogg"))
    }

    fn sample_image() -> MediaAttachment {
        attachment("photo.jpg", vec![0u8; 50], Some("image/jpeg"))
    }

    fn sample_video() -> MediaAttachment {
        attachment("clip.mp4", vec![0u8; 200], Some("video/mp4"))
    }

    #[test]
    fn media_kind_from_mime() {
        // The file name carries no extension, so only the MIME type can decide.
        let cases = [
            ("audio/ogg", MediaKind::Audio),
            ("image/png", MediaKind::Image),
            ("video/mp4", MediaKind::Video),
        ];

        for (mime, expected) in cases {
            let media = attachment("file", vec![], Some(mime));
            assert_eq!(media.kind(), expected);
        }
    }

    #[test]
    fn media_kind_from_extension() {
        // No MIME type is given, so only the extension can decide.
        let cases = [
            ("voice.ogg", MediaKind::Audio),
            ("photo.png", MediaKind::Image),
            ("clip.mp4", MediaKind::Video),
            ("data.bin", MediaKind::Unknown),
        ];

        for (file_name, expected) in cases {
            let media = attachment(file_name, vec![], None);
            assert_eq!(media.kind(), expected);
        }
    }

    #[tokio::test]
    async fn disabled_pipeline_returns_original_text() {
        let config = default_pipeline_config(false);
        let pipeline = MediaPipeline::new(&config, None, false);

        let output = pipeline.process("hello", &[sample_audio()]).await;

        assert_eq!(output, "hello");
    }

    #[tokio::test]
    async fn empty_attachments_returns_original_text() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let output = pipeline.process("hello", &[]).await;

        assert_eq!(output, "hello");
    }

    #[tokio::test]
    async fn image_annotation_with_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, true);

        let result = pipeline.process("check this", &[sample_image()]).await;

        assert!(
            result.contains("[Image: photo.jpg attached, will be processed by vision model]"),
            "expected vision annotation, got: {result}"
        );
        assert!(
            result.contains("[IMAGE:data:image/jpeg;base64,"),
            "expected image data marker, got: {result}"
        );
        assert!(result.contains("check this"));
    }

    #[tokio::test]
    async fn image_annotation_without_vision() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let result = pipeline.process("check this", &[sample_image()]).await;

        assert!(
            result.contains("[Image: photo.jpg attached]"),
            "expected basic image annotation, got: {result}"
        );
        assert!(
            !result.contains("[IMAGE:data:"),
            "non-vision path must not inline image data, got: {result}"
        );
    }

    #[tokio::test]
    async fn video_annotation() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let result = pipeline.process("watch", &[sample_video()]).await;

        assert!(
            result.contains("[Video: clip.mp4 attached]"),
            "expected video annotation, got: {result}"
        );
    }

    #[tokio::test]
    async fn audio_without_transcription_enabled() {
        // Audio handling is on in the config, but no transcription manager is
        // supplied, so only the placeholder annotation can be produced.
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);

        let output = pipeline.process("", &[sample_audio()]).await;

        assert_eq!(output, "[Audio: attached]");
    }

    #[tokio::test]
    async fn multiple_attachments_produce_multiple_annotations() {
        let config = default_pipeline_config(true);
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = vec![sample_audio(), sample_image(), sample_video()];

        let result = pipeline.process("context", &attachments).await;

        let expectations = [
            ("[Audio: attached]", "missing audio annotation"),
            ("[Image: photo.jpg attached]", "missing image annotation"),
            ("[Video: clip.mp4 attached]", "missing video annotation"),
            ("context", "missing original text"),
        ];
        for (needle, message) in expectations {
            assert!(result.contains(needle), "{message}");
        }
    }

    #[tokio::test]
    async fn disabled_sub_features_skip_processing() {
        // Master switch on, every individual sub-feature off.
        let config = MediaPipelineConfig {
            transcribe_audio: false,
            describe_images: false,
            summarize_video: false,
            ..default_pipeline_config(true)
        };
        let pipeline = MediaPipeline::new(&config, None, false);
        let attachments = vec![sample_audio(), sample_image(), sample_video()];

        let output = pipeline.process("hello", &attachments).await;

        assert_eq!(output, "hello");
    }
}