1use crate::ModelProviderRuntimeOptions;
2use crate::auth::AuthService;
3use crate::auth::openai_oauth::extract_account_id_from_jwt;
4use crate::multimodal;
5use crate::stream_guard::AbortOnDrop;
6use crate::traits::{
7 ChatMessage, ChatRequest as ProviderChatRequest, ChatResponse as ProviderChatResponse,
8 ModelProvider, ProviderCapabilities, StreamChunk, StreamError, StreamEvent, StreamOptions,
9 StreamResult, ToolCall as ProviderToolCall,
10};
11use async_trait::async_trait;
12use futures_util::StreamExt;
13use futures_util::stream;
14use reqwest::Client;
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17use std::collections::{HashMap, HashSet};
18use std::path::PathBuf;
19use zeroclaw_api::tool::ToolSpec;
20
/// Codex Responses endpoint used when no `provider_api_url` override is configured.
const DEFAULT_CODEX_RESPONSES_URL: &str = "https://chatgpt.com/backend-api/codex/responses";
/// Instructions sent when the conversation contains no `system` messages.
const DEFAULT_CODEX_INSTRUCTIONS: &str =
    "You are ZeroClaw, a concise and helpful coding assistant.";
/// Wire API identifier for this provider.
/// NOTE(review): not referenced in the visible part of the file — confirm where it is used.
const WIRE_API: &str = "responses";
/// `provider` tag of the JSON envelope that stores replayable Responses output items
/// inside an assistant message's `reasoning_content`.
const RESPONSES_HISTORY_PROVIDER: &str = "openai_codex";
/// `kind` tag of that same history envelope.
const RESPONSES_HISTORY_KIND: &str = "responses_output_items";
28
/// Model provider that sends chat turns to an OpenAI Codex Responses endpoint.
#[derive(Clone)]
pub struct OpenAiCodexModelProvider {
    /// Alias this provider was registered under.
    alias: String,
    /// Credential store used to load the `openai-codex` profile and OAuth access tokens.
    auth: AuthService,
    /// Optional auth profile name passed to the credential lookups.
    auth_profile_override: Option<String>,
    /// Fully resolved `/responses` endpoint URL.
    responses_url: String,
    /// `true` when `responses_url` is not the default Codex endpoint.
    custom_endpoint: bool,
    /// Gateway API key; together with `custom_endpoint` it enables API-key auth mode.
    gateway_api_key: Option<String>,
    /// Reasoning effort from the runtime options, if configured.
    reasoning_effort: Option<String>,
    /// HTTP client (10s connect / 300s read timeouts unless the builder failed).
    client: Client,
}
41
/// JSON request body for the Responses endpoint.
#[derive(Debug, Serialize)]
struct ResponsesRequest {
    model: String,
    /// Conversation items (presumably built by `build_responses_input` — confirm at call site).
    input: Vec<Value>,
    instructions: String,
    store: bool,
    stream: bool,
    text: ResponsesTextOptions,
    reasoning: ResponsesReasoningOptions,
    include: Vec<String>,
    // The three optional fields below are omitted from the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<ResponsesToolSpec>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_choice: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    parallel_tool_calls: Option<bool>,
}

/// Tool definition in the Responses API shape (built by `convert_tools`).
#[derive(Debug, Serialize)]
pub(crate) struct ResponsesToolSpec {
    /// Serialized as `type`; `convert_tools` always sets `"function"`.
    #[serde(rename = "type")]
    pub(crate) kind: String,
    pub(crate) name: String,
    pub(crate) description: String,
    /// JSON schema of the tool arguments.
    pub(crate) parameters: Value,
    pub(crate) strict: bool,
}

/// `text` options of a Responses request.
#[derive(Debug, Serialize)]
struct ResponsesTextOptions {
    verbosity: String,
}

/// `reasoning` options of a Responses request.
#[derive(Debug, Serialize)]
struct ResponsesReasoningOptions {
    effort: String,
    summary: String,
}
80
/// Subset of a Responses API response body; missing fields deserialize as empty.
#[derive(Debug, Deserialize)]
struct ResponsesResponse {
    /// Raw output items (`message`, `reasoning`, `function_call`, ...).
    #[serde(default)]
    output: Vec<Value>,
    /// Convenience aggregate text, when the server provides it.
    #[serde(default)]
    output_text: Option<String>,
}

/// Mutable state accumulated while processing Responses stream events.
#[derive(Debug, Default)]
pub(crate) struct ResponsesStreamState {
    /// Set once any non-empty `response.output_text.delta` has been seen.
    pub(crate) saw_text_delta: bool,
    /// Concatenation of all text deltas.
    pub(crate) text_accumulator: String,
    /// Text taken from `*.done` / completed events, used when no delta arrived.
    pub(crate) fallback_text: Option<String>,
    /// In-progress function calls keyed by item id, or `output:{index}` when no id is known.
    pub(crate) tool_calls: HashMap<String, PendingToolCall>,
    /// Ids of tool calls already emitted, used to emit each call at most once.
    pub(crate) emitted_tool_call_ids: HashSet<String>,
    /// Every tool call emitted so far, in emission order.
    pub(crate) collected_tool_calls: Vec<ProviderToolCall>,
    /// Replayable output items (`message` / `reasoning` / `function_call`).
    pub(crate) output_items: Vec<Value>,
}

/// Function call whose arguments are still being streamed.
#[derive(Debug, Default, Clone)]
pub(crate) struct PendingToolCall {
    pub(crate) item_id: Option<String>,
    pub(crate) call_id: Option<String>,
    pub(crate) name: Option<String>,
    /// Argument text accumulated from deltas (or replaced by a final value).
    pub(crate) arguments: String,
}

/// Outcome of one Responses turn.
#[derive(Debug, Default)]
pub(crate) struct ResponsesTurnResult {
    pub(crate) text: Option<String>,
    pub(crate) tool_calls: Vec<ProviderToolCall>,
    /// Encoded history envelope of replayable output items; only set when tool calls exist.
    pub(crate) reasoning_content: Option<String>,
}
114
115impl OpenAiCodexModelProvider {
116 pub fn new(
117 alias: &str,
118 options: &ModelProviderRuntimeOptions,
119 gateway_api_key: Option<&str>,
120 ) -> anyhow::Result<Self> {
121 let state_dir = options
122 .zeroclaw_dir
123 .clone()
124 .unwrap_or_else(default_zeroclaw_dir);
125 let auth = AuthService::new(&state_dir, options.secrets_encrypt);
126 let responses_url = resolve_responses_url(options)?;
127
128 Ok(Self {
129 alias: alias.to_string(),
130 auth,
131 auth_profile_override: options.auth_profile_override.clone(),
132 custom_endpoint: !is_default_responses_url(&responses_url),
133 responses_url,
134 gateway_api_key: gateway_api_key.map(ToString::to_string),
135 reasoning_effort: options.reasoning_effort.clone(),
136 client: Client::builder()
137 .connect_timeout(std::time::Duration::from_secs(10))
138 .read_timeout(std::time::Duration::from_secs(300))
139 .build()
140 .unwrap_or_else(|_| Client::new()),
141 })
142 }
143}
144
145fn default_zeroclaw_dir() -> PathBuf {
146 directories::UserDirs::new().map_or_else(
147 || PathBuf::from(".zeroclaw"),
148 |dirs| dirs.home_dir().join(".zeroclaw"),
149 )
150}
151
152fn build_responses_url(base_or_endpoint: &str) -> anyhow::Result<String> {
153 let candidate = base_or_endpoint.trim();
154 if candidate.is_empty() {
155 anyhow::bail!("OpenAI Codex endpoint override cannot be empty");
156 }
157
158 let mut parsed = reqwest::Url::parse(candidate).map_err(|_| {
159 ::zeroclaw_log::record!(
160 WARN,
161 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
162 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
163 .with_attrs(::serde_json::json!({"candidate": candidate})),
164 "openai_codex: endpoint override is not a valid URL"
165 );
166 anyhow::Error::msg("OpenAI Codex endpoint override must be a valid URL")
167 })?;
168
169 match parsed.scheme() {
170 "http" | "https" => {}
171 _ => anyhow::bail!("OpenAI Codex endpoint override must use http:// or https://"),
172 }
173
174 let path = parsed.path().trim_end_matches('/');
175 if !path.ends_with("/responses") {
176 let with_suffix = if path.is_empty() || path == "/" {
177 "/responses".to_string()
178 } else {
179 format!("{path}/responses")
180 };
181 parsed.set_path(&with_suffix);
182 }
183
184 parsed.set_query(None);
185 parsed.set_fragment(None);
186
187 Ok(parsed.to_string())
188}
189
190fn resolve_responses_url(options: &ModelProviderRuntimeOptions) -> anyhow::Result<String> {
191 if let Some(api_url) = options
192 .provider_api_url
193 .as_deref()
194 .and_then(|value| first_nonempty(Some(value)))
195 {
196 return build_responses_url(&api_url);
197 }
198
199 Ok(DEFAULT_CODEX_RESPONSES_URL.to_string())
200}
201
202fn canonical_endpoint(url: &str) -> Option<(String, String, u16, String)> {
203 let parsed = reqwest::Url::parse(url).ok()?;
204 let host = parsed.host_str()?.to_ascii_lowercase();
205 let port = parsed.port_or_known_default()?;
206 let path = parsed.path().trim_end_matches('/').to_string();
207 Some((parsed.scheme().to_ascii_lowercase(), host, port, path))
208}
209
210fn is_default_responses_url(url: &str) -> bool {
211 canonical_endpoint(url) == canonical_endpoint(DEFAULT_CODEX_RESPONSES_URL)
212}
213
/// Returns the trimmed text as an owned string, or `None` when the input is absent or
/// contains only whitespace.
pub(crate) fn first_nonempty(text: Option<&str>) -> Option<String> {
    let trimmed = text?.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_owned())
}
224
/// Strips any `provider/` style prefix, returning the text after the last `/`.
fn normalize_model_id(model: &str) -> &str {
    match model.rsplit_once('/') {
        Some((_, tail)) => tail,
        None => model,
    }
}
228
229pub(crate) fn convert_tools(tools: Option<&[ToolSpec]>) -> Option<Vec<ResponsesToolSpec>> {
230 let items = tools?;
231 if items.is_empty() {
232 return None;
233 }
234
235 Some(
236 items
237 .iter()
238 .map(|tool| ResponsesToolSpec {
239 kind: "function".to_string(),
240 name: tool.name.clone(),
241 description: tool.description.clone(),
242 parameters: tool.parameters.clone(),
243 strict: false,
244 })
245 .collect(),
246 )
247}
248
249fn response_message_item(role: &str, content: Vec<Value>) -> Value {
250 serde_json::json!({
251 "type": "message",
252 "role": role,
253 "content": content,
254 })
255}
256
257fn legacy_tool_output_message(content: &str) -> Value {
258 response_message_item(
259 "user",
260 vec![serde_json::json!({
261 "type": "input_text",
262 "text": format!("Legacy tool output without call_id:\n{content}"),
263 })],
264 )
265}
266
267fn response_item_type(item: &Value) -> Option<&str> {
268 item.get("type").and_then(Value::as_str)
269}
270
271fn is_replayable_responses_output_item(item: &Value) -> bool {
272 matches!(
273 response_item_type(item),
274 Some("message" | "reasoning" | "function_call")
275 )
276}
277
278fn encode_responses_history_items(output_items: &[Value], has_tool_calls: bool) -> Option<String> {
279 if !has_tool_calls {
280 return None;
281 }
282
283 let replay_items = output_items
284 .iter()
285 .filter(|item| is_replayable_responses_output_item(item))
286 .cloned()
287 .collect::<Vec<_>>();
288
289 if !replay_items
290 .iter()
291 .any(|item| response_item_type(item) == Some("function_call"))
292 {
293 return None;
294 }
295
296 serde_json::to_string(&serde_json::json!({
297 "provider": RESPONSES_HISTORY_PROVIDER,
298 "kind": RESPONSES_HISTORY_KIND,
299 "items": replay_items,
300 }))
301 .ok()
302}
303
304fn decode_responses_history_items(reasoning_content: &str) -> Option<Vec<Value>> {
305 let value = serde_json::from_str::<Value>(reasoning_content).ok()?;
306 if value.get("provider").and_then(Value::as_str) != Some(RESPONSES_HISTORY_PROVIDER)
307 || value.get("kind").and_then(Value::as_str) != Some(RESPONSES_HISTORY_KIND)
308 {
309 return None;
310 }
311
312 let items = value
313 .get("items")
314 .and_then(Value::as_array)?
315 .iter()
316 .filter(|item| is_replayable_responses_output_item(item))
317 .cloned()
318 .collect::<Vec<_>>();
319
320 (!items.is_empty()).then_some(items)
321}
322
323pub(crate) fn build_responses_input(messages: &[ChatMessage]) -> (String, Vec<Value>) {
324 let mut system_parts: Vec<&str> = Vec::new();
325 let mut input: Vec<Value> = Vec::new();
326
327 for msg in messages {
328 match msg.role.as_str() {
329 "system" => system_parts.push(&msg.content),
330 "user" => {
331 let (cleaned_text, image_refs) = multimodal::parse_image_markers(&msg.content);
332
333 let mut content_items = Vec::new();
334
335 if !cleaned_text.trim().is_empty() {
336 content_items.push(serde_json::json!({
337 "type": "input_text",
338 "text": cleaned_text,
339 }));
340 }
341
342 for image_ref in image_refs {
343 content_items.push(serde_json::json!({
344 "type": "input_image",
345 "image_url": image_ref,
346 }));
347 }
348
349 if content_items.is_empty() {
350 content_items.push(serde_json::json!({
351 "type": "input_text",
352 "text": "",
353 }));
354 }
355
356 input.push(response_message_item("user", content_items));
357 }
358 "assistant" => {
359 if let Ok(value) = serde_json::from_str::<Value>(&msg.content)
360 && let Some(tool_calls_value) = value.get("tool_calls")
361 && let Ok(parsed_calls) =
362 serde_json::from_value::<Vec<ProviderToolCall>>(tool_calls_value.clone())
363 {
364 let content = value
365 .get("content")
366 .and_then(Value::as_str)
367 .filter(|content| !content.trim().is_empty());
368 let responses_history_items = value
369 .get("reasoning_content")
370 .and_then(Value::as_str)
371 .and_then(decode_responses_history_items);
372
373 if let Some(items) = responses_history_items {
374 if let Some(content) = content
375 && !items
376 .iter()
377 .any(|item| response_item_type(item) == Some("message"))
378 {
379 input.push(response_message_item(
380 "assistant",
381 vec![serde_json::json!({
382 "type": "output_text",
383 "text": content,
384 })],
385 ));
386 }
387
388 input.extend(items);
389 continue;
390 }
391
392 if let Some(content) = value
393 .get("content")
394 .and_then(Value::as_str)
395 .filter(|content| !content.trim().is_empty())
396 {
397 input.push(response_message_item(
398 "assistant",
399 vec![serde_json::json!({
400 "type": "output_text",
401 "text": content,
402 })],
403 ));
404 }
405
406 for call in parsed_calls {
407 input.push(serde_json::json!({
408 "type": "function_call",
409 "call_id": call.id,
410 "name": call.name,
411 "arguments": call.arguments,
412 }));
413 }
414 } else if !msg.content.trim().is_empty() {
415 input.push(response_message_item(
416 "assistant",
417 vec![serde_json::json!({
418 "type": "output_text",
419 "text": msg.content,
420 })],
421 ));
422 }
423 }
424 "tool" => {
425 if let Ok(value) = serde_json::from_str::<Value>(&msg.content) {
426 if let Some(call_id) = value
427 .get("tool_call_id")
428 .and_then(Value::as_str)
429 .and_then(|id| first_nonempty(Some(id)))
430 {
431 let output = value
432 .get("content")
433 .and_then(Value::as_str)
434 .unwrap_or_default();
435 input.push(serde_json::json!({
436 "type": "function_call_output",
437 "call_id": call_id,
438 "output": output,
439 }));
440 } else if !msg.content.trim().is_empty() {
441 input.push(legacy_tool_output_message(&msg.content));
442 }
443 } else if !msg.content.trim().is_empty() {
444 input.push(legacy_tool_output_message(&msg.content));
445 }
446 }
447 _ => {}
448 }
449 }
450
451 let instructions = if system_parts.is_empty() {
452 DEFAULT_CODEX_INSTRUCTIONS.to_string()
453 } else {
454 system_parts.join("\n\n")
455 };
456
457 (instructions, input)
458}
459
460fn clamp_reasoning_effort(model: &str, effort: &str) -> String {
461 let id = normalize_model_id(model);
462 if id == "gpt-5-codex" {
464 return match effort {
465 "low" | "medium" | "high" => effort.to_string(),
466 "minimal" => "low".to_string(),
467 _ => "high".to_string(),
468 };
469 }
470 if (id.starts_with("gpt-5.2") || id.starts_with("gpt-5.3")) && effort == "minimal" {
471 return "low".to_string();
472 }
473 if id.starts_with("gpt-5-codex") && effort == "xhigh" {
474 return "high".to_string();
475 }
476 if id == "gpt-5.1" && effort == "xhigh" {
477 return "high".to_string();
478 }
479 if id == "gpt-5.1-codex-mini" {
480 return if effort == "high" || effort == "xhigh" {
481 "high".to_string()
482 } else {
483 "medium".to_string()
484 };
485 }
486 effort.to_string()
487}
488
489fn resolve_reasoning_effort(model_id: &str, configured: Option<&str>) -> String {
490 let raw = configured
491 .and_then(|value| first_nonempty(Some(value)))
492 .map(|s| s.to_ascii_lowercase())
493 .unwrap_or_else(|| "xhigh".to_string());
494 clamp_reasoning_effort(model_id, &raw)
495}
496
/// Returns the text unchanged (whitespace included) as an owned string, or `None` when it
/// is absent or exactly empty.
pub(crate) fn nonempty_preserve(text: Option<&str>) -> Option<String> {
    text.filter(|value| !value.is_empty()).map(String::from)
}
506
507fn extract_responses_text(response: &ResponsesResponse) -> Option<String> {
508 if let Some(text) = first_nonempty(response.output_text.as_deref()) {
509 return Some(text);
510 }
511
512 for item in &response.output {
513 if response_item_type(item) != Some("message") {
514 continue;
515 }
516
517 if let Some(parts) = item.get("content").and_then(Value::as_array) {
518 for content in parts {
519 if response_item_type(content) == Some("output_text")
520 && let Some(text) = first_nonempty(content.get("text").and_then(Value::as_str))
521 {
522 return Some(text);
523 }
524 }
525 }
526 }
527
528 for item in &response.output {
529 if let Some(parts) = item.get("content").and_then(Value::as_array) {
530 for content in parts {
531 if let Some(text) = first_nonempty(content.get("text").and_then(Value::as_str)) {
532 return Some(text);
533 }
534 }
535 }
536 }
537
538 None
539}
540
541fn extract_responses_tool_calls(response: &ResponsesResponse) -> Vec<ProviderToolCall> {
542 response
543 .output
544 .iter()
545 .filter(|item| response_item_type(item) == Some("function_call"))
546 .filter_map(|item| {
547 let name = item.get("name").and_then(Value::as_str)?.to_string();
548 let arguments = item
549 .get("arguments")
550 .and_then(Value::as_str)
551 .unwrap_or_default()
552 .to_string();
553 Some(ProviderToolCall {
554 id: item
555 .get("call_id")
556 .and_then(Value::as_str)
557 .or_else(|| item.get("id").and_then(Value::as_str))
558 .map(ToString::to_string)
559 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
560 name,
561 arguments,
562 extra_content: None,
563 })
564 })
565 .collect()
566}
567
568fn responses_turn_from_response(response: &ResponsesResponse) -> ResponsesTurnResult {
569 let tool_calls = extract_responses_tool_calls(response);
570 let reasoning_content =
571 encode_responses_history_items(&response.output, !tool_calls.is_empty());
572
573 ResponsesTurnResult {
574 text: extract_responses_text(response),
575 tool_calls,
576 reasoning_content,
577 }
578}
579
580fn record_responses_output_item(state: &mut ResponsesStreamState, item: Value) {
581 if !is_replayable_responses_output_item(&item) {
582 return;
583 }
584
585 let item_id = item
586 .get("id")
587 .and_then(Value::as_str)
588 .or_else(|| item.get("call_id").and_then(Value::as_str));
589 if let Some(item_id) = item_id
590 && state.output_items.iter().any(|existing| {
591 existing
592 .get("id")
593 .and_then(Value::as_str)
594 .or_else(|| existing.get("call_id").and_then(Value::as_str))
595 == Some(item_id)
596 })
597 {
598 return;
599 }
600
601 state.output_items.push(item);
602}
603
604fn replace_responses_output_items(state: &mut ResponsesStreamState, items: &[Value]) {
605 let replay_items = items
606 .iter()
607 .filter(|item| is_replayable_responses_output_item(item))
608 .cloned()
609 .collect::<Vec<_>>();
610
611 if !replay_items.is_empty() {
612 state.output_items = replay_items;
613 }
614}
615
616fn response_output_text_from_event_item(item: &Value) -> Option<String> {
617 if item.get("type").and_then(Value::as_str) != Some("message") {
618 return None;
619 }
620
621 item.get("content")
622 .and_then(Value::as_array)
623 .and_then(|parts| {
624 parts.iter().find_map(|part| {
625 if part.get("type").and_then(Value::as_str) == Some("output_text") {
626 first_nonempty(part.get("text").and_then(Value::as_str))
627 } else {
628 None
629 }
630 })
631 })
632}
633
/// Key for an in-progress tool call: the item id when known, else `output:{index}`.
fn pending_tool_call_key(item_id: Option<&str>, output_index: Option<u64>) -> Option<String> {
    match (item_id, output_index) {
        (Some(id), _) => Some(id.to_owned()),
        (None, Some(index)) => Some(format!("output:{index}")),
        (None, None) => None,
    }
}
639
640fn emit_tool_call(
641 state: &mut ResponsesStreamState,
642 tool_call: ProviderToolCall,
643) -> Option<ProviderToolCall> {
644 if state.emitted_tool_call_ids.insert(tool_call.id.clone()) {
645 state.collected_tool_calls.push(tool_call.clone());
646 Some(tool_call)
647 } else {
648 None
649 }
650}
651
/// Error carrying the message of a failed Responses stream (`error` / `response.failed`).
#[derive(Debug)]
pub(crate) struct ResponsesStreamApiError(pub(crate) String);

impl std::fmt::Display for ResponsesStreamApiError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        write!(f, "OpenAI responses stream error: {message}")
    }
}

impl std::error::Error for ResponsesStreamApiError {}
662
663pub(crate) fn process_responses_stream_event(
664 event: Value,
665 state: &mut ResponsesStreamState,
666) -> anyhow::Result<Vec<StreamEvent>> {
667 if let Some(message) = extract_stream_error_message(&event) {
668 return Err(ResponsesStreamApiError(message).into());
669 }
670
671 let mut emitted = Vec::new();
672 match event.get("type").and_then(Value::as_str) {
673 Some("response.output_text.delta") => {
674 if let Some(text) = nonempty_preserve(event.get("delta").and_then(Value::as_str)) {
675 state.saw_text_delta = true;
676 state.text_accumulator.push_str(&text);
677 emitted.push(StreamEvent::TextDelta(StreamChunk::delta(text)));
678 }
679 }
680 Some("response.output_text.done") if !state.saw_text_delta => {
681 state.fallback_text = nonempty_preserve(event.get("text").and_then(Value::as_str));
682 }
683 Some("response.output_item.added") => {
684 let item = event.get("item");
685 let item_type = item
686 .and_then(|value| value.get("type"))
687 .and_then(Value::as_str);
688 if item_type == Some("function_call") {
689 let key = pending_tool_call_key(
690 item.and_then(|value| value.get("id"))
691 .and_then(Value::as_str),
692 event.get("output_index").and_then(Value::as_u64),
693 );
694 if let Some(key) = key {
695 let entry = state.tool_calls.entry(key).or_default();
696 entry.item_id = item
697 .and_then(|value| value.get("id"))
698 .and_then(Value::as_str)
699 .map(ToString::to_string);
700 entry.call_id = item
701 .and_then(|value| value.get("call_id"))
702 .and_then(Value::as_str)
703 .map(ToString::to_string);
704 entry.name = item
705 .and_then(|value| value.get("name"))
706 .and_then(Value::as_str)
707 .map(ToString::to_string);
708 if let Some(arguments) = item
709 .and_then(|value| value.get("arguments"))
710 .and_then(Value::as_str)
711 {
712 entry.arguments = arguments.to_string();
713 }
714 }
715 }
716 }
717 Some("response.function_call_arguments.delta") => {
718 if let Some(key) = pending_tool_call_key(
719 event.get("item_id").and_then(Value::as_str),
720 event.get("output_index").and_then(Value::as_u64),
721 ) {
722 let entry = state.tool_calls.entry(key).or_default();
723 entry.item_id = event
724 .get("item_id")
725 .and_then(Value::as_str)
726 .map(ToString::to_string);
727 entry.arguments.push_str(
728 event
729 .get("delta")
730 .and_then(Value::as_str)
731 .unwrap_or_default(),
732 );
733 }
734 }
735 Some("response.function_call_arguments.done") => {
736 let key = pending_tool_call_key(
737 event.get("item_id").and_then(Value::as_str),
738 event.get("output_index").and_then(Value::as_u64),
739 );
740 let mut pending = key
741 .as_ref()
742 .and_then(|key| state.tool_calls.remove(key))
743 .unwrap_or_default();
744 pending.item_id = pending.item_id.or_else(|| {
745 event
746 .get("item_id")
747 .and_then(Value::as_str)
748 .map(ToString::to_string)
749 });
750 pending.call_id = pending.call_id.or_else(|| {
751 event
752 .get("call_id")
753 .and_then(Value::as_str)
754 .map(ToString::to_string)
755 });
756 pending.name = pending.name.or_else(|| {
757 event
758 .get("name")
759 .and_then(Value::as_str)
760 .map(ToString::to_string)
761 });
762 if let Some(arguments) = event.get("arguments").and_then(Value::as_str) {
763 pending.arguments = arguments.to_string();
764 }
765
766 if let Some(name) = pending.name {
767 let tool_call = ProviderToolCall {
768 id: pending
769 .call_id
770 .or(pending.item_id)
771 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
772 name,
773 arguments: pending.arguments,
774 extra_content: None,
775 };
776 if let Some(tool_call) = emit_tool_call(state, tool_call) {
777 emitted.push(StreamEvent::ToolCall(tool_call));
778 }
779 }
780 }
781 Some("response.output_item.done") => {
782 if let Some(item) = event.get("item") {
783 record_responses_output_item(state, item.clone());
784 match item.get("type").and_then(Value::as_str) {
785 Some("message") if !state.saw_text_delta && state.fallback_text.is_none() => {
786 state.fallback_text = response_output_text_from_event_item(item);
787 }
788 Some("function_call") => {
789 if let Some(name) = item.get("name").and_then(Value::as_str) {
790 let tool_call = ProviderToolCall {
791 id: item
792 .get("call_id")
793 .and_then(Value::as_str)
794 .or_else(|| item.get("id").and_then(Value::as_str))
795 .unwrap_or_default()
796 .to_string(),
797 name: name.to_string(),
798 arguments: item
799 .get("arguments")
800 .and_then(Value::as_str)
801 .unwrap_or_default()
802 .to_string(),
803 extra_content: None,
804 };
805 if let Some(tool_call) = emit_tool_call(state, tool_call) {
806 emitted.push(StreamEvent::ToolCall(tool_call));
807 }
808 }
809 }
810 _ => {}
811 }
812 }
813 }
814 Some("response.completed" | "response.done") => {
815 if let Some(response) = event
816 .get("response")
817 .and_then(|value| serde_json::from_value::<ResponsesResponse>(value.clone()).ok())
818 {
819 if !state.saw_text_delta && state.fallback_text.is_none() {
820 state.fallback_text = extract_responses_text(&response);
821 }
822 replace_responses_output_items(state, &response.output);
823 for tool_call in extract_responses_tool_calls(&response) {
824 if let Some(tool_call) = emit_tool_call(state, tool_call) {
825 emitted.push(StreamEvent::ToolCall(tool_call));
826 }
827 }
828 }
829 }
830 _ => {}
831 }
832
833 Ok(emitted)
834}
835
836pub(crate) fn process_sse_chunk(
837 chunk: &str,
838 state: &mut ResponsesStreamState,
839) -> anyhow::Result<Vec<StreamEvent>> {
840 let data_lines: Vec<String> = chunk
841 .lines()
842 .filter_map(|line| line.strip_prefix("data:"))
843 .map(|line| line.trim().to_string())
844 .collect();
845 if data_lines.is_empty() {
846 return Ok(Vec::new());
847 }
848
849 let joined = data_lines.join("\n");
850 let trimmed = joined.trim();
851 if trimmed.is_empty() || trimmed == "[DONE]" {
852 return Ok(Vec::new());
853 }
854
855 if let Ok(event) = serde_json::from_str::<Value>(trimmed) {
856 return process_responses_stream_event(event, state);
857 }
858
859 let mut emitted = Vec::new();
860 for line in data_lines {
861 let line = line.trim();
862 if line.is_empty() || line == "[DONE]" {
863 continue;
864 }
865 let event = serde_json::from_str::<Value>(line).map_err(|err| {
866 let sanitized = super::sanitize_api_error(line);
867 ::zeroclaw_log::record!(
868 ERROR,
869 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
870 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
871 .with_attrs(::serde_json::json!({
872 "phase": "sse_parse",
873 "payload": &sanitized,
874 "error": format!("{}", err),
875 })),
876 "openai_codex: SSE data parse failed"
877 );
878 anyhow::Error::msg(format!(
879 "OpenAI Codex SSE data parse failed: {err}. Payload: {sanitized}"
880 ))
881 })?;
882 emitted.extend(process_responses_stream_event(event, state)?);
883 }
884
885 Ok(emitted)
886}
887
888fn parse_sse_turn(body: &str) -> anyhow::Result<ResponsesTurnResult> {
889 let mut state = ResponsesStreamState::default();
890 let mut buffer = body.to_string();
891
892 while let Some(idx) = buffer.find("\n\n") {
893 let chunk = buffer[..idx].to_string();
894 buffer = buffer[idx + 2..].to_string();
895 process_sse_chunk(&chunk, &mut state)?;
896 }
897
898 if !buffer.trim().is_empty() {
899 process_sse_chunk(&buffer, &mut state)?;
900 }
901
902 Ok(ResponsesTurnResult {
903 text: if state.saw_text_delta {
904 nonempty_preserve(Some(&state.text_accumulator))
905 } else {
906 state.fallback_text
907 },
908 reasoning_content: encode_responses_history_items(
909 &state.output_items,
910 !state.collected_tool_calls.is_empty(),
911 ),
912 tool_calls: state.collected_tool_calls,
913 })
914}
915
916fn ensure_nonempty_responses_turn(
917 result: ResponsesTurnResult,
918 empty_error: impl FnOnce() -> anyhow::Error,
919) -> anyhow::Result<ResponsesTurnResult> {
920 if result.text.as_deref().is_some_and(|text| !text.is_empty()) || !result.tool_calls.is_empty()
921 {
922 Ok(result)
923 } else {
924 Err(empty_error())
925 }
926}
927
928pub(crate) fn extract_stream_error_message(event: &Value) -> Option<String> {
929 let event_type = event.get("type").and_then(Value::as_str);
930
931 if event_type == Some("error") {
932 return first_nonempty(
933 event
934 .get("message")
935 .and_then(Value::as_str)
936 .or_else(|| event.get("code").and_then(Value::as_str))
937 .or_else(|| {
938 event
939 .get("error")
940 .and_then(|error| error.get("message"))
941 .and_then(Value::as_str)
942 }),
943 );
944 }
945
946 if event_type == Some("response.failed") {
947 return first_nonempty(
948 event
949 .get("response")
950 .and_then(|response| response.get("error"))
951 .and_then(|error| error.get("message"))
952 .and_then(Value::as_str),
953 );
954 }
955
956 None
957}
958
959pub(crate) fn append_utf8_stream_chunk(
960 body: &mut String,
961 pending: &mut Vec<u8>,
962 chunk: &[u8],
963) -> anyhow::Result<()> {
964 if pending.is_empty()
965 && let Ok(text) = std::str::from_utf8(chunk)
966 {
967 body.push_str(text);
968 return Ok(());
969 }
970
971 if !chunk.is_empty() {
972 pending.extend_from_slice(chunk);
973 }
974 if pending.is_empty() {
975 return Ok(());
976 }
977
978 match std::str::from_utf8(pending) {
979 Ok(text) => {
980 body.push_str(text);
981 pending.clear();
982 Ok(())
983 }
984 Err(err) => {
985 let valid_up_to = err.valid_up_to();
986 if valid_up_to > 0 {
987 let prefix = std::str::from_utf8(&pending[..valid_up_to])
989 .expect("valid UTF-8 prefix from Utf8Error::valid_up_to");
990 body.push_str(prefix);
991 pending.drain(..valid_up_to);
992 }
993
994 if err.error_len().is_some() {
995 ::zeroclaw_log::record!(
996 ERROR,
997 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
998 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
999 .with_attrs(::serde_json::json!({
1000 "phase": "utf8_decode",
1001 "error": format!("{}", err),
1002 })),
1003 "openai_codex: response contained invalid UTF-8"
1004 );
1005 return Err(anyhow::Error::msg(format!(
1006 "OpenAI Codex response contained invalid UTF-8: {err}"
1007 )));
1008 }
1009
1010 Ok(())
1013 }
1014 }
1015}
1016
1017fn parse_responses_body(body: &str) -> anyhow::Result<ResponsesTurnResult> {
1018 let body_trimmed = body.trim_start();
1019 let looks_like_sse = body_trimmed.starts_with("event:") || body_trimmed.starts_with("data:");
1020 if looks_like_sse {
1021 let result = parse_sse_turn(body)?;
1022 return ensure_nonempty_responses_turn(result, || {
1023 let sanitized = super::sanitize_api_error(body);
1024 ::zeroclaw_log::record!(
1025 ERROR,
1026 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1027 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1028 .with_attrs(::serde_json::json!({"payload": &sanitized})),
1029 "openai_codex: empty SSE stream payload"
1030 );
1031 anyhow::Error::msg(format!(
1032 "No response from OpenAI Codex stream payload: {sanitized}"
1033 ))
1034 });
1035 }
1036
1037 let parsed: ResponsesResponse = serde_json::from_str(body).map_err(|err| {
1038 let sanitized = super::sanitize_api_error(body);
1039 ::zeroclaw_log::record!(
1040 ERROR,
1041 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1042 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1043 .with_attrs(::serde_json::json!({
1044 "payload": &sanitized,
1045 "error": format!("{}", err),
1046 })),
1047 "openai_codex: JSON parse failed"
1048 );
1049 anyhow::Error::msg(format!(
1050 "OpenAI Codex JSON parse failed: {err}. Payload: {sanitized}"
1051 ))
1052 })?;
1053 let result = responses_turn_from_response(&parsed);
1054 ensure_nonempty_responses_turn(result, || {
1055 let sanitized = super::sanitize_api_error(body);
1056 ::zeroclaw_log::record!(
1057 ERROR,
1058 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1059 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1060 .with_attrs(::serde_json::json!({"payload": &sanitized})),
1061 "openai_codex: empty response"
1062 );
1063 anyhow::Error::msg(format!("No response from OpenAI Codex: {sanitized}"))
1064 })
1065}
1066
1067async fn decode_responses_body(response: reqwest::Response) -> anyhow::Result<ResponsesTurnResult> {
1074 let mut body = String::new();
1075 let mut pending_utf8 = Vec::new();
1076 let mut stream = response.bytes_stream();
1077
1078 while let Some(chunk) = stream.next().await {
1079 let bytes = chunk.map_err(|err| {
1080 ::zeroclaw_log::record!(
1081 ERROR,
1082 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1083 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1084 .with_attrs(::serde_json::json!({
1085 "phase": "stream_read",
1086 "error": format!("{}", err),
1087 })),
1088 "openai_codex: error reading response stream"
1089 );
1090 anyhow::Error::msg(format!("error reading OpenAI Codex response stream: {err}"))
1091 })?;
1092 append_utf8_stream_chunk(&mut body, &mut pending_utf8, &bytes)?;
1093 }
1094
1095 if !pending_utf8.is_empty() {
1096 let err = match std::str::from_utf8(&pending_utf8) {
1097 Err(e) => e,
1098 Ok(_) => {
1099 ::zeroclaw_log::record!(
1104 ERROR,
1105 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1106 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1107 "openai_codex: pending bytes were valid UTF-8 (invariant violated)"
1108 );
1109 return Err(anyhow::Error::msg(
1110 "OpenAI Codex response stream ended with valid UTF-8 in pending bytes (unexpected)",
1111 ));
1112 }
1113 };
1114 ::zeroclaw_log::record!(
1115 ERROR,
1116 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1117 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1118 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
1119 "openai_codex: response ended with incomplete UTF-8"
1120 );
1121 return Err(anyhow::Error::msg(format!(
1122 "OpenAI Codex response ended with incomplete UTF-8: {err}"
1123 )));
1124 }
1125
1126 parse_responses_body(&body)
1127}
1128
/// Credentials and routing identifiers resolved for a single Codex request.
///
/// This struct is built by `resolve_credentials` and turned into headers by
/// `responses_request_builder`. It intentionally has no `Debug` derive: the
/// fields hold bearer/access tokens, and a derived `Debug` could leak them into
/// logs.
struct ResolvedCodexCredentials {
    /// Whether the request goes to a custom endpoint authenticated with a gateway API key.
    use_gateway_api_key_auth: bool,
    /// Token placed in the `Authorization: Bearer …` header.
    bearer_token: String,
    /// ChatGPT account id sent as `chatgpt-account-id`.
    /// In gateway mode it is also sent as `x-openai-account-id`.
    account_id: Option<String>,
    /// OpenAI OAuth access token. Forwarded as `x-openai-access-token` in gateway mode.
    access_token: Option<String>,
}
1135
1136impl OpenAiCodexModelProvider {
1137 async fn resolve_credentials(&self) -> anyhow::Result<ResolvedCodexCredentials> {
1138 let use_gateway_api_key_auth = self.custom_endpoint && self.gateway_api_key.is_some();
1139
1140 let profile = match self
1141 .auth
1142 .get_profile("openai-codex", self.auth_profile_override.as_deref())
1143 .await
1144 {
1145 Ok(profile) => profile,
1146 Err(err) if use_gateway_api_key_auth => {
1147 ::zeroclaw_log::record!(
1148 WARN,
1149 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1150 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1151 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
1152 "failed to load OpenAI Codex profile; continuing with custom endpoint API key mode"
1153 );
1154 None
1155 }
1156 Err(err) => return Err(err),
1157 };
1158
1159 let oauth_access_token = match self
1160 .auth
1161 .get_valid_openai_access_token(self.auth_profile_override.as_deref())
1162 .await
1163 {
1164 Ok(token) => token,
1165 Err(err) if use_gateway_api_key_auth => {
1166 ::zeroclaw_log::record!(
1167 WARN,
1168 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1169 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1170 .with_attrs(::serde_json::json!({"error": format!("{}", err)})),
1171 "failed to refresh OpenAI token; continuing with custom endpoint API key mode"
1172 );
1173 None
1174 }
1175 Err(err) => return Err(err),
1176 };
1177
1178 let account_id = profile.and_then(|p| p.account_id).or_else(|| {
1179 oauth_access_token
1180 .as_deref()
1181 .and_then(extract_account_id_from_jwt)
1182 });
1183
1184 let access_token = if use_gateway_api_key_auth {
1185 oauth_access_token
1186 } else {
1187 Some(oauth_access_token.ok_or_else(|| {
1188 ::zeroclaw_log::record!(
1189 ERROR,
1190 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1191 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1192 .with_attrs(::serde_json::json!({"missing": "oauth_access_token"})),
1193 "openai_codex: auth profile not found"
1194 );
1195 anyhow::Error::msg(
1196 "OpenAI Codex auth profile not found. Run `zeroclaw auth login --provider openai-codex`.",
1197 )
1198 })?)
1199 };
1200
1201 let account_id = if use_gateway_api_key_auth {
1202 account_id
1203 } else {
1204 Some(account_id.ok_or_else(|| {
1205 ::zeroclaw_log::record!(
1206 ERROR,
1207 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1208 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1209 .with_attrs(::serde_json::json!({"missing": "account_id"})),
1210 "openai_codex: account_id not found in profile/token"
1211 );
1212 anyhow::Error::msg(
1213 "OpenAI Codex account id not found in auth profile/token. Run `zeroclaw auth login --provider openai-codex` again.",
1214 )
1215 })?)
1216 };
1217
1218 let bearer_token = if use_gateway_api_key_auth {
1219 self.gateway_api_key.clone().unwrap_or_default()
1220 } else {
1221 access_token.clone().unwrap_or_default()
1222 };
1223
1224 Ok(ResolvedCodexCredentials {
1225 bearer_token,
1226 account_id,
1227 access_token,
1228 use_gateway_api_key_auth,
1229 })
1230 }
1231
1232 fn responses_request_builder(
1233 &self,
1234 bearer_token: &str,
1235 account_id: Option<&str>,
1236 access_token: Option<&str>,
1237 use_gateway_api_key_auth: bool,
1238 request: &ResponsesRequest,
1239 ) -> reqwest::RequestBuilder {
1240 let mut request_builder = self
1241 .client
1242 .post(&self.responses_url)
1243 .header("Authorization", format!("Bearer {bearer_token}"))
1244 .header("OpenAI-Beta", "responses=experimental")
1245 .header("originator", "pi")
1246 .header("Content-Type", "application/json");
1247
1248 if request.stream {
1249 request_builder = request_builder.header("accept", "text/event-stream");
1250 }
1251
1252 if let Some(account_id) = account_id {
1253 request_builder = request_builder.header("chatgpt-account-id", account_id);
1254 }
1255
1256 if use_gateway_api_key_auth {
1257 if let Some(access_token) = access_token {
1258 request_builder = request_builder.header("x-openai-access-token", access_token);
1259 }
1260 if let Some(account_id) = account_id {
1261 request_builder = request_builder.header("x-openai-account-id", account_id);
1262 }
1263 }
1264
1265 request_builder
1266 }
1267
1268 async fn send_responses_request(
1269 &self,
1270 input: Vec<Value>,
1271 instructions: String,
1272 tools: Option<Vec<ResponsesToolSpec>>,
1273 model: &str,
1274 ) -> anyhow::Result<ResponsesTurnResult> {
1275 let creds = self.resolve_credentials().await?;
1276 let normalized_model = normalize_model_id(model);
1277
1278 let has_tools = tools.is_some();
1279 let mut request = ResponsesRequest {
1280 model: normalized_model.to_string(),
1281 input,
1282 instructions,
1283 store: false,
1284 stream: true,
1285 text: ResponsesTextOptions {
1286 verbosity: "medium".to_string(),
1287 },
1288 reasoning: ResponsesReasoningOptions {
1289 effort: resolve_reasoning_effort(
1290 normalized_model,
1291 self.reasoning_effort.as_deref(),
1292 ),
1293 summary: "auto".to_string(),
1294 },
1295 include: vec!["reasoning.encrypted_content".to_string()],
1296 tools,
1297 tool_choice: has_tools.then(|| "auto".to_string()),
1298 parallel_tool_calls: has_tools.then_some(true),
1299 };
1300
1301 let request_builder = self.responses_request_builder(
1302 &creds.bearer_token,
1303 creds.account_id.as_deref(),
1304 creds.access_token.as_deref(),
1305 creds.use_gateway_api_key_auth,
1306 &request,
1307 );
1308
1309 let response = request_builder.json(&request).send().await?;
1310
1311 if !response.status().is_success() {
1312 return Err(super::api_error("OpenAI Codex", response).await);
1313 }
1314
1315 match decode_responses_body(response).await {
1316 Ok(result) => Ok(result),
1317 Err(stream_err) => {
1318 if stream_err
1319 .downcast_ref::<ResponsesStreamApiError>()
1320 .is_some()
1321 {
1322 return Err(stream_err);
1323 }
1324
1325 ::zeroclaw_log::record!(
1326 WARN,
1327 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1328 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1329 .with_attrs(::serde_json::json!({"error": format!("{}", stream_err)})),
1330 "OpenAI Codex streaming response decode failed, retrying without streaming"
1331 );
1332
1333 request.stream = false;
1334 let non_streaming_response = self
1335 .responses_request_builder(
1336 &creds.bearer_token,
1337 creds.account_id.as_deref(),
1338 creds.access_token.as_deref(),
1339 creds.use_gateway_api_key_auth,
1340 &request,
1341 )
1342 .json(&request)
1343 .send()
1344 .await?;
1345
1346 if !non_streaming_response.status().is_success() {
1347 return Err(super::api_error("OpenAI Codex", non_streaming_response).await);
1348 }
1349
1350 decode_responses_body(non_streaming_response)
1351 .await
1352 .map_err(|fallback_err| {
1353 ::zeroclaw_log::record!(
1354 ERROR,
1355 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1356 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1357 .with_attrs(::serde_json::json!({
1358 "stream_err": format!("{}", stream_err),
1359 "fallback_err": format!("{}", fallback_err),
1360 })),
1361 "openai_codex: stream + non-stream fallback both failed"
1362 );
1363 anyhow::Error::msg(format!(
1364 "OpenAI Codex streaming response decode failed ({stream_err}); non-streaming retry failed ({fallback_err})"
1365 ))
1366 })
1367 }
1368 }
1369 }
1370}
1371
1372#[async_trait]
1373impl ModelProvider for OpenAiCodexModelProvider {
1374 fn default_wire_api(&self) -> &str {
1376 WIRE_API
1377 }
1378
1379 fn default_base_url(&self) -> Option<&str> {
1380 Some(DEFAULT_CODEX_RESPONSES_URL)
1381 }
1382
1383 fn capabilities(&self) -> ProviderCapabilities {
1384 ProviderCapabilities {
1385 native_tool_calling: true,
1386 vision: true,
1387 prompt_caching: false,
1388 extended_thinking: false,
1389 }
1390 }
1391
1392 async fn chat_with_system(
1393 &self,
1394 system_prompt: Option<&str>,
1395 message: &str,
1396 model: &str,
1397 _temperature: Option<f64>,
1398 ) -> anyhow::Result<String> {
1399 let mut messages = Vec::new();
1401 if let Some(sys) = system_prompt {
1402 messages.push(ChatMessage::system(sys));
1403 }
1404 messages.push(ChatMessage::user(message));
1405
1406 let config = zeroclaw_config::schema::MultimodalConfig::default();
1408 let prepared = crate::multimodal::prepare_messages_for_provider(&messages, &config).await?;
1409
1410 let (instructions, input) = build_responses_input(&prepared.messages);
1411 self.send_responses_request(input, instructions, None, model)
1412 .await
1413 .map(|response| response.text.unwrap_or_default())
1414 }
1415
1416 async fn chat_with_history(
1417 &self,
1418 messages: &[ChatMessage],
1419 model: &str,
1420 _temperature: Option<f64>,
1421 ) -> anyhow::Result<String> {
1422 let config = zeroclaw_config::schema::MultimodalConfig::default();
1424 let prepared = crate::multimodal::prepare_messages_for_provider(messages, &config).await?;
1425
1426 let (instructions, input) = build_responses_input(&prepared.messages);
1427 self.send_responses_request(input, instructions, None, model)
1428 .await
1429 .map(|response| response.text.unwrap_or_default())
1430 }
1431
1432 async fn chat(
1433 &self,
1434 request: ProviderChatRequest<'_>,
1435 model: &str,
1436 _temperature: Option<f64>,
1437 ) -> anyhow::Result<ProviderChatResponse> {
1438 let config = zeroclaw_config::schema::MultimodalConfig::default();
1439 let prepared =
1440 crate::multimodal::prepare_messages_for_provider(request.messages, &config).await?;
1441 let (instructions, input) = build_responses_input(&prepared.messages);
1442 let response = self
1443 .send_responses_request(input, instructions, convert_tools(request.tools), model)
1444 .await?;
1445
1446 Ok(ProviderChatResponse {
1447 text: response.text,
1448 tool_calls: response.tool_calls,
1449 usage: None,
1450 reasoning_content: response.reasoning_content,
1451 })
1452 }
1453
1454 fn supports_streaming(&self) -> bool {
1455 true
1456 }
1457
1458 fn supports_streaming_tool_events(&self) -> bool {
1459 true
1460 }
1461
1462 fn stream_chat(
1463 &self,
1464 request: ProviderChatRequest<'_>,
1465 model: &str,
1466 _temperature: Option<f64>,
1467 options: StreamOptions,
1468 ) -> stream::BoxStream<'static, StreamResult<StreamEvent>> {
1469 if !options.enabled {
1470 return stream::once(async { Ok(StreamEvent::Final) }).boxed();
1471 }
1472
1473 let provider = self.clone();
1474 let messages = request.messages.to_vec();
1475 let tools = request.tools.map(|items| items.to_vec());
1476 let model = model.to_string();
1477 let count_tokens = options.count_tokens;
1478 let (tx, rx) = tokio::sync::mpsc::channel::<StreamResult<StreamEvent>>(100);
1479
1480 let handle = ::zeroclaw_spawn::spawn!(async move {
1481 let config = zeroclaw_config::schema::MultimodalConfig::default();
1482 let prepared =
1483 match crate::multimodal::prepare_messages_for_provider(&messages, &config).await {
1484 Ok(prepared) => prepared,
1485 Err(err) => {
1486 let _ = tx
1487 .send(Err(StreamError::ModelProvider(err.to_string())))
1488 .await;
1489 return;
1490 }
1491 };
1492
1493 let creds = match provider.resolve_credentials().await {
1494 Ok(c) => c,
1495 Err(err) => {
1496 let _ = tx
1497 .send(Err(StreamError::ModelProvider(err.to_string())))
1498 .await;
1499 return;
1500 }
1501 };
1502
1503 let (instructions, input) = build_responses_input(&prepared.messages);
1504 let normalized_model = normalize_model_id(&model);
1505 let tools = convert_tools(tools.as_deref());
1506 let has_tools = tools.is_some();
1507 let request = ResponsesRequest {
1508 model: normalized_model.to_string(),
1509 input,
1510 instructions,
1511 store: false,
1512 stream: true,
1513 text: ResponsesTextOptions {
1514 verbosity: "medium".to_string(),
1515 },
1516 reasoning: ResponsesReasoningOptions {
1517 effort: resolve_reasoning_effort(
1518 normalized_model,
1519 provider.reasoning_effort.as_deref(),
1520 ),
1521 summary: "auto".to_string(),
1522 },
1523 include: vec!["reasoning.encrypted_content".to_string()],
1524 tools,
1525 tool_choice: has_tools.then(|| "auto".to_string()),
1526 parallel_tool_calls: has_tools.then_some(true),
1527 };
1528
1529 let request_builder = provider
1530 .responses_request_builder(
1531 &creds.bearer_token,
1532 creds.account_id.as_deref(),
1533 creds.access_token.as_deref(),
1534 creds.use_gateway_api_key_auth,
1535 &request,
1536 )
1537 .json(&request);
1538
1539 crate::openai::run_responses_sse(request_builder, &tx, count_tokens).await;
1540 });
1541
1542 let guard = AbortOnDrop::new(handle.abort_handle());
1543 stream::unfold((rx, guard), |(mut rx, guard)| async move {
1544 rx.recv().await.map(|event| (event, (rx, guard)))
1545 })
1546 .boxed()
1547 }
1548}
1549
1550impl ::zeroclaw_api::attribution::Attributable for OpenAiCodexModelProvider {
1551 fn role(&self) -> ::zeroclaw_api::attribution::Role {
1552 ::zeroclaw_api::attribution::Role::Provider(
1553 ::zeroclaw_api::attribution::ProviderKind::Model(
1554 ::zeroclaw_api::attribution::ModelProviderKind::OpenAiCodex,
1555 ),
1556 )
1557 }
1558 fn alias(&self) -> &str {
1559 &self.alias
1560 }
1561}
1562
1563#[cfg(test)]
1564mod tests {
1565 use super::*;
1566
1567 enum MockCodexReply {
1568 Sse(&'static str),
1569 Json(serde_json::Value),
1570 Status(axum::http::StatusCode, &'static str),
1571 }
1572
1573 async fn mock_codex_provider(
1574 replies: Vec<MockCodexReply>,
1575 ) -> (
1576 OpenAiCodexModelProvider,
1577 std::sync::Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
1578 tokio::task::JoinHandle<()>,
1579 tempfile::TempDir,
1580 ) {
1581 use axum::http::header;
1582 use axum::response::IntoResponse;
1583 use axum::{Json, Router, routing::post};
1584 use std::collections::VecDeque;
1585 use std::sync::{Arc, Mutex};
1586 use tokio::net::TcpListener;
1587
1588 let captured: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(Vec::new()));
1589 let captured_clone = Arc::clone(&captured);
1590 let replies = Arc::new(Mutex::new(VecDeque::from(replies)));
1591 let replies_clone = Arc::clone(&replies);
1592
1593 let app = Router::new().route(
1594 "/responses",
1595 post(move |Json(body): Json<serde_json::Value>| {
1596 let captured = Arc::clone(&captured_clone);
1597 let replies = Arc::clone(&replies_clone);
1598 async move {
1599 captured.lock().unwrap().push(body);
1600 match replies
1601 .lock()
1602 .unwrap()
1603 .pop_front()
1604 .unwrap_or(MockCodexReply::Status(
1605 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
1606 "",
1607 )) {
1608 MockCodexReply::Sse(body) => (
1609 axum::http::StatusCode::OK,
1610 [(header::CONTENT_TYPE, "text/event-stream")],
1611 body.to_string(),
1612 )
1613 .into_response(),
1614 MockCodexReply::Json(body) => Json(body).into_response(),
1615 MockCodexReply::Status(status, body) => {
1616 (status, body.to_string()).into_response()
1617 }
1618 }
1619 }
1620 }),
1621 );
1622
1623 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1624 let addr = listener.local_addr().unwrap();
1625 let server_handle = zeroclaw_spawn::spawn!(async move {
1626 axum::serve(listener, app).await.unwrap();
1627 });
1628
1629 let temp_dir = tempfile::tempdir().unwrap();
1630 let options = ModelProviderRuntimeOptions {
1631 provider_api_url: Some(format!("http://{addr}")),
1632 zeroclaw_dir: Some(temp_dir.path().to_path_buf()),
1633 secrets_encrypt: false,
1634 ..ModelProviderRuntimeOptions::default()
1635 };
1636 let provider = OpenAiCodexModelProvider::new("test", &options, Some("test-key")).unwrap();
1637
1638 (provider, captured, server_handle, temp_dir)
1639 }
1640
1641 #[test]
1642 fn extracts_output_text_first() {
1643 let response = ResponsesResponse {
1644 output: vec![],
1645 output_text: Some("hello".into()),
1646 };
1647 assert_eq!(extract_responses_text(&response).as_deref(), Some("hello"));
1648 }
1649
1650 #[test]
1651 fn extracts_nested_output_text() {
1652 let response = ResponsesResponse {
1653 output: vec![serde_json::json!({
1654 "type": "message",
1655 "role": "assistant",
1656 "content": [
1657 {
1658 "type": "output_text",
1659 "text": "nested"
1660 }
1661 ]
1662 })],
1663 output_text: None,
1664 };
1665 assert_eq!(extract_responses_text(&response).as_deref(), Some("nested"));
1666 }
1667
1668 #[test]
1669 fn default_state_dir_is_non_empty() {
1670 let path = default_zeroclaw_dir();
1671 assert!(!path.as_os_str().is_empty());
1672 }
1673
1674 #[test]
1675 fn build_responses_url_appends_suffix_for_base_url() {
1676 assert_eq!(
1677 build_responses_url("https://api.tonsof.blue/v1").unwrap(),
1678 "https://api.tonsof.blue/v1/responses"
1679 );
1680 }
1681
1682 #[test]
1683 fn build_responses_url_keeps_existing_responses_endpoint() {
1684 assert_eq!(
1685 build_responses_url("https://api.tonsof.blue/v1/responses").unwrap(),
1686 "https://api.tonsof.blue/v1/responses"
1687 );
1688 }
1689
1690 #[test]
1691 fn resolve_responses_url_uses_provider_api_url_override() {
1692 let options = ModelProviderRuntimeOptions {
1693 provider_api_url: Some("https://proxy.example.com/v1".to_string()),
1694 ..ModelProviderRuntimeOptions::default()
1695 };
1696
1697 assert_eq!(
1698 resolve_responses_url(&options).unwrap(),
1699 "https://proxy.example.com/v1/responses"
1700 );
1701 }
1702
1703 #[test]
1704 fn default_responses_url_detector_handles_equivalent_urls() {
1705 assert!(is_default_responses_url(DEFAULT_CODEX_RESPONSES_URL));
1706 assert!(is_default_responses_url(
1707 "https://chatgpt.com/backend-api/codex/responses/"
1708 ));
1709 assert!(!is_default_responses_url(
1710 "https://api.tonsof.blue/v1/responses"
1711 ));
1712 }
1713
1714 #[test]
1715 fn constructor_enables_custom_endpoint_key_mode() {
1716 let options = ModelProviderRuntimeOptions {
1717 provider_api_url: Some("https://api.tonsof.blue/v1".to_string()),
1718 ..ModelProviderRuntimeOptions::default()
1719 };
1720
1721 let provider = OpenAiCodexModelProvider::new("test", &options, Some("test-key")).unwrap();
1722 assert!(provider.custom_endpoint);
1723 assert_eq!(provider.gateway_api_key.as_deref(), Some("test-key"));
1724 }
1725
1726 #[tokio::test]
1727 async fn codex_retries_non_streaming_when_stream_decode_fails() {
1728 let (provider, captured, server_handle, _temp_dir) = mock_codex_provider(vec![
1729 MockCodexReply::Sse("data: not-json\n\ndata: [DONE]\n"),
1730 MockCodexReply::Json(serde_json::json!({
1731 "output_text": "fallback ok",
1732 "output": []
1733 })),
1734 ])
1735 .await;
1736
1737 let messages = vec![ChatMessage::user("hello")];
1738 let response = provider
1739 .chat(
1740 ProviderChatRequest {
1741 messages: &messages,
1742 tools: None,
1743 thinking: None,
1744 },
1745 "gpt-5-codex",
1746 None,
1747 )
1748 .await
1749 .expect("provider should retry with stream=false after streaming decode failure");
1750
1751 assert_eq!(response.text.as_deref(), Some("fallback ok"));
1752
1753 let requests = captured.lock().unwrap();
1754 assert_eq!(requests.len(), 2, "expected one retry request");
1755 assert_eq!(requests[0]["stream"], true);
1756 assert_eq!(requests[1]["stream"], false);
1757
1758 server_handle.abort();
1759 }
1760
1761 #[tokio::test]
1762 async fn codex_retries_non_streaming_when_stream_contains_malformed_frame_after_text() {
1763 let (provider, captured, server_handle, _temp_dir) = mock_codex_provider(vec![
1764 MockCodexReply::Sse(
1765 "data: {\"type\":\"response.output_text.delta\",\"delta\":\"partial\"}\ndata: not-json\n\ndata: [DONE]\n",
1766 ),
1767 MockCodexReply::Json(serde_json::json!({
1768 "output_text": "fallback after partial",
1769 "output": []
1770 })),
1771 ])
1772 .await;
1773
1774 let messages = vec![ChatMessage::user("hello")];
1775 let response = provider
1776 .chat(
1777 ProviderChatRequest {
1778 messages: &messages,
1779 tools: None,
1780 thinking: None,
1781 },
1782 "gpt-5-codex",
1783 None,
1784 )
1785 .await
1786 .expect("provider should retry after malformed stream frame");
1787
1788 assert_eq!(response.text.as_deref(), Some("fallback after partial"));
1789
1790 let requests = captured.lock().unwrap();
1791 assert_eq!(requests.len(), 2, "expected one retry request");
1792 assert_eq!(requests[0]["stream"], true);
1793 assert_eq!(requests[1]["stream"], false);
1794
1795 server_handle.abort();
1796 }
1797
1798 #[tokio::test]
1799 async fn codex_does_not_retry_stream_api_error_events() {
1800 let (provider, captured, server_handle, _temp_dir) = mock_codex_provider(vec![
1801 MockCodexReply::Sse(
1802 "data: {\"type\":\"response.failed\",\"response\":{\"error\":{\"message\":\"quota exceeded\"}}}\n\ndata: [DONE]\n",
1803 ),
1804 ])
1805 .await;
1806
1807 let messages = vec![ChatMessage::user("hello")];
1808 let err = provider
1809 .chat(
1810 ProviderChatRequest {
1811 messages: &messages,
1812 tools: None,
1813 thinking: None,
1814 },
1815 "gpt-5-codex",
1816 None,
1817 )
1818 .await
1819 .expect_err("stream API errors should not be retried");
1820
1821 assert!(
1822 err.to_string()
1823 .contains("OpenAI responses stream error: quota exceeded"),
1824 "{err}"
1825 );
1826 assert_eq!(captured.lock().unwrap().len(), 1);
1827
1828 server_handle.abort();
1829 }
1830
1831 #[tokio::test]
1832 async fn codex_does_not_retry_failed_http_status() {
1833 let (provider, captured, server_handle, _temp_dir) =
1834 mock_codex_provider(vec![MockCodexReply::Status(
1835 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
1836 "server down",
1837 )])
1838 .await;
1839
1840 let messages = vec![ChatMessage::user("hello")];
1841 provider
1842 .chat(
1843 ProviderChatRequest {
1844 messages: &messages,
1845 tools: None,
1846 thinking: None,
1847 },
1848 "gpt-5-codex",
1849 None,
1850 )
1851 .await
1852 .expect_err("HTTP errors should not be retried");
1853
1854 assert_eq!(captured.lock().unwrap().len(), 1);
1855
1856 server_handle.abort();
1857 }
1858
1859 #[test]
1860 fn clamp_reasoning_effort_adjusts_known_models() {
1861 assert_eq!(
1862 clamp_reasoning_effort("gpt-5-codex", "xhigh"),
1863 "high".to_string()
1864 );
1865 assert_eq!(
1866 clamp_reasoning_effort("gpt-5-codex", "minimal"),
1867 "low".to_string()
1868 );
1869 assert_eq!(
1870 clamp_reasoning_effort("gpt-5-codex", "medium"),
1871 "medium".to_string()
1872 );
1873 assert_eq!(
1874 clamp_reasoning_effort("gpt-5.3-codex", "minimal"),
1875 "low".to_string()
1876 );
1877 assert_eq!(
1878 clamp_reasoning_effort("gpt-5.1", "xhigh"),
1879 "high".to_string()
1880 );
1881 assert_eq!(
1882 clamp_reasoning_effort("gpt-5-codex", "xhigh"),
1883 "high".to_string()
1884 );
1885 assert_eq!(
1886 clamp_reasoning_effort("gpt-5.1-codex-mini", "low"),
1887 "medium".to_string()
1888 );
1889 assert_eq!(
1890 clamp_reasoning_effort("gpt-5.1-codex-mini", "xhigh"),
1891 "high".to_string()
1892 );
1893 assert_eq!(
1894 clamp_reasoning_effort("gpt-5.3-codex", "xhigh"),
1895 "xhigh".to_string()
1896 );
1897 }
1898
1899 #[test]
1900 fn resolve_reasoning_effort_prefers_configured_override() {
1901 assert_eq!(
1903 resolve_reasoning_effort("gpt-5-codex", Some("high")),
1904 "high".to_string()
1905 );
1906 }
1907
1908 #[test]
1909 fn resolve_reasoning_effort_defaults_when_unconfigured() {
1910 assert_eq!(
1911 resolve_reasoning_effort("gpt-5-codex", None),
1912 "high".to_string()
1913 );
1914 }
1915
1916 #[test]
1917 fn parse_sse_turn_reads_output_text_delta() {
1918 let payload = r#"data: {"type":"response.created","response":{"id":"resp_123"}}
1919
1920data: {"type":"response.output_text.delta","delta":"Hello"}
1921data: {"type":"response.output_text.delta","delta":" world"}
1922data: {"type":"response.completed","response":{"output_text":"Hello world"}}
1923data: [DONE]
1924"#;
1925
1926 assert_eq!(
1927 parse_sse_turn(payload).unwrap().text.as_deref(),
1928 Some("Hello world")
1929 );
1930 }
1931
1932 #[test]
1933 fn parse_sse_turn_falls_back_to_completed_response() {
1934 let payload = r#"data: {"type":"response.completed","response":{"output_text":"Done"}}
1935data: [DONE]
1936"#;
1937
1938 assert_eq!(
1939 parse_sse_turn(payload).unwrap().text.as_deref(),
1940 Some("Done")
1941 );
1942 }
1943
1944 #[test]
1945 fn parse_responses_body_rejects_unrecognized_sse_without_payload() {
1946 let payload = r#"data: not-json
1947data: [DONE]
1948"#;
1949
1950 let err = parse_responses_body(payload).expect_err("empty SSE should fail closed");
1951 assert!(
1952 err.to_string()
1953 .contains("OpenAI Codex SSE data parse failed"),
1954 "{err}"
1955 );
1956 }
1957
1958 #[test]
1959 fn parse_responses_body_rejects_json_without_text_or_tool_calls() {
1960 let payload = r#"{"output":[]}"#;
1961
1962 let err = parse_responses_body(payload).expect_err("empty JSON should fail closed");
1963 assert!(
1964 err.to_string().contains("No response from OpenAI Codex"),
1965 "{err}"
1966 );
1967 }
1968
1969 #[test]
1970 fn parse_responses_body_allows_sse_markers_inside_json_text() {
1971 let payload = serde_json::json!({
1972 "output_text": "Example SSE frame:\ndata: {\"type\":\"example\"}\nevent: response.done",
1973 "output": []
1974 })
1975 .to_string();
1976
1977 let result = parse_responses_body(&payload).expect("JSON text should not be parsed as SSE");
1978 assert_eq!(
1979 result.text.as_deref(),
1980 Some("Example SSE frame:\ndata: {\"type\":\"example\"}\nevent: response.done")
1981 );
1982 assert!(result.tool_calls.is_empty());
1983 }
1984
1985 #[test]
1986 fn parse_responses_body_preserves_reasoning_items_for_tool_calls() {
1987 let payload = serde_json::json!({
1988 "output": [
1989 {
1990 "type": "reasoning",
1991 "id": "rs_1",
1992 "summary": [],
1993 "encrypted_content": "enc_reasoning"
1994 },
1995 {
1996 "type": "function_call",
1997 "id": "fc_1",
1998 "call_id": "call_1",
1999 "name": "shell",
2000 "arguments": "{\"command\":\"pwd\"}",
2001 "status": "completed"
2002 }
2003 ]
2004 })
2005 .to_string();
2006
2007 let result = parse_responses_body(&payload).expect("tool call response should parse");
2008 assert_eq!(result.tool_calls.len(), 1);
2009 assert_eq!(result.tool_calls[0].id, "call_1");
2010
2011 let items = decode_responses_history_items(
2012 result
2013 .reasoning_content
2014 .as_deref()
2015 .expect("Responses history items should be captured"),
2016 )
2017 .expect("history envelope should decode");
2018
2019 assert_eq!(items.len(), 2);
2020 assert_eq!(items[0]["type"], "reasoning");
2021 assert_eq!(items[0]["encrypted_content"], "enc_reasoning");
2022 assert_eq!(items[1]["type"], "function_call");
2023 assert_eq!(items[1]["call_id"], "call_1");
2024 }
2025
2026 #[test]
2027 fn build_responses_input_maps_content_types_by_role() {
2028 let messages = vec![
2029 ChatMessage {
2030 role: "system".into(),
2031 content: "You are helpful.".into(),
2032 },
2033 ChatMessage {
2034 role: "user".into(),
2035 content: "Hi".into(),
2036 },
2037 ChatMessage {
2038 role: "assistant".into(),
2039 content: "Hello!".into(),
2040 },
2041 ChatMessage {
2042 role: "user".into(),
2043 content: "Thanks".into(),
2044 },
2045 ];
2046 let (instructions, input) = build_responses_input(&messages);
2047 assert_eq!(instructions, "You are helpful.");
2048 assert_eq!(input.len(), 3);
2049
2050 let json: Vec<Value> = input
2051 .iter()
2052 .map(|item| serde_json::to_value(item).unwrap())
2053 .collect();
2054 assert_eq!(json[0]["role"], "user");
2055 assert_eq!(json[0]["content"][0]["type"], "input_text");
2056 assert_eq!(json[1]["role"], "assistant");
2057 assert_eq!(json[1]["content"][0]["type"], "output_text");
2058 assert_eq!(json[2]["role"], "user");
2059 assert_eq!(json[2]["content"][0]["type"], "input_text");
2060 }
2061
2062 #[test]
2063 fn build_responses_input_uses_default_instructions_without_system() {
2064 let messages = vec![ChatMessage {
2065 role: "user".into(),
2066 content: "Hello".into(),
2067 }];
2068 let (instructions, input) = build_responses_input(&messages);
2069 assert_eq!(instructions, DEFAULT_CODEX_INSTRUCTIONS);
2070 assert_eq!(input.len(), 1);
2071 }
2072
2073 #[test]
2074 fn build_responses_input_maps_tool_outputs() {
2075 let messages = vec![
2076 ChatMessage {
2077 role: "tool".into(),
2078 content: r#"{"tool_call_id":"call_123","content":"result"}"#.into(),
2079 },
2080 ChatMessage {
2081 role: "user".into(),
2082 content: "Go".into(),
2083 },
2084 ];
2085 let (instructions, input) = build_responses_input(&messages);
2086 assert_eq!(instructions, DEFAULT_CODEX_INSTRUCTIONS);
2087 assert_eq!(input.len(), 2);
2088 assert_eq!(input[0]["type"], "function_call_output");
2089 assert_eq!(input[0]["call_id"], "call_123");
2090 assert_eq!(input[0]["output"], "result");
2091 assert_eq!(input[1]["role"], "user");
2092 }
2093
2094 #[test]
2095 fn build_responses_input_replays_plain_tool_text_without_synthetic_call_id() {
2096 let messages = vec![ChatMessage {
2097 role: "tool".into(),
2098 content: "legacy plain text result".into(),
2099 }];
2100
2101 let (_, input) = build_responses_input(&messages);
2102
2103 assert_eq!(input.len(), 1);
2104 assert_eq!(input[0]["type"], "message");
2105 assert_eq!(input[0]["role"], "user");
2106 assert_eq!(input[0]["content"][0]["type"], "input_text");
2107 assert_eq!(
2108 input[0]["content"][0]["text"],
2109 "Legacy tool output without call_id:\nlegacy plain text result"
2110 );
2111 assert!(input[0].get("call_id").is_none());
2112 }
2113
2114 #[test]
2115 fn build_responses_input_replays_tool_json_without_call_id_as_text() {
2116 let messages = vec![ChatMessage {
2117 role: "tool".into(),
2118 content: r#"{"content":"legacy result","status":"ok"}"#.into(),
2119 }];
2120
2121 let (_, input) = build_responses_input(&messages);
2122
2123 assert_eq!(input.len(), 1);
2124 assert_eq!(input[0]["type"], "message");
2125 assert_eq!(input[0]["role"], "user");
2126 assert_eq!(input[0]["content"][0]["type"], "input_text");
2127 assert_eq!(
2128 input[0]["content"][0]["text"],
2129 r#"Legacy tool output without call_id:
2130{"content":"legacy result","status":"ok"}"#
2131 );
2132 assert!(input[0].get("call_id").is_none());
2133 }
2134
2135 #[test]
2136 fn build_responses_input_replays_blank_tool_call_id_as_legacy_text() {
2137 for raw_id in ["", " "] {
2138 let messages = vec![ChatMessage {
2139 role: "tool".into(),
2140 content: serde_json::json!({
2141 "tool_call_id": raw_id,
2142 "content": "legacy result"
2143 })
2144 .to_string(),
2145 }];
2146
2147 let (_, input) = build_responses_input(&messages);
2148
2149 assert_eq!(input.len(), 1);
2150 assert_eq!(input[0]["type"], "message");
2151 assert_eq!(input[0]["role"], "user");
2152 assert_eq!(input[0]["content"][0]["type"], "input_text");
2153 assert!(input[0].get("call_id").is_none());
2154 assert!(
2155 input[0]["content"][0]["text"]
2156 .as_str()
2157 .unwrap()
2158 .contains("\"legacy result\"")
2159 );
2160 }
2161 }
2162
2163 #[test]
2164 fn build_responses_input_maps_native_assistant_tool_calls() {
2165 let messages = vec![ChatMessage::assistant(
2166 r#"{"content":"Using shell","tool_calls":[{"id":"call_abc","name":"shell","arguments":"{\"command\":\"pwd\"}"}]}"#,
2167 )];
2168 let (_, input) = build_responses_input(&messages);
2169
2170 assert_eq!(input.len(), 2);
2171 assert_eq!(input[0]["type"], "message");
2172 assert_eq!(input[0]["role"], "assistant");
2173 assert_eq!(input[0]["content"][0]["type"], "output_text");
2174 assert_eq!(input[1]["type"], "function_call");
2175 assert_eq!(input[1]["call_id"], "call_abc");
2176 assert_eq!(input[1]["name"], "shell");
2177 }
2178
2179 #[test]
2180 fn build_responses_input_replays_reasoning_item_before_tool_result() {
2181 let reasoning_item = serde_json::json!({
2182 "type": "reasoning",
2183 "id": "rs_1",
2184 "summary": [],
2185 "encrypted_content": "enc_reasoning"
2186 });
2187 let function_call_item = serde_json::json!({
2188 "type": "function_call",
2189 "id": "fc_1",
2190 "call_id": "call_1",
2191 "name": "shell",
2192 "arguments": "{\"command\":\"pwd\"}",
2193 "status": "completed"
2194 });
2195 let reasoning_content =
2196 encode_responses_history_items(&[reasoning_item, function_call_item], true)
2197 .expect("history envelope should encode");
2198 let messages = vec![
2199 ChatMessage::assistant(
2200 serde_json::json!({
2201 "content": null,
2202 "tool_calls": [
2203 {
2204 "id": "call_1",
2205 "name": "shell",
2206 "arguments": "{\"command\":\"pwd\"}"
2207 }
2208 ],
2209 "reasoning_content": reasoning_content
2210 })
2211 .to_string(),
2212 ),
2213 ChatMessage::tool(
2214 serde_json::json!({
2215 "tool_call_id": "call_1",
2216 "content": "ok"
2217 })
2218 .to_string(),
2219 ),
2220 ];
2221
2222 let (_, input) = build_responses_input(&messages);
2223 assert_eq!(input.len(), 3);
2224 assert_eq!(input[0]["type"], "reasoning");
2225 assert_eq!(input[0]["encrypted_content"], "enc_reasoning");
2226 assert_eq!(input[1]["type"], "function_call");
2227 assert_eq!(input[1]["call_id"], "call_1");
2228 assert_eq!(input[2]["type"], "function_call_output");
2229 assert_eq!(input[2]["call_id"], "call_1");
2230 assert_eq!(input[2]["output"], "ok");
2231 }
2232
2233 #[test]
2234 fn convert_tools_opts_out_of_responses_strict_mode() {
2235 let tools = vec![ToolSpec {
2236 name: "jira".to_string(),
2237 description: "Interact with Jira".to_string(),
2238 parameters: serde_json::json!({
2239 "type": "object",
2240 "properties": {
2241 "action": { "type": "string" },
2242 "issue_key": { "type": "string" }
2243 },
2244 "required": ["action"]
2245 }),
2246 }];
2247
2248 let converted = convert_tools(Some(&tools)).expect("tool should convert");
2249 let value = serde_json::to_value(&converted[0]).expect("tool should serialize");
2250 assert_eq!(value["type"], "function");
2251 assert_eq!(value["name"], "jira");
2252 assert_eq!(value["strict"], false);
2253 assert_eq!(value["parameters"]["required"][0], "action");
2254 }
2255
2256 #[test]
2257 fn parse_sse_turn_collects_function_calls() {
2258 let payload = r#"data: {"type":"response.output_item.added","output_index":0,"item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"shell","arguments":""}}
2259
2260data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","output_index":0,"delta":"{\"command\":\"pw"}
2261data: {"type":"response.function_call_arguments.done","item_id":"fc_1","output_index":0,"name":"shell","arguments":"{\"command\":\"pwd\"}"}
2262data: {"type":"response.completed","response":{"output":[]}}
2263data: [DONE]
2264"#;
2265
2266 let result = parse_sse_turn(payload).unwrap();
2267 assert_eq!(result.tool_calls.len(), 1);
2268 assert_eq!(result.tool_calls[0].id, "call_1");
2269 assert_eq!(result.tool_calls[0].name, "shell");
2270 assert_eq!(result.tool_calls[0].arguments, "{\"command\":\"pwd\"}");
2271 }
2272
2273 #[test]
2274 fn build_responses_input_handles_image_markers() {
2275 let messages = vec![ChatMessage::user(
2276 "Describe this\n\n[IMAGE:data:image/png;base64,abc]",
2277 )];
2278 let (_, input) = build_responses_input(&messages);
2279
2280 assert_eq!(input.len(), 1);
2281 assert_eq!(input[0]["role"], "user");
2282 assert_eq!(input[0]["content"].as_array().unwrap().len(), 2);
2283
2284 let json = input[0]["content"].as_array().unwrap();
2285
2286 assert_eq!(json[0]["type"], "input_text");
2288 assert!(json[0]["text"].as_str().unwrap().contains("Describe this"));
2289
2290 assert_eq!(json[1]["type"], "input_image");
2292 assert_eq!(json[1]["image_url"], "data:image/png;base64,abc");
2293 }
2294
2295 #[test]
2296 fn build_responses_input_preserves_text_only_messages() {
2297 let messages = vec![ChatMessage::user("Hello without images")];
2298 let (_, input) = build_responses_input(&messages);
2299
2300 assert_eq!(input.len(), 1);
2301 assert_eq!(input[0]["content"].as_array().unwrap().len(), 1);
2302
2303 let json = &input[0]["content"][0];
2304 assert_eq!(json["type"], "input_text");
2305 assert_eq!(json["text"], "Hello without images");
2306 }
2307
2308 #[test]
2309 fn build_responses_input_handles_multiple_images() {
2310 let messages = vec![ChatMessage::user(
2311 "Compare these: [IMAGE:data:image/png;base64,img1] and [IMAGE:data:image/jpeg;base64,img2]",
2312 )];
2313 let (_, input) = build_responses_input(&messages);
2314
2315 assert_eq!(input.len(), 1);
2316 assert_eq!(input[0]["content"].as_array().unwrap().len(), 3); let json = input[0]["content"].as_array().unwrap();
2319
2320 assert_eq!(json[0]["type"], "input_text");
2321 assert_eq!(json[1]["type"], "input_image");
2322 assert_eq!(json[2]["type"], "input_image");
2323 }
2324
2325 #[test]
2326 fn capabilities_includes_vision() {
2327 let options = ModelProviderRuntimeOptions {
2328 secrets_encrypt: false,
2329 ..ModelProviderRuntimeOptions::default()
2330 };
2331 let provider = OpenAiCodexModelProvider::new("test", &options, None)
2332 .expect("provider should initialize");
2333 let caps = provider.capabilities();
2334
2335 assert!(caps.native_tool_calling);
2336 assert!(caps.vision);
2337 }
2338
2339 #[test]
2340 fn provider_advertises_streaming_tool_events() {
2341 let provider =
2342 OpenAiCodexModelProvider::new("test", &ModelProviderRuntimeOptions::default(), None)
2343 .expect("provider should initialize");
2344
2345 assert!(provider.supports_streaming());
2346 assert!(provider.supports_streaming_tool_events());
2347 }
2348}