Skip to main content

zeroclaw_providers/
anthropic.rs

1use crate::traits::{
2    ChatMessage, ChatRequest as ProviderChatRequest, ChatResponse as ProviderChatResponse,
3    ModelProvider, ProviderCapabilities, StreamChunk, StreamError, StreamEvent, StreamOptions,
4    StreamResult, TokenUsage, ToolCall as ProviderToolCall,
5};
6use async_trait::async_trait;
7use base64::Engine as _;
8use futures_util::stream::{self, StreamExt};
9use reqwest::Client;
10use serde::{Deserialize, Serialize};
11use zeroclaw_api::tool::ToolSpec;
12
/// Default sampling temperature; Anthropic's API documentation lists 1.0.
const TEMPERATURE_DEFAULT: f64 = 1.0;
/// Root URL of Anthropic's public API. Can be overridden through
/// `model_providers.<name>.base_url`.
pub(crate) const BASE_URL: &str = "https://api.anthropic.com";
/// Longest gap tolerated between two SSE lines before a stream counts as
/// stalled.
///
/// reqwest's overall `.timeout()` does not reliably fire while a streaming
/// body is drained chunk-by-chunk. A connection that falls silent after
/// `message_start` (proxy / load-balancer hiccup) would therefore park
/// `next_line().await` forever: the detached parser task leaks and the turn
/// stays on "working". Bounding the wait per line turns that hang into a
/// retryable `StreamError`.
const SSE_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(90);
24
25use crate::stream_guard::AbortOnDrop;
26
/// Model provider that talks to Anthropic's API.
pub struct AnthropicModelProvider {
    /// Config-key alias, i.e. the `<alias>` in
    /// `[providers.models.anthropic.<alias>]`.
    alias: String,
    /// Credential sent with each request; `None` when no usable (non-blank)
    /// credential was supplied to the constructor.
    credential: Option<String>,
    /// API root the provider sends requests to, stored without a trailing `/`.
    base_url: String,
    /// Maximum output tokens requested from the API.
    max_tokens: u32,
}
34
/// Plain-text request body, compiled only for tests.
#[cfg(test)]
#[derive(Debug, Serialize)]
struct ChatRequest {
    /// Model identifier.
    model: String,
    /// Output token limit.
    max_tokens: u32,
    /// System prompt; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    system: Option<String>,
    /// Conversation turns.
    messages: Vec<Message>,
    /// Sampling temperature; left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f64>,
}
46
/// One plain-text conversation turn, compiled only for tests.
#[cfg(test)]
#[derive(Debug, Serialize)]
struct Message {
    /// Role of the speaker.
    role: String,
    /// Text of the turn.
    content: String,
}
53
/// Response body reduced to its content blocks, compiled only for tests.
#[cfg(test)]
#[derive(Debug, Deserialize)]
struct ChatResponse {
    /// Content blocks returned by the API.
    content: Vec<ContentBlock>,
}
59
/// A single response content block, compiled only for tests.
#[cfg(test)]
#[derive(Debug, Deserialize)]
struct ContentBlock {
    /// Value of the block's JSON `type` field.
    #[serde(rename = "type")]
    kind: String,
    /// Text payload; `None` when the block carries no `text` field.
    #[serde(default)]
    text: Option<String>,
}
68
69#[derive(Debug, Serialize)]
70struct NativeChatRequest<'a> {
71    model: String,
72    max_tokens: u32,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    system: Option<SystemPrompt>,
75    messages: Vec<NativeMessage>,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    temperature: Option<f64>,
78    #[serde(skip_serializing_if = "Option::is_none")]
79    tools: Option<Vec<NativeToolSpec<'a>>>,
80    #[serde(skip_serializing_if = "Option::is_none")]
81    tool_choice: Option<serde_json::Value>,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    stream: Option<bool>,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    thinking: Option<NativeThinkingConfig>,
86}
87
88#[derive(Debug, Serialize)]
89struct NativeThinkingConfig {
90    #[serde(rename = "type")]
91    kind: &'static str,
92    budget_tokens: u32,
93}
94
/// Reports whether `model` accepts the fixed-budget native-thinking request
/// shape (`{"thinking": {"type": "enabled", "budget_tokens": N}}`).
///
/// Opus 4.7 only supports adaptive thinking and answers fixed budgets with a
/// 400, so any model id containing `claude-opus-4-7` returns `false` and
/// stays on prompt-based reasoning until adaptive thinking is implemented.
/// See Anthropic's extended-thinking docs:
/// <https://platform.claude.com/docs/en/build-with-claude/extended-thinking>
fn anthropic_model_supports_native_thinking(model: &str) -> bool {
    const ADAPTIVE_ONLY_FAMILY: &str = "claude-opus-4-7";
    let adaptive_only = model.contains(ADAPTIVE_ONLY_FAMILY);
    !adaptive_only
}
104
105#[derive(Debug, Serialize)]
106struct NativeMessage {
107    role: String,
108    content: Vec<NativeContentOut>,
109}
110
111#[derive(Debug, Serialize)]
112struct ImageSource {
113    #[serde(rename = "type")]
114    source_type: String,
115    media_type: String,
116    data: String,
117}
118
119#[derive(Debug, Serialize)]
120#[serde(tag = "type")]
121enum NativeContentOut {
122    #[serde(rename = "text")]
123    Text {
124        text: String,
125        #[serde(skip_serializing_if = "Option::is_none")]
126        cache_control: Option<CacheControl>,
127    },
128    #[serde(rename = "image")]
129    Image { source: ImageSource },
130    #[serde(rename = "tool_use")]
131    ToolUse {
132        id: String,
133        name: String,
134        input: serde_json::Value,
135        #[serde(skip_serializing_if = "Option::is_none")]
136        cache_control: Option<CacheControl>,
137    },
138    #[serde(rename = "tool_result")]
139    ToolResult {
140        tool_use_id: String,
141        content: String,
142        #[serde(skip_serializing_if = "Option::is_none")]
143        cache_control: Option<CacheControl>,
144    },
145    /// Thinking block for round-tripping extended thinking in conversation
146    /// history. Required when thinking is enabled and assistant messages
147    /// contain tool_use blocks.
148    #[serde(rename = "thinking")]
149    Thinking {
150        thinking: String,
151        #[serde(skip_serializing_if = "Option::is_none")]
152        signature: Option<String>,
153    },
154}
155
156#[derive(Debug, Serialize)]
157struct NativeToolSpec<'a> {
158    name: &'a str,
159    description: &'a str,
160    input_schema: &'a serde_json::Value,
161    #[serde(skip_serializing_if = "Option::is_none")]
162    cache_control: Option<CacheControl>,
163}
164
165#[derive(Debug, Clone, Serialize)]
166struct CacheControl {
167    #[serde(rename = "type")]
168    cache_type: String,
169}
170
171impl CacheControl {
172    fn ephemeral() -> Self {
173        Self {
174            cache_type: "ephemeral".to_string(),
175        }
176    }
177}
178
179#[derive(Debug, Serialize)]
180#[serde(untagged)]
181enum SystemPrompt {
182    String(String),
183    Blocks(Vec<SystemBlock>),
184}
185
186#[derive(Debug, Serialize)]
187struct SystemBlock {
188    #[serde(rename = "type")]
189    block_type: String,
190    text: String,
191    #[serde(skip_serializing_if = "Option::is_none")]
192    cache_control: Option<CacheControl>,
193}
194
195#[derive(Debug, Deserialize)]
196struct NativeChatResponse {
197    #[serde(default)]
198    content: Vec<NativeContentIn>,
199    #[serde(default)]
200    usage: Option<AnthropicUsage>,
201}
202
203#[derive(Debug, Deserialize)]
204struct AnthropicUsage {
205    /// Tokens *after* the last cache breakpoint — NOT the total prompt.
206    /// Per Anthropic prompt-caching docs:
207    /// total_input = cache_read + cache_creation + input_tokens.
208    #[serde(default)]
209    input_tokens: Option<u64>,
210    #[serde(default)]
211    output_tokens: Option<u64>,
212    /// Tokens served from the prompt cache this request.
213    #[serde(default)]
214    cache_read_input_tokens: Option<u64>,
215    /// Tokens written to the prompt cache this request (cache miss path).
216    /// Disjoint from `cache_read_input_tokens` and `input_tokens`.
217    #[serde(default)]
218    cache_creation_input_tokens: Option<u64>,
219}
220
221#[derive(Debug, Deserialize)]
222struct NativeContentIn {
223    #[serde(rename = "type")]
224    kind: String,
225    #[serde(default)]
226    text: Option<String>,
227    #[serde(default)]
228    thinking: Option<String>,
229    /// Signature for integrity verification of thinking blocks.
230    #[serde(default)]
231    signature: Option<String>,
232    #[serde(default)]
233    id: Option<String>,
234    #[serde(default)]
235    name: Option<String>,
236    #[serde(default)]
237    input: Option<serde_json::Value>,
238}
239
240impl AnthropicModelProvider {
241    pub fn new(alias: &str, credential: Option<&str>) -> Self {
242        Self::with_base_url(alias, credential, None)
243    }
244
245    pub fn with_base_url(alias: &str, credential: Option<&str>, base_url: Option<&str>) -> Self {
246        let base_url = base_url
247            .map(|u| u.trim_end_matches('/'))
248            .unwrap_or(BASE_URL)
249            .to_string();
250        Self {
251            alias: alias.to_string(),
252            credential: credential
253                .map(str::trim)
254                .filter(|k| !k.is_empty())
255                .map(ToString::to_string),
256            base_url,
257            max_tokens: zeroclaw_api::model_provider::BASELINE_MAX_TOKENS,
258        }
259    }
260
261    /// Override the maximum output tokens for API requests.
262    pub fn with_max_tokens(mut self, max_tokens: u32) -> Self {
263        self.max_tokens = max_tokens;
264        self
265    }
266
/// Reports whether `token` begins with the `sk-ant-oat01-` prefix, which
/// this provider treats as a setup token (sent as a Bearer token rather
/// than as `x-api-key`).
fn is_setup_token(token: &str) -> bool {
    const SETUP_TOKEN_PREFIX: &str = "sk-ant-oat01-";
    token.strip_prefix(SETUP_TOKEN_PREFIX).is_some()
}
270
271    fn apply_auth(
272        &self,
273        request: reqwest::RequestBuilder,
274        credential: &str,
275    ) -> reqwest::RequestBuilder {
276        let is_setup = Self::is_setup_token(credential);
277        // Diagnostic for "401 invalid x-api-key" mysteries: when a provider
278        // is sending a credential the upstream rejects, this is the only
279        // line that nails what bytes actually went out. Logs header kind,
280        // length, first 8 chars (enough to identify api03 vs oat01 vs an
281        // accidental enc2: blob) and last 4 (smudge for tail integrity).
282        // No full credential — that stays out of logs.
283        let len = credential.len();
284        let head: String = credential.chars().take(8).collect();
285        let tail: String = credential
286            .chars()
287            .rev()
288            .take(4)
289            .collect::<String>()
290            .chars()
291            .rev()
292            .collect();
293        ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"header": if is_setup { "Authorization" } else { "x-api-key" }, "credential_len": len, "credential_head": head, "credential_tail": tail})), "Anthropic auth header applied");
294        if is_setup {
295            request
296                .header("Authorization", format!("Bearer {credential}"))
297                .header(
298                    "anthropic-beta",
299                    "claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14",
300                )
301                .header("anthropic-dangerous-direct-browser-access", "true")
302        } else {
303            request.header("x-api-key", credential)
304        }
305    }
306
307    /// For OAuth tokens, Anthropic requires the system prompt to start with the
308    /// Claude Code identity prefix. This prepends it to any existing system prompt.
309    fn apply_oauth_system_prompt(system: Option<SystemPrompt>) -> Option<SystemPrompt> {
310        let prefix = SystemBlock {
311            block_type: "text".to_string(),
312            text: "You are Claude Code, Anthropic's official CLI for Claude.".to_string(),
313            cache_control: Some(CacheControl::ephemeral()),
314        };
315        match system {
316            Some(SystemPrompt::Blocks(mut blocks)) => {
317                blocks.insert(0, prefix);
318                Some(SystemPrompt::Blocks(blocks))
319            }
320            Some(SystemPrompt::String(s)) => Some(SystemPrompt::Blocks(vec![
321                prefix,
322                SystemBlock {
323                    block_type: "text".to_string(),
324                    text: s,
325                    cache_control: Some(CacheControl::ephemeral()),
326                },
327            ])),
328            None => Some(SystemPrompt::Blocks(vec![prefix])),
329        }
330    }
331
332    /// Cache conversations with more than 1 non-system message (i.e. after first exchange)
333    fn should_cache_conversation(messages: &[ChatMessage]) -> bool {
334        messages.iter().filter(|m| m.role != "system").count() > 1
335    }
336
337    /// Apply cache control to the last message content block
338    fn apply_cache_to_last_message(messages: &mut [NativeMessage]) {
339        if let Some(last_msg) = messages.last_mut()
340            && let Some(last_content) = last_msg.content.last_mut()
341        {
342            match last_content {
343                NativeContentOut::Text { cache_control, .. }
344                | NativeContentOut::ToolResult { cache_control, .. } => {
345                    *cache_control = Some(CacheControl::ephemeral());
346                }
347                NativeContentOut::ToolUse { .. }
348                | NativeContentOut::Image { .. }
349                | NativeContentOut::Thinking { .. } => {}
350            }
351        }
352    }
353
354    fn convert_tools<'a>(tools: Option<&'a [ToolSpec]>) -> Option<Vec<NativeToolSpec<'a>>> {
355        let items = tools?;
356        if items.is_empty() {
357            return None;
358        }
359        let mut native_tools: Vec<NativeToolSpec<'a>> = items
360            .iter()
361            .map(|tool| NativeToolSpec {
362                name: &tool.name,
363                description: &tool.description,
364                input_schema: &tool.parameters,
365                cache_control: None,
366            })
367            .collect();
368
369        // Cache the last tool definition (caches all tools)
370        if let Some(last_tool) = native_tools.last_mut() {
371            last_tool.cache_control = Some(CacheControl::ephemeral());
372        }
373
374        Some(native_tools)
375    }
376
377    fn parse_assistant_tool_call_message(content: &str) -> Option<Vec<NativeContentOut>> {
378        let value = serde_json::from_str::<serde_json::Value>(content).ok()?;
379        let tool_calls = value
380            .get("tool_calls")
381            .and_then(|v| serde_json::from_value::<Vec<ProviderToolCall>>(v.clone()).ok())?;
382
383        let mut blocks = Vec::new();
384
385        // When extended thinking is enabled, assistant messages must start
386        // with thinking blocks (including signatures) before any tool_use
387        // blocks. The reasoning_content field stores JSON-encoded thinking
388        // blocks from the original response.
389        if let Some(reasoning) = value
390            .get("reasoning_content")
391            .and_then(serde_json::Value::as_str)
392            .filter(|r| !r.is_empty())
393        {
394            for part in reasoning.split('\n') {
395                if let Ok(block) = serde_json::from_str::<serde_json::Value>(part) {
396                    let thinking = block
397                        .get("thinking")
398                        .and_then(|t| t.as_str())
399                        .unwrap_or("")
400                        .to_string();
401                    let signature = block
402                        .get("signature")
403                        .and_then(|s| s.as_str())
404                        .filter(|s| !s.is_empty())
405                        .map(|s| s.to_string());
406                    blocks.push(NativeContentOut::Thinking {
407                        thinking,
408                        signature,
409                    });
410                }
411            }
412        }
413
414        if let Some(text) = value
415            .get("content")
416            .and_then(serde_json::Value::as_str)
417            .map(str::trim)
418            .filter(|t| !t.is_empty())
419        {
420            blocks.push(NativeContentOut::Text {
421                text: text.to_string(),
422                cache_control: None,
423            });
424        }
425        for call in tool_calls {
426            let input = serde_json::from_str::<serde_json::Value>(&call.arguments)
427                .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()));
428            blocks.push(NativeContentOut::ToolUse {
429                id: call.id,
430                name: call.name,
431                input,
432                cache_control: None,
433            });
434        }
435        Some(blocks)
436    }
437
438    fn parse_tool_result_message(content: &str) -> Option<NativeMessage> {
439        let value = serde_json::from_str::<serde_json::Value>(content).ok()?;
440        let tool_use_id = value
441            .get("tool_call_id")
442            .and_then(serde_json::Value::as_str)?
443            .to_string();
444        let result = value
445            .get("content")
446            .and_then(serde_json::Value::as_str)
447            .unwrap_or("")
448            .to_string();
449        Some(NativeMessage {
450            role: "user".to_string(),
451            content: vec![NativeContentOut::ToolResult {
452                tool_use_id,
453                content: result,
454                cache_control: None,
455            }],
456        })
457    }
458
459    fn convert_messages(messages: &[ChatMessage]) -> (Option<SystemPrompt>, Vec<NativeMessage>) {
460        let mut system_text = None;
461        let mut native_messages = Vec::new();
462
463        for msg in messages {
464            match msg.role.as_str() {
465                "system" => {
466                    if system_text.is_none() {
467                        system_text = Some(msg.content.clone());
468                    }
469                }
470                "assistant" => {
471                    if let Some(blocks) = Self::parse_assistant_tool_call_message(&msg.content) {
472                        native_messages.push(NativeMessage {
473                            role: "assistant".to_string(),
474                            content: blocks,
475                        });
476                    } else if !msg.content.trim().is_empty() {
477                        native_messages.push(NativeMessage {
478                            role: "assistant".to_string(),
479                            content: vec![NativeContentOut::Text {
480                                text: msg.content.clone(),
481                                cache_control: None,
482                            }],
483                        });
484                    }
485                }
486                "tool" => {
487                    let tool_msg = if let Some(tr) = Self::parse_tool_result_message(&msg.content) {
488                        tr
489                    } else if !msg.content.trim().is_empty() {
490                        NativeMessage {
491                            role: "user".to_string(),
492                            content: vec![NativeContentOut::Text {
493                                text: msg.content.clone(),
494                                cache_control: None,
495                            }],
496                        }
497                    } else {
498                        continue;
499                    };
500                    // Tool results map to role "user"; merge consecutive ones
501                    // into a single message so Anthropic doesn't reject the
502                    // request for having adjacent same-role messages.
503                    if native_messages
504                        .last()
505                        .is_some_and(|m| m.role == tool_msg.role)
506                    {
507                        native_messages
508                            .last_mut()
509                            .unwrap()
510                            .content
511                            .extend(tool_msg.content);
512                    } else {
513                        native_messages.push(tool_msg);
514                    }
515                }
516                _ => {
517                    // Parse image markers from user message content
518                    let (text, image_refs) = crate::multimodal::parse_image_markers(&msg.content);
519                    let mut content_blocks: Vec<NativeContentOut> = Vec::new();
520
521                    // Add image content blocks for each image reference
522                    for img_ref in &image_refs {
523                        let (media_type, data) = if img_ref.starts_with("data:") {
524                            // Data URI format: data:image/jpeg;base64,/9j/4AAQ...
525                            if let Some(comma) = img_ref.find(',') {
526                                let header = &img_ref[5..comma];
527                                let mime =
528                                    header.split(';').next().unwrap_or("image/jpeg").to_string();
529                                let b64 = img_ref[comma + 1..].trim().to_string();
530                                (mime, b64)
531                            } else {
532                                continue;
533                            }
534                        } else if std::path::Path::new(img_ref.trim()).exists() {
535                            // Local file path
536                            match std::fs::read(img_ref.trim()) {
537                                Ok(bytes) => {
538                                    let b64 =
539                                        base64::engine::general_purpose::STANDARD.encode(&bytes);
540                                    let ext = std::path::Path::new(img_ref.trim())
541                                        .extension()
542                                        .and_then(|e| e.to_str())
543                                        .unwrap_or("jpg");
544                                    let mime = match ext {
545                                        "png" => "image/png",
546                                        "gif" => "image/gif",
547                                        "webp" => "image/webp",
548                                        _ => "image/jpeg",
549                                    }
550                                    .to_string();
551                                    (mime, b64)
552                                }
553                                Err(_) => continue,
554                            }
555                        } else {
556                            continue;
557                        };
558
559                        content_blocks.push(NativeContentOut::Image {
560                            source: ImageSource {
561                                source_type: "base64".to_string(),
562                                media_type,
563                                data,
564                            },
565                        });
566                    }
567
568                    // Add text content block (skip empty text when images are present)
569                    if text.is_empty() && !image_refs.is_empty() {
570                        content_blocks.push(NativeContentOut::Text {
571                            text: "[image]".to_string(),
572                            cache_control: None,
573                        });
574                    } else if !text.trim().is_empty() {
575                        content_blocks.push(NativeContentOut::Text {
576                            text,
577                            cache_control: None,
578                        });
579                    }
580
581                    // Merge into previous user message if present (e.g.
582                    // when a user message immediately follows tool results
583                    // which are also role "user" in Anthropic's format).
584                    if native_messages.last().is_some_and(|m| m.role == "user") {
585                        native_messages
586                            .last_mut()
587                            .unwrap()
588                            .content
589                            .extend(content_blocks);
590                    } else {
591                        native_messages.push(NativeMessage {
592                            role: "user".to_string(),
593                            content: content_blocks,
594                        });
595                    }
596                }
597            }
598        }
599
600        Self::backfill_orphaned_tool_uses(&mut native_messages);
601
602        // Always use Blocks format with cache_control for system prompts
603        let system_prompt = system_text.map(|text| {
604            SystemPrompt::Blocks(vec![SystemBlock {
605                block_type: "text".to_string(),
606                text,
607                cache_control: Some(CacheControl::ephemeral()),
608            }])
609        });
610
611        (system_prompt, native_messages)
612    }
613
614    /// Pair any orphaned `tool_use` with a stub `tool_result` so interrupted
615    /// turns can't wedge the session with a hard 400 on replay. Defensive
616    /// backstop for the canonical-history guard in the runtime.
617    fn backfill_orphaned_tool_uses(messages: &mut Vec<NativeMessage>) {
618        let mut idx = 0;
619        while idx < messages.len() {
620            let pending: Vec<String> = messages[idx]
621                .content
622                .iter()
623                .filter_map(|block| match block {
624                    NativeContentOut::ToolUse { id, .. } => Some(id.clone()),
625                    _ => None,
626                })
627                .collect();
628
629            if pending.is_empty() {
630                idx += 1;
631                continue;
632            }
633
634            let answered: std::collections::HashSet<String> = messages
635                .get(idx + 1)
636                .map(|next| {
637                    next.content
638                        .iter()
639                        .filter_map(|block| match block {
640                            NativeContentOut::ToolResult { tool_use_id, .. } => {
641                                Some(tool_use_id.clone())
642                            }
643                            _ => None,
644                        })
645                        .collect()
646                })
647                .unwrap_or_default();
648
649            let stubs: Vec<NativeContentOut> = pending
650                .into_iter()
651                .filter(|id| !answered.contains(id))
652                .map(|tool_use_id| NativeContentOut::ToolResult {
653                    tool_use_id,
654                    content: "[tool result missing from history — the turn was \
655                              interrupted before this tool finished]"
656                        .to_string(),
657                    cache_control: None,
658                })
659                .collect();
660
661            if !stubs.is_empty() {
662                if messages
663                    .get(idx + 1)
664                    .is_some_and(|next| next.role == "user")
665                {
666                    let next = &mut messages[idx + 1];
667                    let mut merged = stubs;
668                    merged.append(&mut next.content);
669                    next.content = merged;
670                } else {
671                    messages.insert(
672                        idx + 1,
673                        NativeMessage {
674                            role: "user".to_string(),
675                            content: stubs,
676                        },
677                    );
678                }
679            }
680
681            idx += 1;
682        }
683    }
684
685    fn parse_native_response(response: NativeChatResponse) -> ProviderChatResponse {
686        let mut text_parts = Vec::new();
687        let mut thinking_parts = Vec::new();
688        let mut tool_calls = Vec::new();
689
690        let usage = response.usage.map(|u| {
691            // Anthropic's three input buckets are DISJOINT per
692            // https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
693            //
694            //   total_input_tokens = cache_read_input_tokens
695            //                      + cache_creation_input_tokens
696            //                      + input_tokens
697            //
698            // Anthropic's `input_tokens` is the tokens AFTER the last cache
699            // breakpoint, not the total prompt. The other two are tokens
700            // before the breakpoint (read from cache vs. being written now).
701            //
702            // Our internal TokenUsage contract is that `input_tokens` is the
703            // *total* prompt sent to the model. Sum all three to normalize.
704            // `cached_input_tokens` is reported as the cache-read portion
705            // (the discounted-billing subset of the total) — this matches
706            // what billable_uncached_input = input - cached expects.
707            let uncached = u.input_tokens.unwrap_or(0);
708            let cache_read = u.cache_read_input_tokens.unwrap_or(0);
709            let cache_create = u.cache_creation_input_tokens.unwrap_or(0);
710            let total = uncached
711                .saturating_add(cache_read)
712                .saturating_add(cache_create);
713            let any_reported = u.input_tokens.is_some()
714                || u.cache_read_input_tokens.is_some()
715                || u.cache_creation_input_tokens.is_some();
716            TokenUsage {
717                input_tokens: if any_reported { Some(total) } else { None },
718                output_tokens: u.output_tokens,
719                cached_input_tokens: u.cache_read_input_tokens,
720            }
721        });
722
723        for block in response.content {
724            match block.kind.as_str() {
725                "text" => {
726                    if let Some(text) = block.text.map(|t| t.trim().to_string())
727                        && !text.is_empty()
728                    {
729                        text_parts.push(text);
730                    }
731                }
732                "thinking" => {
733                    // Store thinking text byte-for-byte: the signature is
734                    // computed over the exact bytes the model returned, so
735                    // any mutation (including trim()) invalidates it on
736                    // replay. Only skip when the provider returns genuinely
737                    // empty content.
738                    if let Some(thinking) = block.thinking.as_deref().or(block.text.as_deref())
739                        && !thinking.is_empty()
740                    {
741                        let json_block = serde_json::json!({
742                            "thinking": thinking,
743                            "signature": block.signature.as_deref().unwrap_or(""),
744                        });
745                        thinking_parts.push(json_block.to_string());
746                    }
747                }
748                "tool_use" => {
749                    let name = block.name.unwrap_or_default();
750                    if name.is_empty() {
751                        continue;
752                    }
753                    let arguments = block
754                        .input
755                        .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
756                    tool_calls.push(ProviderToolCall {
757                        id: block.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
758                        name,
759                        arguments: arguments.to_string(),
760                        extra_content: None,
761                    });
762                }
763                _ => {}
764            }
765        }
766
767        let reasoning_content = if thinking_parts.is_empty() {
768            None
769        } else {
770            Some(thinking_parts.join("\n"))
771        };
772
773        ProviderChatResponse {
774            text: if text_parts.is_empty() {
775                None
776            } else {
777                Some(text_parts.join("\n"))
778            },
779            tool_calls,
780            usage,
781            reasoning_content,
782        }
783    }
784
785    /// Resolve thinking parameters for an API request. Returns the effective
786    /// temperature (forced to 1.0 when thinking is active), the thinking
787    /// config for the request body, and the effective max_tokens (raised to
788    /// meet budget_tokens minimum when needed).
789    fn resolve_thinking(
790        &self,
791        thinking: Option<zeroclaw_api::model_provider::NativeThinkingParams>,
792        temperature: Option<f64>,
793        model: &str,
794    ) -> (Option<f64>, Option<NativeThinkingConfig>, u32) {
795        match thinking {
796            Some(params) if anthropic_model_supports_native_thinking(model) => {
797                ::zeroclaw_log::record!(
798                    INFO,
799                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
800                        .with_attrs(::serde_json::json!({"budget_tokens": params.budget_tokens})),
801                    "Native extended thinking enabled; forcing temperature=1.0"
802                );
803                // API requires max_tokens > budget_tokens (strictly greater).
804                let min_required = params.budget_tokens + 1;
805                let max_tokens = self.max_tokens.max(min_required);
806                (
807                    Some(1.0),
808                    Some(NativeThinkingConfig {
809                        kind: "enabled",
810                        budget_tokens: params.budget_tokens,
811                    }),
812                    max_tokens,
813                )
814            }
815            Some(_) => {
816                // Caller asked for native thinking but the model rejects the
817                // fixed-budget request shape. Drop to prompt-based reasoning
818                // (the agent loop's prefix already injected) and keep the
819                // caller-supplied temperature so per-model guards still apply.
820                ::zeroclaw_log::record!(
821                    WARN,
822                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
823                        .with_attrs(::serde_json::json!({"model": model})),
824                    "Native extended thinking requested but model only supports adaptive thinking; falling back to prompt-based reasoning"
825                );
826                (temperature, None, self.max_tokens)
827            }
828            None => (temperature, None, self.max_tokens),
829        }
830    }
831
832    fn http_client(&self) -> Client {
833        zeroclaw_config::schema::build_runtime_proxy_client_with_timeouts(
834            "model_provider.anthropic",
835            120,
836            10,
837        )
838    }
839
840    /// Build a streaming request body from a `NativeChatRequest`.
841    fn build_streaming_request(request: &NativeChatRequest<'_>) -> serde_json::Value {
842        let mut body =
843            serde_json::to_value(request).expect("NativeChatRequest should serialize to JSON");
844        body["stream"] = serde_json::Value::Bool(true);
845        body
846    }
847
848    /// Parse Anthropic SSE lines from `response` and send `StreamEvent`s to `tx`.
849    async fn parse_anthropic_sse(
850        response: reqwest::Response,
851        tx: &tokio::sync::mpsc::Sender<StreamResult<StreamEvent>>,
852    ) {
853        use tokio_util::io::StreamReader;
854
855        let byte_stream = response
856            .bytes_stream()
857            .map(|result| result.map_err(std::io::Error::other));
858        let reader = StreamReader::new(byte_stream);
859        Self::parse_anthropic_sse_from_reader(reader, tx).await;
860    }
861
862    /// Inner loop split out of `parse_anthropic_sse` so unit tests can feed a
863    /// `Cursor<&[u8]>` directly without spinning up a mock HTTP server.
864    async fn parse_anthropic_sse_from_reader<R>(
865        reader: R,
866        tx: &tokio::sync::mpsc::Sender<StreamResult<StreamEvent>>,
867    ) where
868        R: tokio::io::AsyncBufRead + Unpin,
869    {
870        use tokio::io::AsyncBufReadExt;
871
872        let mut lines = reader.lines();
873
874        let mut tool_id: Option<String> = None;
875        let mut tool_name: Option<String> = None;
876        let mut tool_input_json = String::new();
877
878        // Anthropic emits usage in two places: `message_start` carries the
879        // input-token count + prompt-cache reads; `message_delta` carries
880        // running output-token totals (each delta supersedes the prior). We
881        // capture both, then emit one `StreamEvent::Usage` at `message_stop`
882        // so the gateway accumulator and `record_turn_cost()` see the same
883        // signal Anthropic sends — closes the original #6001 live repro,
884        // which was Anthropic-shaped streaming.
885        let mut input_tokens: Option<u64> = None;
886        let mut output_tokens: Option<u64> = None;
887        let mut cached_input_tokens: Option<u64> = None;
888        let mut cache_creation_input_tokens: Option<u64> = None;
889
890        while let Ok(Some(line)) =
891            match tokio::time::timeout(SSE_IDLE_TIMEOUT, lines.next_line()).await {
892                Ok(read) => read,
893                Err(_) => {
894                    ::zeroclaw_log::record!(
895                        WARN,
896                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
897                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
898                            .with_attrs(::serde_json::json!({
899                                "idle_secs": SSE_IDLE_TIMEOUT.as_secs(),
900                            })),
901                        "stream: SSE idle timeout — connection stalled, aborting stream"
902                    );
903                    let _ = tx
904                        .send(Err(StreamError::Http(format!(
905                            "SSE stream stalled: no data for {}s",
906                            SSE_IDLE_TIMEOUT.as_secs()
907                        ))))
908                        .await;
909                    return;
910                }
911            }
912        {
913            let line = line.trim().to_string();
914            if !line.starts_with("data: ") {
915                continue;
916            }
917            let json_str = &line["data: ".len()..];
918
919            let event: serde_json::Value = match serde_json::from_str(json_str) {
920                Ok(v) => v,
921                Err(_) => continue,
922            };
923
924            let event_type = event
925                .get("type")
926                .and_then(|t| t.as_str())
927                .unwrap_or_default();
928
929            match event_type {
930                "message_start" => {
931                    let model = event
932                        .get("message")
933                        .and_then(|m| m.get("model"))
934                        .and_then(|m| m.as_str())
935                        .unwrap_or("unknown");
936                    let usage = event.get("message").and_then(|m| m.get("usage"));
937                    let observed_input = usage
938                        .and_then(|u| u.get("input_tokens"))
939                        .and_then(|t| t.as_u64());
940                    let observed_cached = usage
941                        .and_then(|u| u.get("cache_read_input_tokens"))
942                        .and_then(|t| t.as_u64());
943                    let observed_cache_create = usage
944                        .and_then(|u| u.get("cache_creation_input_tokens"))
945                        .and_then(|t| t.as_u64());
946                    if let Some(v) = observed_input {
947                        input_tokens = Some(v);
948                    }
949                    if let Some(v) = observed_cached {
950                        cached_input_tokens = Some(v);
951                    }
952                    if let Some(v) = observed_cache_create {
953                        cache_creation_input_tokens = Some(v);
954                    }
955                    ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"model": model, "input_tokens": observed_input, "cached_input_tokens": observed_cached, "cache_creation_input_tokens": observed_cache_create})), "stream: message_start");
956                }
957                "content_block_start" => {
958                    if let Some(block) = event.get("content_block") {
959                        let block_type = block
960                            .get("type")
961                            .and_then(|t| t.as_str())
962                            .unwrap_or_default();
963                        if block_type == "tool_use" {
964                            if let Some(id) = tool_id.take() {
965                                let name = tool_name.take().unwrap_or_default();
966                                let input = std::mem::take(&mut tool_input_json);
967                                let _ = tx
968                                    .send(Ok(StreamEvent::ToolCall(ProviderToolCall {
969                                        id,
970                                        name,
971                                        arguments: input,
972                                        extra_content: None,
973                                    })))
974                                    .await;
975                            }
976                            tool_id = block
977                                .get("id")
978                                .and_then(|v| v.as_str())
979                                .map(ToString::to_string);
980                            tool_name = block
981                                .get("name")
982                                .and_then(|v| v.as_str())
983                                .map(ToString::to_string);
984                            tool_input_json.clear();
985                        }
986                    }
987                }
988                "content_block_delta" => {
989                    if let Some(delta) = event.get("delta") {
990                        let delta_type = delta
991                            .get("type")
992                            .and_then(|t| t.as_str())
993                            .unwrap_or_default();
994                        match delta_type {
995                            "text_delta" => {
996                                if let Some(text) = delta.get("text").and_then(|t| t.as_str())
997                                    && !text.is_empty()
998                                    && tx
999                                        .send(Ok(StreamEvent::TextDelta(StreamChunk::delta(
1000                                            text.to_string(),
1001                                        ))))
1002                                        .await
1003                                        .is_err()
1004                                {
1005                                    return;
1006                                }
1007                            }
1008                            "input_json_delta" => {
1009                                if let Some(json) =
1010                                    delta.get("partial_json").and_then(|j| j.as_str())
1011                                {
1012                                    tool_input_json.push_str(json);
1013                                }
1014                            }
1015                            // TODO: handle "thinking_delta" events for streaming
1016                            // extended thinking content. Currently thinking blocks
1017                            // are only captured in non-streaming parse_native_response().
1018                            _ => {}
1019                        }
1020                    }
1021                }
1022                "content_block_stop" => {
1023                    if let Some(id) = tool_id.take() {
1024                        let name = tool_name.take().unwrap_or_default();
1025                        let input = std::mem::take(&mut tool_input_json);
1026                        let _ = tx
1027                            .send(Ok(StreamEvent::ToolCall(ProviderToolCall {
1028                                id,
1029                                name,
1030                                arguments: input,
1031                                extra_content: None,
1032                            })))
1033                            .await;
1034                    }
1035                }
1036                "message_delta" => {
1037                    let stop_reason = event
1038                        .get("delta")
1039                        .and_then(|d| d.get("stop_reason"))
1040                        .and_then(|s| s.as_str())
1041                        .unwrap_or("none");
1042                    // Anthropic's running-total: each `message_delta`
1043                    // supersedes the previous one, so we always overwrite.
1044                    let observed_output = event
1045                        .get("usage")
1046                        .and_then(|u| u.get("output_tokens"))
1047                        .and_then(|t| t.as_u64());
1048                    if let Some(v) = observed_output {
1049                        output_tokens = Some(v);
1050                    }
1051                    if stop_reason == "max_tokens" {
1052                        ::zeroclaw_log::record!(
1053                            WARN,
1054                            ::zeroclaw_log::Event::new(
1055                                module_path!(),
1056                                ::zeroclaw_log::Action::Note
1057                            )
1058                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1059                            .with_attrs(::serde_json::json!({"output_tokens": observed_output})),
1060                            "response truncated: hit max_tokens limit. Increase provider_max_tokens in config."
1061                        );
1062                    } else {
1063                        ::zeroclaw_log::record!(DEBUG, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"stop_reason": stop_reason, "output_tokens": observed_output})), "stream: message_delta");
1064                    }
1065                }
1066                "message_stop" => {
1067                    ::zeroclaw_log::record!(
1068                        DEBUG,
1069                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1070                        "stream: message_stop"
1071                    );
1072                    if input_tokens.is_some()
1073                        || output_tokens.is_some()
1074                        || cached_input_tokens.is_some()
1075                        || cache_creation_input_tokens.is_some()
1076                    {
1077                        // Normalize to TokenUsage contract: `input_tokens` is
1078                        // the *total* prompt size. Anthropic reports three
1079                        // DISJOINT buckets per
1080                        // https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching:
1081                        //   total = cache_read + cache_creation + input_tokens
1082                        // where `input_tokens` from the API is "tokens after
1083                        // the last cache breakpoint", not the total.
1084                        let uncached = input_tokens.unwrap_or(0);
1085                        let cache_read = cached_input_tokens.unwrap_or(0);
1086                        let cache_create = cache_creation_input_tokens.unwrap_or(0);
1087                        let normalized_input = Some(
1088                            uncached
1089                                .saturating_add(cache_read)
1090                                .saturating_add(cache_create),
1091                        );
1092                        let _ = tx
1093                            .send(Ok(StreamEvent::Usage(TokenUsage {
1094                                input_tokens: normalized_input,
1095                                output_tokens,
1096                                cached_input_tokens,
1097                            })))
1098                            .await;
1099                    }
1100                    let _ = tx.send(Ok(StreamEvent::Final)).await;
1101                    return;
1102                }
1103                "error" => {
1104                    let msg = event
1105                        .get("error")
1106                        .and_then(|e| e.get("message"))
1107                        .and_then(|m| m.as_str())
1108                        .unwrap_or("unknown streaming error");
1109                    let _ = tx
1110                        .send(Err(StreamError::ModelProvider(msg.to_string())))
1111                        .await;
1112                    return;
1113                }
1114                _ => {}
1115            }
1116        }
1117
1118        ::zeroclaw_log::record!(
1119            DEBUG,
1120            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Complete)
1121                .with_category(::zeroclaw_log::EventCategory::Provider)
1122                .with_outcome(::zeroclaw_log::EventOutcome::Success)
1123                .with_attrs(::serde_json::json!({
1124                    "input_tokens": input_tokens,
1125                    "output_tokens": output_tokens,
1126                })),
1127            "stream: SSE parser reached end of stream, emitting Final"
1128        );
1129        let _ = tx.send(Ok(StreamEvent::Final)).await;
1130    }
1131}
1132
1133#[async_trait]
1134impl ModelProvider for AnthropicModelProvider {
1135    fn default_temperature(&self) -> f64 {
1136        TEMPERATURE_DEFAULT
1137    }
1138
1139    fn default_base_url(&self) -> Option<&str> {
1140        Some(BASE_URL)
1141    }
1142
1143    async fn chat_with_system(
1144        &self,
1145        system_prompt: Option<&str>,
1146        message: &str,
1147        model: &str,
1148        temperature: Option<f64>,
1149    ) -> anyhow::Result<String> {
1150        let credential = self.credential.as_ref().ok_or_else(|| {
1151            ::zeroclaw_log::record!(
1152                ERROR,
1153                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1154                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1155                    .with_attrs(::serde_json::json!({"missing": "credentials"})),
1156                "anthropic: no credentials configured"
1157            );
1158            anyhow::Error::msg(
1159                "Anthropic credentials not set. Set ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN (setup-token).",
1160            )
1161        })?;
1162
1163        let system = system_prompt.map(|s| SystemPrompt::String(s.to_string()));
1164        let system = if Self::is_setup_token(credential) {
1165            Self::apply_oauth_system_prompt(system)
1166        } else {
1167            system
1168        };
1169
1170        ::zeroclaw_log::record!(
1171            DEBUG,
1172            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1173                .with_attrs(::serde_json::json!({"max_tokens": self.max_tokens, "model": model})),
1174            "API request"
1175        );
1176        let request = NativeChatRequest {
1177            model: model.to_string(),
1178            max_tokens: self.max_tokens,
1179            system,
1180            messages: vec![NativeMessage {
1181                role: "user".to_string(),
1182                content: vec![NativeContentOut::Text {
1183                    text: message.to_string(),
1184                    cache_control: None,
1185                }],
1186            }],
1187            temperature,
1188            tools: None,
1189            tool_choice: None,
1190            stream: None,
1191            thinking: None,
1192        };
1193
1194        let mut request = self
1195            .http_client()
1196            .post(format!("{}/v1/messages", self.base_url))
1197            .header("anthropic-version", "2023-06-01")
1198            .header("content-type", "application/json")
1199            .json(&request);
1200
1201        request = self.apply_auth(request, credential);
1202
1203        let response = request.send().await?;
1204
1205        if !response.status().is_success() {
1206            return Err(super::api_error("Anthropic", response).await);
1207        }
1208
1209        let chat_response: NativeChatResponse = response.json().await?;
1210        let parsed = Self::parse_native_response(chat_response);
1211        parsed.text.ok_or_else(|| {
1212            ::zeroclaw_log::record!(
1213                ERROR,
1214                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1215                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1216                "anthropic: empty text in response"
1217            );
1218            anyhow::Error::msg("No response from Anthropic")
1219        })
1220    }
1221
1222    async fn chat(
1223        &self,
1224        request: ProviderChatRequest<'_>,
1225        model: &str,
1226        temperature: Option<f64>,
1227    ) -> anyhow::Result<ProviderChatResponse> {
1228        let credential = self.credential.as_ref().ok_or_else(|| {
1229            ::zeroclaw_log::record!(
1230                ERROR,
1231                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
1232                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
1233                    .with_attrs(::serde_json::json!({"missing": "credentials"})),
1234                "anthropic: no credentials configured"
1235            );
1236            anyhow::Error::msg(
1237                "Anthropic credentials not set. Set ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN (setup-token).",
1238            )
1239        })?;
1240
1241        let (system_prompt, mut messages) = Self::convert_messages(request.messages);
1242
1243        // Auto-cache last message if conversation is long
1244        if Self::should_cache_conversation(request.messages) {
1245            Self::apply_cache_to_last_message(&mut messages);
1246        }
1247
1248        // Check for tool_choice override from the agent loop (e.g. "any"
1249        // to force tool use for hardware requests).
1250        let tool_choice_override = zeroclaw_api::TOOL_CHOICE_OVERRIDE
1251            .try_with(Clone::clone)
1252            .ok()
1253            .flatten();
1254        let native_tools = Self::convert_tools(request.tools);
1255        let tool_choice = if native_tools.is_some() {
1256            tool_choice_override.map(|tc| serde_json::json!({ "type": tc }))
1257        } else {
1258            None
1259        };
1260
1261        // For OAuth tokens, prepend Claude Code identity to system prompt
1262        let system_prompt = if Self::is_setup_token(credential) {
1263            Self::apply_oauth_system_prompt(system_prompt)
1264        } else {
1265            system_prompt
1266        };
1267
1268        let (effective_temperature, thinking_config, effective_max_tokens) =
1269            self.resolve_thinking(request.thinking, temperature, model);
1270
1271        ::zeroclaw_log::record!(
1272            DEBUG,
1273            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
1274                ::serde_json::json!({"max_tokens": effective_max_tokens, "model": model})
1275            ),
1276            "non-streaming API request"
1277        );
1278        let native_request = NativeChatRequest {
1279            model: model.to_string(),
1280            max_tokens: effective_max_tokens,
1281            system: system_prompt,
1282            messages,
1283            temperature: effective_temperature,
1284            tools: native_tools,
1285            tool_choice,
1286            stream: None,
1287            thinking: thinking_config,
1288        };
1289
1290        let req = self
1291            .http_client()
1292            .post(format!("{}/v1/messages", self.base_url))
1293            .header("anthropic-version", "2023-06-01")
1294            .header("content-type", "application/json")
1295            .json(&native_request);
1296
1297        let response = self.apply_auth(req, credential).send().await?;
1298        if !response.status().is_success() {
1299            return Err(super::api_error("Anthropic", response).await);
1300        }
1301
1302        let native_response: NativeChatResponse = response.json().await?;
1303        Ok(Self::parse_native_response(native_response))
1304    }
1305
1306    fn capabilities(&self) -> ProviderCapabilities {
1307        ProviderCapabilities {
1308            native_tool_calling: true,
1309            vision: true,
1310            prompt_caching: true,
1311            extended_thinking: true,
1312        }
1313    }
1314
1315    fn supports_native_tools(&self) -> bool {
1316        true
1317    }
1318
1319    async fn chat_with_tools(
1320        &self,
1321        messages: &[ChatMessage],
1322        tools: &[serde_json::Value],
1323        model: &str,
1324        temperature: Option<f64>,
1325    ) -> anyhow::Result<ProviderChatResponse> {
1326        // Convert OpenAI-format tool JSON to ToolSpec so we can reuse the
1327        // existing `chat()` method which handles full message history,
1328        // system prompt extraction, caching, and Anthropic native formatting.
1329        let tool_specs: Vec<ToolSpec> = tools
1330            .iter()
1331            .filter_map(|t| {
1332                let func = t.get("function").or_else(|| {
1333                    ::zeroclaw_log::record!(
1334                        WARN,
1335                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1336                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1337                        "Skipping malformed tool definition (missing 'function' key)"
1338                    );
1339                    None
1340                })?;
1341                let name = func.get("name").and_then(|n| n.as_str()).or_else(|| {
1342                    ::zeroclaw_log::record!(
1343                        WARN,
1344                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1345                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1346                        "Skipping tool with missing or non-string 'name'"
1347                    );
1348                    None
1349                })?;
1350                Some(ToolSpec {
1351                    name: name.to_string(),
1352                    description: func
1353                        .get("description")
1354                        .and_then(|d| d.as_str())
1355                        .unwrap_or("")
1356                        .to_string(),
1357                    parameters: func
1358                        .get("parameters")
1359                        .cloned()
1360                        .unwrap_or(serde_json::json!({"type": "object"})),
1361                })
1362            })
1363            .collect();
1364
1365        let request = ProviderChatRequest {
1366            messages,
1367            tools: if tool_specs.is_empty() {
1368                None
1369            } else {
1370                Some(&tool_specs)
1371            },
1372            thinking: None,
1373        };
1374        self.chat(request, model, temperature).await
1375    }
1376
1377    async fn warmup(&self) -> anyhow::Result<()> {
1378        if let Some(credential) = self.credential.as_ref() {
1379            let mut request = self
1380                .http_client()
1381                .post(format!("{}/v1/messages", self.base_url))
1382                .header("anthropic-version", "2023-06-01");
1383            request = self.apply_auth(request, credential);
1384            // Send a minimal request; the goal is TLS + HTTP/2 setup, not a valid response.
1385            // Anthropic has no lightweight GET endpoint, so we accept any non-network error.
1386            let _ = request.send().await?;
1387        }
1388        Ok(())
1389    }
1390
1391    async fn list_models(&self) -> anyhow::Result<Vec<String>> {
1392        // Anthropic's /v1/models requires a credential. Onboard pulls the
1393        // catalog from models.dev before the user has entered a key.
1394        crate::models_dev::list_models_for("anthropic").await
1395    }
1396
1397    fn supports_streaming(&self) -> bool {
1398        true
1399    }
1400
1401    fn supports_streaming_tool_events(&self) -> bool {
1402        true
1403    }
1404
1405    fn stream_chat(
1406        &self,
1407        request: ProviderChatRequest<'_>,
1408        model: &str,
1409        temperature: Option<f64>,
1410        options: StreamOptions,
1411    ) -> stream::BoxStream<'static, StreamResult<StreamEvent>> {
1412        if !options.enabled {
1413            return stream::once(async { Ok(StreamEvent::Final) }).boxed();
1414        }
1415
1416        let credential = match self.credential.as_ref() {
1417            Some(c) => c.clone(),
1418            None => {
1419                return stream::once(async {
1420                    Err(StreamError::ModelProvider(
1421                        "Anthropic credentials not set".to_string(),
1422                    ))
1423                })
1424                .boxed();
1425            }
1426        };
1427
1428        let (system_prompt, mut messages) = Self::convert_messages(request.messages);
1429        if Self::should_cache_conversation(request.messages) {
1430            Self::apply_cache_to_last_message(&mut messages);
1431        }
1432
1433        let tool_choice_override = zeroclaw_api::TOOL_CHOICE_OVERRIDE
1434            .try_with(Clone::clone)
1435            .ok()
1436            .flatten();
1437        let native_tools = Self::convert_tools(request.tools);
1438        let tool_choice = if native_tools.is_some() {
1439            tool_choice_override.map(|tc| serde_json::json!({ "type": tc }))
1440        } else {
1441            None
1442        };
1443
1444        let system_prompt = if Self::is_setup_token(&credential) {
1445            Self::apply_oauth_system_prompt(system_prompt)
1446        } else {
1447            system_prompt
1448        };
1449
1450        let (effective_temperature, thinking_config, effective_max_tokens) =
1451            self.resolve_thinking(request.thinking, temperature, model);
1452
1453        // When native thinking is enabled, streamed `thinking_delta` /
1454        // `signature_delta` SSE events are not yet parsed into
1455        // `reasoning_content`, which means a tool-use turn could emit a
1456        // tool call without preserving the signed thinking block that
1457        // justified it — breaking Anthropic's signature round-trip. Fall
1458        // back to a non-streaming request so `parse_native_response` can
1459        // preserve the signed blocks, and synthesize a short stream from
1460        // the completed response. Full streaming thinking_delta
1461        // preservation is tracked as a follow-up.
1462        if thinking_config.is_some() {
1463            ::zeroclaw_log::record!(
1464                INFO,
1465                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1466                    .with_attrs(::serde_json::json!({"model": model})),
1467                "native thinking enabled; using non-streaming fallback to preserve signed thinking blocks"
1468            );
1469            let native_request = NativeChatRequest {
1470                model: model.to_string(),
1471                max_tokens: effective_max_tokens,
1472                system: system_prompt,
1473                messages,
1474                temperature: effective_temperature,
1475                tools: native_tools,
1476                tool_choice,
1477                stream: None,
1478                thinking: thinking_config,
1479            };
1480            // Serialize eagerly so the request body is owned and `'static`
1481            // across the async boundary — `NativeToolSpec<'a>` borrows from
1482            // `request.tools`, which prevents moving `native_request` into
1483            // the spawned future otherwise.
1484            let body = serde_json::to_value(&native_request)
1485                .expect("NativeChatRequest should serialize to JSON");
1486            let client = self.http_client();
1487            let url = format!("{}/v1/messages", self.base_url);
1488            let is_oauth = Self::is_setup_token(&credential);
1489
1490            return stream::once(async move {
1491                let mut req = client
1492                    .post(&url)
1493                    .header("anthropic-version", "2023-06-01")
1494                    .header("content-type", "application/json")
1495                    .json(&body);
1496                if is_oauth {
1497                    req = req
1498                        .header("Authorization", format!("Bearer {credential}"))
1499                        .header(
1500                            "anthropic-beta",
1501                            "claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14",
1502                        )
1503                        .header("anthropic-dangerous-direct-browser-access", "true");
1504                } else {
1505                    req = req.header("x-api-key", &credential);
1506                }
1507                let response = req
1508                    .send()
1509                    .await
1510                    .map_err(|e| StreamError::Http(e.to_string()))?;
1511                if !response.status().is_success() {
1512                    let status = response.status();
1513                    let body = response
1514                        .text()
1515                        .await
1516                        .unwrap_or_else(|_| format!("HTTP error: {status}"));
1517                    return Err(StreamError::ModelProvider(format!("{status}: {body}")));
1518                }
1519                let parsed: NativeChatResponse = response
1520                    .json()
1521                    .await
1522                    .map_err(|e| StreamError::ModelProvider(format!("response decode: {e}")))?;
1523                Ok(Self::parse_native_response(parsed))
1524            })
1525            .flat_map(|result| match result {
1526                Ok(resp) => {
1527                    let mut events: Vec<StreamResult<StreamEvent>> = Vec::new();
1528                    // Emit signed thinking blocks first via `StreamChunk.reasoning`
1529                    // so the agent loop can accumulate them into
1530                    // `ChatResponse.reasoning_content` for multi-turn replay.
1531                    // Anthropic requires signed thinking blocks to precede
1532                    // tool-use blocks in conversation history.
1533                    if let Some(rc) = resp.reasoning_content {
1534                        events.push(Ok(StreamEvent::TextDelta(StreamChunk {
1535                            delta: String::new(),
1536                            reasoning: Some(rc),
1537                            is_final: false,
1538                            token_count: 0,
1539                        })));
1540                    }
1541                    if let Some(text) = resp.text.filter(|t| !t.is_empty()) {
1542                        events.push(Ok(StreamEvent::TextDelta(StreamChunk::delta(text))));
1543                    }
1544                    for tc in resp.tool_calls {
1545                        events.push(Ok(StreamEvent::ToolCall(tc)));
1546                    }
1547                    if let Some(usage) = resp.usage {
1548                        events.push(Ok(StreamEvent::Usage(usage)));
1549                    }
1550                    events.push(Ok(StreamEvent::Final));
1551                    stream::iter(events)
1552                }
1553                Err(e) => stream::iter(vec![Err(e)]),
1554            })
1555            .boxed();
1556        }
1557
1558        ::zeroclaw_log::record!(
1559            DEBUG,
1560            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(
1561                ::serde_json::json!({"max_tokens": effective_max_tokens, "model": model})
1562            ),
1563            "stream_chat request"
1564        );
1565        let native_request = NativeChatRequest {
1566            model: model.to_string(),
1567            max_tokens: effective_max_tokens,
1568            system: system_prompt,
1569            messages,
1570            temperature: effective_temperature,
1571            tools: native_tools,
1572            tool_choice,
1573            stream: Some(true),
1574            thinking: thinking_config,
1575        };
1576
1577        let body = Self::build_streaming_request(&native_request);
1578        let client = self.http_client();
1579        let url = format!("{}/v1/messages", self.base_url);
1580        let is_oauth = Self::is_setup_token(&credential);
1581
1582        let (tx, rx) = tokio::sync::mpsc::channel::<StreamResult<StreamEvent>>(64);
1583
1584        ::zeroclaw_log::record!(
1585            DEBUG,
1586            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Spawn)
1587                .with_category(::zeroclaw_log::EventCategory::Provider)
1588                .with_attrs(::serde_json::json!({
1589                    "idle_timeout_secs": SSE_IDLE_TIMEOUT.as_secs(),
1590                    "channel_capacity": 64,
1591                })),
1592            "stream: spawning detached Anthropic SSE parser task"
1593        );
1594
1595        let parser_handle = ::zeroclaw_spawn::spawn!(async move {
1596            let mut req = client
1597                .post(&url)
1598                .header("anthropic-version", "2023-06-01")
1599                .header("content-type", "application/json")
1600                .json(&body);
1601
1602            if is_oauth {
1603                req = req
1604                    .header("Authorization", format!("Bearer {credential}"))
1605                    .header(
1606                        "anthropic-beta",
1607                        "claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14",
1608                    )
1609                    .header("anthropic-dangerous-direct-browser-access", "true");
1610            } else {
1611                req = req.header("x-api-key", &credential);
1612            }
1613
1614            let response = match req.send().await {
1615                Ok(r) => r,
1616                Err(e) => {
1617                    let _ = tx
1618                        .send(Err(StreamError::Http(super::format_error_chain(&e))))
1619                        .await;
1620                    return;
1621                }
1622            };
1623
1624            if !response.status().is_success() {
1625                let status = response.status();
1626                let error = response
1627                    .text()
1628                    .await
1629                    .unwrap_or_else(|_| format!("HTTP error: {status}"));
1630                let _ = tx
1631                    .send(Err(StreamError::ModelProvider(format!(
1632                        "{status}: {error}"
1633                    ))))
1634                    .await;
1635                return;
1636            }
1637
1638            Self::parse_anthropic_sse(response, &tx).await;
1639        });
1640
1641        // The guard travels inside the unfold state so it is dropped at the
1642        // exact moment the consumer drops the stream — turning a turn cancel
1643        // (or normal completion) into an immediate parser-task abort instead
1644        // of a leaked socket that lingers until SSE_IDLE_TIMEOUT.
1645        let guard = AbortOnDrop::new(parser_handle.abort_handle());
1646        stream::unfold((rx, guard), |(mut rx, guard)| async move {
1647            rx.recv().await.map(|event| (event, (rx, guard)))
1648        })
1649        .boxed()
1650    }
1651}
1652
1653impl ::zeroclaw_api::attribution::Attributable for AnthropicModelProvider {
1654    fn role(&self) -> ::zeroclaw_api::attribution::Role {
1655        ::zeroclaw_api::attribution::Role::Provider(
1656            ::zeroclaw_api::attribution::ProviderKind::Model(
1657                ::zeroclaw_api::attribution::ModelProviderKind::Anthropic,
1658            ),
1659        )
1660    }
1661    fn alias(&self) -> &str {
1662        &self.alias
1663    }
1664}
1665
1666#[cfg(test)]
1667mod tests {
1668    use super::*;
1669    use crate::auth::anthropic_token::{AnthropicAuthKind, detect_auth_kind};
1670
    /// Fake Anthropic SSE stream covering the message_start → content → delta
    /// → stop sequence with usage in both the start frame and the stop delta.
    /// Each `data:` line is one Anthropic event per the streaming spec.
    ///
    /// The usage frame includes all three disjoint input buckets
    /// (input_tokens=314 after-breakpoint, cache_read=42, cache_creation=100)
    /// so the test exercises the documented Anthropic formula:
    ///   total = cache_read + cache_creation + input_tokens
    ///
    /// The `message_delta` frame additionally reports `output_tokens=27`.
    fn fake_anthropic_sse() -> &'static [u8] {
        // Trailing `\` continuations join the source lines into one byte
        // string; the explicit `\n` / `\n\n` escapes supply the actual line
        // and frame separators.
        b"event: message_start\n\
data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude-sonnet-4-5\",\"usage\":{\"input_tokens\":314,\"cache_read_input_tokens\":42,\"cache_creation_input_tokens\":100}}}\n\n\
event: content_block_start\n\
data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n\
event: content_block_delta\n\
data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n\
event: content_block_stop\n\
data: {\"type\":\"content_block_stop\",\"index\":0}\n\n\
event: message_delta\n\
data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\"},\"usage\":{\"output_tokens\":27}}\n\n\
event: message_stop\n\
data: {\"type\":\"message_stop\"}\n\n"
    }
1693
1694    #[tokio::test]
1695    async fn streaming_usage_emitted_before_final() {
1696        // The original #6001 live repro was Anthropic streaming; before this
1697        // PR the message_start / message_delta usage frames were only logged
1698        // at DEBUG and never surfaced as `StreamEvent::Usage`. Now they are.
1699        use std::io::Cursor;
1700
1701        let bytes = fake_anthropic_sse();
1702        let reader = tokio::io::BufReader::new(Cursor::new(bytes));
1703        let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamResult<StreamEvent>>(64);
1704        AnthropicModelProvider::parse_anthropic_sse_from_reader(reader, &tx).await;
1705
1706        let mut events = Vec::new();
1707        while let Ok(Some(ev)) =
1708            tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await
1709        {
1710            events.push(ev);
1711        }
1712
1713        let states: Vec<&str> = events
1714            .iter()
1715            .map(|e| match e.as_ref() {
1716                Ok(StreamEvent::TextDelta(_)) => "text",
1717                Ok(StreamEvent::ToolCall(_)) => "tool_call",
1718                Ok(StreamEvent::PreExecutedToolCall { .. }) => "pre_tool_call",
1719                Ok(StreamEvent::PreExecutedToolResult { .. }) => "pre_tool_result",
1720                Ok(StreamEvent::Usage(_)) => "usage",
1721                Ok(StreamEvent::Final) => "final",
1722                Err(_) => "err",
1723            })
1724            .collect();
1725
1726        // Required ordering: usage event must appear before Final so the
1727        // gateway accumulator can capture it within the same turn boundary.
1728        let usage_pos = states
1729            .iter()
1730            .position(|s| *s == "usage")
1731            .unwrap_or_else(|| panic!("expected Usage event in stream, got {states:?}"));
1732        let final_pos = states
1733            .iter()
1734            .position(|s| *s == "final")
1735            .unwrap_or_else(|| panic!("expected Final event in stream, got {states:?}"));
1736        assert!(
1737            usage_pos < final_pos,
1738            "Usage must come before Final, got {states:?}"
1739        );
1740
1741        // The Usage payload must carry both input + output token counts plus
1742        // the cached-input prompt-cache reads from message_start.
1743        let usage = events
1744            .iter()
1745            .find_map(|e| match e.as_ref() {
1746                Ok(StreamEvent::Usage(u)) => Some(u.clone()),
1747                _ => None,
1748            })
1749            .unwrap();
1750        assert_eq!(
1751            usage.input_tokens,
1752            Some(456),
1753            "input_tokens must be the total of all three Anthropic buckets \
1754             (after-breakpoint 314 + cache_read 42 + cache_creation 100) \
1755             per the documented prompt-caching formula"
1756        );
1757        assert_eq!(
1758            usage.output_tokens,
1759            Some(27),
1760            "output_tokens from message_delta usage frame"
1761        );
1762        assert_eq!(
1763            usage.cached_input_tokens,
1764            Some(42),
1765            "cache_read_input_tokens from message_start"
1766        );
1767    }
1768
    /// A reader that yields one buffer of bytes, then parks forever — models
    /// an SSE connection that delivers `message_start` and then goes silent
    /// with the socket still open. Without the idle timeout this hangs the
    /// parser indefinitely.
    struct StallAfterReader {
        /// Seed bytes served to the consumer before the reader stalls.
        data: std::io::Cursor<Vec<u8>>,
        /// Set once a read from `data` produces no new bytes; from then on
        /// every `poll_read` returns `Poll::Pending`.
        drained: bool,
    }
1777
1778    impl tokio::io::AsyncRead for StallAfterReader {
1779        fn poll_read(
1780            mut self: std::pin::Pin<&mut Self>,
1781            cx: &mut std::task::Context<'_>,
1782            buf: &mut tokio::io::ReadBuf<'_>,
1783        ) -> std::task::Poll<std::io::Result<()>> {
1784            if self.drained {
1785                // Park without self-waking; the surrounding timeout's timer
1786                // provides the wake. Self-waking here would busy-spin under
1787                // paused time and starve the timer.
1788                return std::task::Poll::Pending;
1789            }
1790            let before = buf.filled().len();
1791            let inner = std::pin::Pin::new(&mut self.data);
1792            let res = inner.poll_read(cx, buf);
1793            // Once the seed buffer is exhausted, stall on the *next* read
1794            // rather than reporting EOF (0 bytes) — EOF would end the stream
1795            // cleanly and never exercise the idle timeout.
1796            if buf.filled().len() == before {
1797                self.drained = true;
1798                return std::task::Poll::Pending;
1799            }
1800            res
1801        }
1802    }
1803
1804    #[tokio::test(start_paused = true)]
1805    async fn stalled_stream_times_out_instead_of_hanging() {
1806        // Repro: connection delivers message_start then goes silent. The
1807        // parser must surface a retryable StreamError rather than parking on
1808        // next_line() forever (the "stuck on working" hang).
1809        let start = b"event: message_start\n\
1810data: {\"type\":\"message_start\",\"message\":{\"id\":\"m\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude\",\"usage\":{\"input_tokens\":1}}}\n\n"
1811            .to_vec();
1812        let reader = tokio::io::BufReader::new(StallAfterReader {
1813            data: std::io::Cursor::new(start),
1814            drained: false,
1815        });
1816        let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamResult<StreamEvent>>(64);
1817
1818        let parser = ::zeroclaw_spawn::spawn!(async move {
1819            AnthropicModelProvider::parse_anthropic_sse_from_reader(reader, &tx).await;
1820        });
1821
1822        // Let the parser run until it parks on the stalled read before we
1823        // jump virtual time forward.
1824        tokio::task::yield_now().await;
1825        // Advance virtual time past the idle bound; the parser should wake,
1826        // emit an error, and return — closing the channel.
1827        tokio::time::advance(SSE_IDLE_TIMEOUT + std::time::Duration::from_secs(1)).await;
1828
1829        let mut last_err = None;
1830        while let Some(ev) = rx.recv().await {
1831            if let Err(e) = ev {
1832                last_err = Some(e);
1833            }
1834        }
1835        parser.await.expect("parser task must finish, not hang");
1836
1837        let err = last_err.expect("a StreamError must be emitted on stall");
1838        assert!(
1839            matches!(err, StreamError::Http(ref m) if m.contains("stalled")),
1840            "expected stalled-stream Http error, got: {err:?}"
1841        );
1842    }
1843
1844    #[tokio::test(start_paused = true)]
1845    async fn dropping_guard_aborts_parser_without_idle_wait() {
1846        // The full-measure fix: dropping the consumer stream must abort the
1847        // detached parser immediately (turn cancel), not leak the socket until
1848        // SSE_IDLE_TIMEOUT. We model the stream's lifetime with AbortOnDrop and
1849        // assert the task is aborted the instant the guard drops.
1850        let start = b"event: message_start\n\
1851data: {\"type\":\"message_start\",\"message\":{\"id\":\"m\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude\",\"usage\":{\"input_tokens\":1}}}\n\n"
1852            .to_vec();
1853        let reader = tokio::io::BufReader::new(StallAfterReader {
1854            data: std::io::Cursor::new(start),
1855            drained: false,
1856        });
1857        let (tx, _rx) = tokio::sync::mpsc::channel::<StreamResult<StreamEvent>>(64);
1858
1859        let handle = ::zeroclaw_spawn::spawn!(async move {
1860            AnthropicModelProvider::parse_anthropic_sse_from_reader(reader, &tx).await;
1861        });
1862        let probe = handle.abort_handle();
1863        let guard = AbortOnDrop::new(handle.abort_handle());
1864
1865        // Let the parser park on the stalled read.
1866        tokio::task::yield_now().await;
1867        assert!(
1868            !probe.is_finished(),
1869            "parser must still be running (parked on the stalled read) before drop"
1870        );
1871
1872        // Dropping the guard must abort the parser — no SSE_IDLE_TIMEOUT wait.
1873        drop(guard);
1874        tokio::task::yield_now().await;
1875        assert!(
1876            probe.is_finished(),
1877            "guard drop must abort the parser task immediately, not wait out the idle timeout"
1878        );
1879    }
1880
1881    #[tokio::test]
1882    async fn streaming_usage_omitted_when_provider_does_not_send_usage() {
1883        // Backward-compat: a stream that never emits a usage frame must not
1884        // synthesize a zero-valued Usage event. Consumers should treat
1885        // absence as "usage unavailable" rather than "usage was zero."
1886        use std::io::Cursor;
1887
1888        let bytes = b"event: message_start\n\
1889data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"claude\"}}\n\n\
1890event: content_block_start\n\
1891data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n\
1892event: content_block_stop\n\
1893data: {\"type\":\"content_block_stop\",\"index\":0}\n\n\
1894event: message_stop\n\
1895data: {\"type\":\"message_stop\"}\n\n";
1896        let reader = tokio::io::BufReader::new(Cursor::new(bytes.as_slice()));
1897        let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamResult<StreamEvent>>(64);
1898        AnthropicModelProvider::parse_anthropic_sse_from_reader(reader, &tx).await;
1899
1900        let mut saw_usage = false;
1901        while let Ok(Some(ev)) =
1902            tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await
1903        {
1904            if matches!(ev, Ok(StreamEvent::Usage(_))) {
1905                saw_usage = true;
1906            }
1907        }
1908        assert!(
1909            !saw_usage,
1910            "must not emit Usage when provider sent no usage frames"
1911        );
1912    }
1913
1914    #[test]
1915    fn creates_with_key() {
1916        let p = AnthropicModelProvider::new("test", Some("anthropic-test-credential"));
1917        assert!(p.credential.is_some());
1918        assert_eq!(p.credential.as_deref(), Some("anthropic-test-credential"));
1919        assert_eq!(p.base_url, "https://api.anthropic.com");
1920    }
1921
1922    #[test]
1923    fn creates_without_key() {
1924        let p = AnthropicModelProvider::new("test", None);
1925        assert!(p.credential.is_none());
1926        assert_eq!(p.base_url, "https://api.anthropic.com");
1927    }
1928
1929    #[test]
1930    fn creates_with_empty_key() {
1931        let p = AnthropicModelProvider::new("test", Some(""));
1932        assert!(p.credential.is_none());
1933    }
1934
1935    #[test]
1936    fn creates_with_whitespace_key() {
1937        let p = AnthropicModelProvider::new("test", Some("  anthropic-test-credential  "));
1938        assert!(p.credential.is_some());
1939        assert_eq!(p.credential.as_deref(), Some("anthropic-test-credential"));
1940    }
1941
1942    #[test]
1943    fn creates_with_custom_base_url() {
1944        let p = AnthropicModelProvider::with_base_url(
1945            "test",
1946            Some("anthropic-credential"),
1947            Some("https://api.example.com"),
1948        );
1949        assert_eq!(p.base_url, "https://api.example.com");
1950        assert_eq!(p.credential.as_deref(), Some("anthropic-credential"));
1951    }
1952
1953    #[test]
1954    fn custom_base_url_trims_trailing_slash() {
1955        let p =
1956            AnthropicModelProvider::with_base_url("test", None, Some("https://api.example.com/"));
1957        assert_eq!(p.base_url, "https://api.example.com");
1958    }
1959
1960    #[test]
1961    fn no_base_url_uses_published_endpoint() {
1962        let p = AnthropicModelProvider::with_base_url("test", None, None);
1963        assert_eq!(p.base_url, "https://api.anthropic.com");
1964    }
1965
1966    #[tokio::test]
1967    async fn chat_fails_without_key() {
1968        let p = AnthropicModelProvider::new("test", None);
1969        let result = p
1970            .chat_with_system(None, "hello", "claude-3-opus", Some(0.7))
1971            .await;
1972        assert!(result.is_err());
1973        let err = result.unwrap_err().to_string();
1974        assert!(
1975            err.contains("credentials not set"),
1976            "Expected key error, got: {err}"
1977        );
1978    }
1979
1980    #[test]
1981    fn setup_token_detection_works() {
1982        assert!(AnthropicModelProvider::is_setup_token(
1983            "sk-ant-oat01-abcdef"
1984        ));
1985        assert!(!AnthropicModelProvider::is_setup_token("sk-ant-api-key"));
1986    }
1987
1988    #[test]
1989    fn apply_auth_uses_bearer_and_beta_for_setup_tokens() {
1990        let model_provider = AnthropicModelProvider::new("test", None);
1991        let request = model_provider
1992            .apply_auth(
1993                model_provider
1994                    .http_client()
1995                    .get("https://api.anthropic.com/v1/models"),
1996                "sk-ant-oat01-test-token",
1997            )
1998            .build()
1999            .expect("request should build");
2000
2001        assert_eq!(
2002            request
2003                .headers()
2004                .get("authorization")
2005                .and_then(|v| v.to_str().ok()),
2006            Some("Bearer sk-ant-oat01-test-token")
2007        );
2008        assert_eq!(
2009            request
2010                .headers()
2011                .get("anthropic-beta")
2012                .and_then(|v| v.to_str().ok()),
2013            Some("claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14")
2014        );
2015        assert_eq!(
2016            request
2017                .headers()
2018                .get("anthropic-dangerous-direct-browser-access")
2019                .and_then(|v| v.to_str().ok()),
2020            Some("true")
2021        );
2022        assert!(request.headers().get("x-api-key").is_none());
2023    }
2024
2025    #[test]
2026    fn apply_auth_uses_x_api_key_for_regular_tokens() {
2027        let model_provider = AnthropicModelProvider::new("test", None);
2028        let request = model_provider
2029            .apply_auth(
2030                model_provider
2031                    .http_client()
2032                    .get("https://api.anthropic.com/v1/models"),
2033                "sk-ant-api-key",
2034            )
2035            .build()
2036            .expect("request should build");
2037
2038        assert_eq!(
2039            request
2040                .headers()
2041                .get("x-api-key")
2042                .and_then(|v| v.to_str().ok()),
2043            Some("sk-ant-api-key")
2044        );
2045        assert!(request.headers().get("authorization").is_none());
2046        assert!(request.headers().get("anthropic-beta").is_none());
2047    }
2048
2049    #[tokio::test]
2050    async fn chat_with_system_fails_without_key() {
2051        let p = AnthropicModelProvider::new("test", None);
2052        let result = p
2053            .chat_with_system(
2054                Some("You are ZeroClaw"),
2055                "hello",
2056                "claude-3-opus",
2057                Some(0.7),
2058            )
2059            .await;
2060        assert!(result.is_err());
2061    }
2062
2063    #[test]
2064    fn chat_request_serializes_without_system() {
2065        let req = ChatRequest {
2066            model: "claude-3-opus".to_string(),
2067            max_tokens: 4096,
2068            system: None,
2069            messages: vec![Message {
2070                role: "user".to_string(),
2071                content: "hello".to_string(),
2072            }],
2073            temperature: Some(0.7),
2074        };
2075        let json = serde_json::to_string(&req).unwrap();
2076        assert!(
2077            !json.contains("system"),
2078            "system field should be skipped when None"
2079        );
2080        assert!(json.contains("claude-3-opus"));
2081        assert!(json.contains("hello"));
2082    }
2083
2084    #[test]
2085    fn chat_request_serializes_with_system() {
2086        let req = ChatRequest {
2087            model: "claude-3-opus".to_string(),
2088            max_tokens: 4096,
2089            system: Some("You are ZeroClaw".to_string()),
2090            messages: vec![Message {
2091                role: "user".to_string(),
2092                content: "hello".to_string(),
2093            }],
2094            temperature: Some(0.7),
2095        };
2096        let json = serde_json::to_string(&req).unwrap();
2097        assert!(json.contains("\"system\":\"You are ZeroClaw\""));
2098    }
2099
2100    #[test]
2101    fn chat_response_deserializes() {
2102        let json = r#"{"content":[{"type":"text","text":"Hello there!"}]}"#;
2103        let resp: ChatResponse = serde_json::from_str(json).unwrap();
2104        assert_eq!(resp.content.len(), 1);
2105        assert_eq!(resp.content[0].kind, "text");
2106        assert_eq!(resp.content[0].text.as_deref(), Some("Hello there!"));
2107    }
2108
2109    #[test]
2110    fn chat_response_empty_content() {
2111        let json = r#"{"content":[]}"#;
2112        let resp: ChatResponse = serde_json::from_str(json).unwrap();
2113        assert!(resp.content.is_empty());
2114    }
2115
2116    #[test]
2117    fn chat_response_multiple_blocks() {
2118        let json =
2119            r#"{"content":[{"type":"text","text":"First"},{"type":"text","text":"Second"}]}"#;
2120        let resp: ChatResponse = serde_json::from_str(json).unwrap();
2121        assert_eq!(resp.content.len(), 2);
2122        assert_eq!(resp.content[0].text.as_deref(), Some("First"));
2123        assert_eq!(resp.content[1].text.as_deref(), Some("Second"));
2124    }
2125
2126    #[test]
2127    fn temperature_range_serializes() {
2128        for temp in [0.0, 0.5, 1.0, 2.0] {
2129            let req = ChatRequest {
2130                model: "claude-3-opus".to_string(),
2131                max_tokens: 4096,
2132                system: None,
2133                messages: vec![],
2134                temperature: Some(temp),
2135            };
2136            let json = serde_json::to_string(&req).unwrap();
2137            assert!(json.contains(&format!("{temp}")));
2138        }
2139    }
2140
2141    #[test]
2142    fn anthropic_model_supports_native_thinking_excludes_opus_4_7() {
2143        // Opus 4.7 only supports adaptive thinking; fixed-budget returns 400.
2144        assert!(!anthropic_model_supports_native_thinking("claude-opus-4-7"));
2145        assert!(!anthropic_model_supports_native_thinking(
2146            "claude-opus-4-7-20260101"
2147        ));
2148    }
2149
2150    #[test]
2151    fn anthropic_model_supports_native_thinking_allows_other_models() {
2152        assert!(anthropic_model_supports_native_thinking("claude-opus-4-6"));
2153        assert!(anthropic_model_supports_native_thinking(
2154            "claude-sonnet-4-6"
2155        ));
2156        assert!(anthropic_model_supports_native_thinking("claude-haiku-4-5"));
2157    }
2158
2159    #[test]
2160    fn resolve_thinking_drops_native_for_opus_4_7() {
2161        let provider = AnthropicModelProvider::new("test", Some("test-key"));
2162        let params = zeroclaw_api::model_provider::NativeThinkingParams {
2163            budget_tokens: 10_000,
2164        };
2165        let (temp, config, max_tokens) =
2166            provider.resolve_thinking(Some(params), Some(0.7_f64), "claude-opus-4-7");
2167        assert!(
2168            config.is_none(),
2169            "native thinking should be gated off for opus-4-7"
2170        );
2171        // Caller-supplied temperature is preserved (so per-model omit guard
2172        // can still take effect downstream).
2173        assert!((temp.unwrap() - 0.7_f64).abs() < f64::EPSILON);
2174        assert_eq!(max_tokens, provider.max_tokens);
2175    }
2176
2177    #[test]
2178    fn resolve_thinking_keeps_native_for_supported_models() {
2179        let provider = AnthropicModelProvider::new("test", Some("test-key"));
2180        let params = zeroclaw_api::model_provider::NativeThinkingParams {
2181            budget_tokens: 10_000,
2182        };
2183        let (temp, config, _) =
2184            provider.resolve_thinking(Some(params), Some(0.7_f64), "claude-sonnet-4-6");
2185        assert!(
2186            config.is_some(),
2187            "native thinking should activate on supported models"
2188        );
2189        // Forced to 1.0 per Anthropic native-thinking contract.
2190        assert!((temp.unwrap() - 1.0_f64).abs() < f64::EPSILON);
2191    }
2192
2193    #[test]
2194    fn native_chat_request_serializes_without_temperature_when_none() {
2195        let req = NativeChatRequest {
2196            model: "claude-opus-4-7".to_string(),
2197            max_tokens: 4096,
2198            system: None,
2199            messages: vec![],
2200            temperature: None,
2201            tools: None,
2202            tool_choice: None,
2203            stream: None,
2204            thinking: None,
2205        };
2206        let json = serde_json::to_string(&req).unwrap();
2207        assert!(json.contains("max_tokens"));
2208        assert!(
2209            !json.contains("temperature"),
2210            "expected temperature to be omitted, got: {json}"
2211        );
2212    }
2213
2214    #[test]
2215    fn native_chat_request_serializes_with_temperature_when_some() {
2216        let req = NativeChatRequest {
2217            model: "claude-sonnet-4-6".to_string(),
2218            max_tokens: 4096,
2219            system: None,
2220            messages: vec![],
2221            temperature: Some(0.7),
2222            tools: None,
2223            tool_choice: None,
2224            stream: None,
2225            thinking: None,
2226        };
2227        let json = serde_json::to_string(&req).unwrap();
2228        assert!(
2229            json.contains("\"temperature\":0.7"),
2230            "expected temperature to be present, got: {json}"
2231        );
2232    }
2233
2234    #[test]
2235    fn detects_auth_from_jwt_shape() {
2236        let kind = detect_auth_kind("a.b.c", None);
2237        assert_eq!(kind, AnthropicAuthKind::Authorization);
2238    }
2239
2240    #[test]
2241    fn cache_control_serializes_correctly() {
2242        let cache = CacheControl::ephemeral();
2243        let json = serde_json::to_string(&cache).unwrap();
2244        assert_eq!(json, r#"{"type":"ephemeral"}"#);
2245    }
2246
2247    #[test]
2248    fn system_prompt_string_variant_serializes() {
2249        let prompt = SystemPrompt::String("You are a helpful assistant".to_string());
2250        let json = serde_json::to_string(&prompt).unwrap();
2251        assert_eq!(json, r#""You are a helpful assistant""#);
2252    }
2253
2254    #[test]
2255    fn system_prompt_blocks_variant_serializes() {
2256        let prompt = SystemPrompt::Blocks(vec![SystemBlock {
2257            block_type: "text".to_string(),
2258            text: "You are a helpful assistant".to_string(),
2259            cache_control: Some(CacheControl::ephemeral()),
2260        }]);
2261        let json = serde_json::to_string(&prompt).unwrap();
2262        assert!(json.contains(r#""type":"text""#));
2263        assert!(json.contains("You are a helpful assistant"));
2264        assert!(json.contains(r#""type":"ephemeral""#));
2265    }
2266
2267    #[test]
2268    fn system_prompt_blocks_without_cache_control() {
2269        let prompt = SystemPrompt::Blocks(vec![SystemBlock {
2270            block_type: "text".to_string(),
2271            text: "Short prompt".to_string(),
2272            cache_control: None,
2273        }]);
2274        let json = serde_json::to_string(&prompt).unwrap();
2275        assert!(json.contains("Short prompt"));
2276        assert!(!json.contains("cache_control"));
2277    }
2278
2279    #[test]
2280    fn native_content_text_without_cache_control() {
2281        let content = NativeContentOut::Text {
2282            text: "Hello".to_string(),
2283            cache_control: None,
2284        };
2285        let json = serde_json::to_string(&content).unwrap();
2286        assert!(json.contains(r#""type":"text""#));
2287        assert!(json.contains("Hello"));
2288        assert!(!json.contains("cache_control"));
2289    }
2290
2291    #[test]
2292    fn native_content_text_with_cache_control() {
2293        let content = NativeContentOut::Text {
2294            text: "Hello".to_string(),
2295            cache_control: Some(CacheControl::ephemeral()),
2296        };
2297        let json = serde_json::to_string(&content).unwrap();
2298        assert!(json.contains(r#""type":"text""#));
2299        assert!(json.contains("Hello"));
2300        assert!(json.contains(r#""cache_control":{"type":"ephemeral"}"#));
2301    }
2302
2303    #[test]
2304    fn native_content_tool_use_without_cache_control() {
2305        let content = NativeContentOut::ToolUse {
2306            id: "tool_123".to_string(),
2307            name: "get_weather".to_string(),
2308            input: serde_json::json!({"location": "San Francisco"}),
2309            cache_control: None,
2310        };
2311        let json = serde_json::to_string(&content).unwrap();
2312        assert!(json.contains(r#""type":"tool_use""#));
2313        assert!(json.contains("tool_123"));
2314        assert!(json.contains("get_weather"));
2315        assert!(!json.contains("cache_control"));
2316    }
2317
2318    #[test]
2319    fn native_content_tool_result_with_cache_control() {
2320        let content = NativeContentOut::ToolResult {
2321            tool_use_id: "tool_123".to_string(),
2322            content: "Result data".to_string(),
2323            cache_control: Some(CacheControl::ephemeral()),
2324        };
2325        let json = serde_json::to_string(&content).unwrap();
2326        assert!(json.contains(r#""type":"tool_result""#));
2327        assert!(json.contains("tool_123"));
2328        assert!(json.contains("Result data"));
2329        assert!(json.contains(r#""cache_control":{"type":"ephemeral"}"#));
2330    }
2331
2332    #[test]
2333    fn native_tool_spec_without_cache_control() {
2334        let schema = serde_json::json!({"type": "object"});
2335        let tool = NativeToolSpec {
2336            name: "get_weather",
2337            description: "Get weather info",
2338            input_schema: &schema,
2339            cache_control: None,
2340        };
2341        let json = serde_json::to_string(&tool).unwrap();
2342        assert!(json.contains("get_weather"));
2343        assert!(!json.contains("cache_control"));
2344    }
2345
2346    #[test]
2347    fn native_tool_spec_with_cache_control() {
2348        let schema = serde_json::json!({"type": "object"});
2349        let tool = NativeToolSpec {
2350            name: "get_weather",
2351            description: "Get weather info",
2352            input_schema: &schema,
2353            cache_control: Some(CacheControl::ephemeral()),
2354        };
2355        let json = serde_json::to_string(&tool).unwrap();
2356        assert!(json.contains("get_weather"));
2357        assert!(json.contains(r#""cache_control":{"type":"ephemeral"}"#));
2358    }
2359
2360    #[test]
2361    fn should_cache_conversation_short() {
2362        let messages = vec![
2363            ChatMessage {
2364                role: "system".to_string(),
2365                content: "System prompt".to_string(),
2366            },
2367            ChatMessage {
2368                role: "user".to_string(),
2369                content: "Hello".to_string(),
2370            },
2371        ];
2372        // Only 1 non-system message — should not cache
2373        assert!(!AnthropicModelProvider::should_cache_conversation(
2374            &messages
2375        ));
2376    }
2377
2378    #[test]
2379    fn should_cache_conversation_long() {
2380        let mut messages = vec![ChatMessage {
2381            role: "system".to_string(),
2382            content: "System prompt".to_string(),
2383        }];
2384        // Add 3 non-system messages
2385        for i in 0..3 {
2386            messages.push(ChatMessage {
2387                role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
2388                content: format!("Message {i}"),
2389            });
2390        }
2391        assert!(AnthropicModelProvider::should_cache_conversation(&messages));
2392    }
2393
2394    #[test]
2395    fn should_cache_conversation_boundary() {
2396        let messages = vec![ChatMessage {
2397            role: "user".to_string(),
2398            content: "Hello".to_string(),
2399        }];
2400        // Exactly 1 non-system message — should not cache
2401        assert!(!AnthropicModelProvider::should_cache_conversation(
2402            &messages
2403        ));
2404
2405        // Add one more to cross boundary (>1)
2406        let messages = vec![
2407            ChatMessage {
2408                role: "user".to_string(),
2409                content: "Hello".to_string(),
2410            },
2411            ChatMessage {
2412                role: "assistant".to_string(),
2413                content: "Hi".to_string(),
2414            },
2415        ];
2416        assert!(AnthropicModelProvider::should_cache_conversation(&messages));
2417    }
2418
2419    #[test]
2420    fn apply_cache_to_last_message_text() {
2421        let mut messages = vec![NativeMessage {
2422            role: "user".to_string(),
2423            content: vec![NativeContentOut::Text {
2424                text: "Hello".to_string(),
2425                cache_control: None,
2426            }],
2427        }];
2428
2429        AnthropicModelProvider::apply_cache_to_last_message(&mut messages);
2430
2431        match &messages[0].content[0] {
2432            NativeContentOut::Text { cache_control, .. } => {
2433                assert!(cache_control.is_some());
2434            }
2435            _ => panic!("Expected Text variant"),
2436        }
2437    }
2438
2439    #[test]
2440    fn apply_cache_to_last_message_tool_result() {
2441        let mut messages = vec![NativeMessage {
2442            role: "user".to_string(),
2443            content: vec![NativeContentOut::ToolResult {
2444                tool_use_id: "tool_123".to_string(),
2445                content: "Result".to_string(),
2446                cache_control: None,
2447            }],
2448        }];
2449
2450        AnthropicModelProvider::apply_cache_to_last_message(&mut messages);
2451
2452        match &messages[0].content[0] {
2453            NativeContentOut::ToolResult { cache_control, .. } => {
2454                assert!(cache_control.is_some());
2455            }
2456            _ => panic!("Expected ToolResult variant"),
2457        }
2458    }
2459
2460    #[test]
2461    fn apply_cache_to_last_message_does_not_affect_tool_use() {
2462        let mut messages = vec![NativeMessage {
2463            role: "assistant".to_string(),
2464            content: vec![NativeContentOut::ToolUse {
2465                id: "tool_123".to_string(),
2466                name: "get_weather".to_string(),
2467                input: serde_json::json!({}),
2468                cache_control: None,
2469            }],
2470        }];
2471
2472        AnthropicModelProvider::apply_cache_to_last_message(&mut messages);
2473
2474        // ToolUse should not be affected
2475        match &messages[0].content[0] {
2476            NativeContentOut::ToolUse { cache_control, .. } => {
2477                assert!(cache_control.is_none());
2478            }
2479            _ => panic!("Expected ToolUse variant"),
2480        }
2481    }
2482
2483    #[test]
2484    fn apply_cache_empty_messages() {
2485        let mut messages = vec![];
2486        AnthropicModelProvider::apply_cache_to_last_message(&mut messages);
2487        // Should not panic
2488        assert!(messages.is_empty());
2489    }
2490
2491    #[test]
2492    fn convert_tools_adds_cache_to_last_tool() {
2493        let tools = vec![
2494            ToolSpec {
2495                name: "tool1".to_string(),
2496                description: "First tool".to_string(),
2497                parameters: serde_json::json!({"type": "object"}),
2498            },
2499            ToolSpec {
2500                name: "tool2".to_string(),
2501                description: "Second tool".to_string(),
2502                parameters: serde_json::json!({"type": "object"}),
2503            },
2504        ];
2505
2506        let native_tools = AnthropicModelProvider::convert_tools(Some(&tools)).unwrap();
2507
2508        assert_eq!(native_tools.len(), 2);
2509        assert!(native_tools[0].cache_control.is_none());
2510        assert!(native_tools[1].cache_control.is_some());
2511    }
2512
2513    #[test]
2514    fn convert_tools_single_tool_gets_cache() {
2515        let tools = vec![ToolSpec {
2516            name: "tool1".to_string(),
2517            description: "Only tool".to_string(),
2518            parameters: serde_json::json!({"type": "object"}),
2519        }];
2520
2521        let native_tools = AnthropicModelProvider::convert_tools(Some(&tools)).unwrap();
2522
2523        assert_eq!(native_tools.len(), 1);
2524        assert!(native_tools[0].cache_control.is_some());
2525    }
2526
2527    #[test]
2528    fn convert_messages_small_system_prompt_uses_blocks_with_cache() {
2529        let messages = vec![ChatMessage {
2530            role: "system".to_string(),
2531            content: "Short system prompt".to_string(),
2532        }];
2533
2534        let (system_prompt, _) = AnthropicModelProvider::convert_messages(&messages);
2535
2536        match system_prompt.unwrap() {
2537            SystemPrompt::Blocks(blocks) => {
2538                assert_eq!(blocks.len(), 1);
2539                assert_eq!(blocks[0].text, "Short system prompt");
2540                assert!(
2541                    blocks[0].cache_control.is_some(),
2542                    "Small system prompts should have cache_control"
2543                );
2544            }
2545            SystemPrompt::String(_) => {
2546                panic!("Expected Blocks variant with cache_control for small prompt")
2547            }
2548        }
2549    }
2550
2551    #[test]
2552    fn convert_messages_large_system_prompt() {
2553        let large_content = "a".repeat(3073);
2554        let messages = vec![ChatMessage {
2555            role: "system".to_string(),
2556            content: large_content.clone(),
2557        }];
2558
2559        let (system_prompt, _) = AnthropicModelProvider::convert_messages(&messages);
2560
2561        match system_prompt.unwrap() {
2562            SystemPrompt::Blocks(blocks) => {
2563                assert_eq!(blocks.len(), 1);
2564                assert_eq!(blocks[0].text, large_content);
2565                assert!(blocks[0].cache_control.is_some());
2566            }
2567            SystemPrompt::String(_) => panic!("Expected Blocks variant for large prompt"),
2568        }
2569    }
2570
2571    #[test]
2572    fn native_chat_request_with_blocks_system() {
2573        // System prompts now always use Blocks format with cache_control
2574        let req = NativeChatRequest {
2575            model: "claude-3-opus".to_string(),
2576            max_tokens: 4096,
2577            system: Some(SystemPrompt::Blocks(vec![SystemBlock {
2578                block_type: "text".to_string(),
2579                text: "System".to_string(),
2580                cache_control: Some(CacheControl::ephemeral()),
2581            }])),
2582            messages: vec![NativeMessage {
2583                role: "user".to_string(),
2584                content: vec![NativeContentOut::Text {
2585                    text: "Hello".to_string(),
2586                    cache_control: None,
2587                }],
2588            }],
2589            temperature: Some(0.7),
2590            tools: None,
2591            tool_choice: None,
2592            stream: None,
2593            thinking: None,
2594        };
2595
2596        let json = serde_json::to_string(&req).unwrap();
2597        assert!(json.contains("System"));
2598        assert!(
2599            json.contains(r#""cache_control":{"type":"ephemeral"}"#),
2600            "System prompt should include cache_control"
2601        );
2602    }
2603
2604    #[test]
2605    fn native_chat_request_omits_temperature_when_none() {
2606        let req = NativeChatRequest {
2607            model: "claude-opus-4-7".to_string(),
2608            max_tokens: 4096,
2609            system: None,
2610            messages: vec![NativeMessage {
2611                role: "user".to_string(),
2612                content: vec![NativeContentOut::Text {
2613                    text: "hi".to_string(),
2614                    cache_control: None,
2615                }],
2616            }],
2617            temperature: None,
2618            tools: None,
2619            tool_choice: None,
2620            stream: None,
2621            thinking: None,
2622        };
2623
2624        let json = serde_json::to_string(&req).unwrap();
2625        assert!(
2626            !json.contains("temperature"),
2627            "temperature should be omitted when None; got: {json}"
2628        );
2629    }
2630
2631    #[tokio::test]
2632    async fn warmup_without_key_is_noop() {
2633        let model_provider = AnthropicModelProvider::new("test", None);
2634        let result = model_provider.warmup().await;
2635        assert!(result.is_ok());
2636    }
2637
2638    #[test]
2639    fn convert_messages_preserves_multi_turn_history() {
2640        let messages = vec![
2641            ChatMessage {
2642                role: "system".to_string(),
2643                content: "You are helpful.".to_string(),
2644            },
2645            ChatMessage {
2646                role: "user".to_string(),
2647                content: "gen a 2 sum in golang".to_string(),
2648            },
2649            ChatMessage {
2650                role: "assistant".to_string(),
2651                content: "```go\nfunc twoSum(nums []int) {}\n```".to_string(),
2652            },
2653            ChatMessage {
2654                role: "user".to_string(),
2655                content: "what's meaning of make here?".to_string(),
2656            },
2657        ];
2658
2659        let (system, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
2660
2661        // System prompt extracted
2662        assert!(system.is_some());
2663        // All 3 non-system messages preserved in order
2664        assert_eq!(native_msgs.len(), 3);
2665        assert_eq!(native_msgs[0].role, "user");
2666        assert_eq!(native_msgs[1].role, "assistant");
2667        assert_eq!(native_msgs[2].role, "user");
2668    }
2669
2670    /// Integration test: spin up a mock Anthropic API server, call chat_with_tools
2671    /// with a multi-turn conversation + tools, and verify the request body contains
2672    /// ALL conversation turns and native tool definitions.
2673    #[tokio::test]
2674    async fn chat_with_tools_sends_full_history_and_native_tools() {
2675        use axum::{Json, Router, routing::post};
2676        use std::sync::{Arc, Mutex};
2677        use tokio::net::TcpListener;
2678
2679        // Captured request body for assertion
2680        let captured: Arc<Mutex<Option<serde_json::Value>>> = Arc::new(Mutex::new(None));
2681        let captured_clone = captured.clone();
2682
2683        let app = Router::new().route(
2684            "/v1/messages",
2685            post(move |Json(body): Json<serde_json::Value>| {
2686                let cap = captured_clone.clone();
2687                async move {
2688                    *cap.lock().unwrap() = Some(body);
2689                    // Return a minimal valid Anthropic response
2690                    Json(serde_json::json!({
2691                        "id": "msg_test",
2692                        "type": "message",
2693                        "role": "assistant",
2694                        "content": [{"type": "text", "text": "The make function creates a map."}],
2695                        "model": "claude-opus-4-6",
2696                        "stop_reason": "end_turn",
2697                        "usage": {"input_tokens": 100, "output_tokens": 20}
2698                    }))
2699                }
2700            }),
2701        );
2702
2703        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2704        let addr = listener.local_addr().unwrap();
2705        let server_handle = zeroclaw_spawn::spawn!(async move {
2706            axum::serve(listener, app).await.unwrap();
2707        });
2708
2709        // Create model_provider pointing at mock server
2710        let model_provider = AnthropicModelProvider {
2711            alias: "test".to_string(),
2712            credential: Some("test-key".to_string()),
2713            base_url: format!("http://{addr}"),
2714            max_tokens: 4096,
2715        };
2716
2717        // Multi-turn conversation: system → user (Go code) → assistant (code response) → user (follow-up)
2718        let messages = vec![
2719            ChatMessage::system("You are a helpful assistant."),
2720            ChatMessage::user("gen a 2 sum in golang"),
2721            ChatMessage::assistant(
2722                "```go\nfunc twoSum(nums []int, target int) []int {\n    m := make(map[int]int)\n    for i, n := range nums {\n        if j, ok := m[target-n]; ok {\n            return []int{j, i}\n        }\n        m[n] = i\n    }\n    return nil\n}\n```",
2723            ),
2724            ChatMessage::user("what's meaning of make here?"),
2725        ];
2726
2727        let tools = vec![serde_json::json!({
2728            "type": "function",
2729            "function": {
2730                "name": "shell",
2731                "description": "Run a shell command",
2732                "parameters": {
2733                    "type": "object",
2734                    "properties": {
2735                        "command": {"type": "string"}
2736                    },
2737                    "required": ["command"]
2738                }
2739            }
2740        })];
2741
2742        let result = model_provider
2743            .chat_with_tools(&messages, &tools, "claude-opus-4-6", Some(0.7))
2744            .await;
2745        assert!(result.is_ok(), "chat_with_tools failed: {:?}", result.err());
2746
2747        let body = captured
2748            .lock()
2749            .unwrap()
2750            .take()
2751            .expect("No request captured");
2752
2753        // Verify system prompt extracted to top-level field
2754        let system = &body["system"];
2755        assert!(
2756            system.to_string().contains("helpful assistant"),
2757            "System prompt missing: {system}"
2758        );
2759
2760        // Verify ALL conversation turns present in messages array
2761        let msgs = body["messages"].as_array().expect("messages not an array");
2762        assert_eq!(
2763            msgs.len(),
2764            3,
2765            "Expected 3 messages (2 user + 1 assistant), got {}",
2766            msgs.len()
2767        );
2768
2769        // Turn 1: user with Go request
2770        assert_eq!(msgs[0]["role"], "user");
2771        let turn1_text = msgs[0]["content"].to_string();
2772        assert!(
2773            turn1_text.contains("2 sum"),
2774            "Turn 1 missing Go request: {turn1_text}"
2775        );
2776
2777        // Turn 2: assistant with Go code
2778        assert_eq!(msgs[1]["role"], "assistant");
2779        let turn2_text = msgs[1]["content"].to_string();
2780        assert!(
2781            turn2_text.contains("make(map[int]int)"),
2782            "Turn 2 missing Go code: {turn2_text}"
2783        );
2784
2785        // Turn 3: user follow-up
2786        assert_eq!(msgs[2]["role"], "user");
2787        let turn3_text = msgs[2]["content"].to_string();
2788        assert!(
2789            turn3_text.contains("meaning of make"),
2790            "Turn 3 missing follow-up: {turn3_text}"
2791        );
2792
2793        // Verify native tools are present
2794        let api_tools = body["tools"].as_array().expect("tools not an array");
2795        assert_eq!(api_tools.len(), 1);
2796        assert_eq!(api_tools[0]["name"], "shell");
2797        assert!(
2798            api_tools[0]["input_schema"].is_object(),
2799            "Missing input_schema"
2800        );
2801
2802        server_handle.abort();
2803    }
2804
2805    #[test]
2806    fn native_response_parses_usage() {
2807        let json = r#"{
2808            "content": [{"type": "text", "text": "Hello"}],
2809            "usage": {"input_tokens": 300, "output_tokens": 75}
2810        }"#;
2811        let resp: NativeChatResponse = serde_json::from_str(json).unwrap();
2812        let result = AnthropicModelProvider::parse_native_response(resp);
2813        let usage = result.usage.unwrap();
2814        assert_eq!(usage.input_tokens, Some(300));
2815        assert_eq!(usage.output_tokens, Some(75));
2816    }
2817
2818    #[test]
2819    fn native_response_sums_all_three_anthropic_input_buckets() {
2820        // Per https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching:
2821        //   total_input = cache_read + cache_creation + input_tokens
2822        // where Anthropic's `input_tokens` is *only* the tokens after the
2823        // last cache breakpoint. The three buckets are disjoint.
2824        //
2825        // This is the most common live shape on a long Anthropic session:
2826        // huge cache_read, tiny input_tokens, occasional cache_creation as
2827        // the conversation grows past the previous breakpoint.
2828        let json = r#"{
2829            "content": [{"type": "text", "text": "ok"}],
2830            "usage": {
2831                "input_tokens": 1,
2832                "cache_read_input_tokens": 148539,
2833                "cache_creation_input_tokens": 4200,
2834                "output_tokens": 27
2835            }
2836        }"#;
2837        let resp: NativeChatResponse = serde_json::from_str(json).unwrap();
2838        let result = AnthropicModelProvider::parse_native_response(resp);
2839        let usage = result.usage.expect("usage should be Some");
2840        assert_eq!(
2841            usage.input_tokens,
2842            Some(152_740),
2843            "total = 1 (after-breakpoint) + 148539 (cache_read) + 4200 (cache_creation)"
2844        );
2845        assert_eq!(
2846            usage.cached_input_tokens,
2847            Some(148_539),
2848            "cached_input_tokens is the cache-read portion only \
2849             (the discount-billed subset of the total)"
2850        );
2851        assert_eq!(usage.output_tokens, Some(27));
2852    }
2853
2854    #[test]
2855    fn native_response_parses_without_usage() {
2856        let json = r#"{"content": [{"type": "text", "text": "Hello"}]}"#;
2857        let resp: NativeChatResponse = serde_json::from_str(json).unwrap();
2858        let result = AnthropicModelProvider::parse_native_response(resp);
2859        assert!(result.usage.is_none());
2860    }
2861
2862    #[test]
2863    fn native_response_preserves_thinking_text_byte_for_byte() {
2864        // Signatures on extended-thinking blocks are computed over the exact
2865        // bytes the model returned. Any mutation — including trim() — breaks
2866        // signature validation on replay in a multi-turn tool-use conversation.
2867        let json = r#"{
2868            "content": [
2869                {
2870                    "type": "thinking",
2871                    "thinking": "  \nStep 1: consider the request.\nStep 2: respond.\n  ",
2872                    "signature": "sig_abc123"
2873                },
2874                {"type": "text", "text": "ok"}
2875            ]
2876        }"#;
2877        let resp: NativeChatResponse = serde_json::from_str(json).unwrap();
2878        let result = AnthropicModelProvider::parse_native_response(resp);
2879        let reasoning = result.reasoning_content.expect("thinking preserved");
2880        let parsed: serde_json::Value = serde_json::from_str(&reasoning).unwrap();
2881        assert_eq!(
2882            parsed.get("thinking").and_then(|v| v.as_str()),
2883            Some("  \nStep 1: consider the request.\nStep 2: respond.\n  ")
2884        );
2885        assert_eq!(
2886            parsed.get("signature").and_then(|v| v.as_str()),
2887            Some("sig_abc123")
2888        );
2889    }
2890
2891    #[test]
2892    fn native_response_drops_empty_thinking_blocks() {
2893        let json = r#"{
2894            "content": [
2895                {"type": "thinking", "thinking": "", "signature": "sig_xyz"},
2896                {"type": "text", "text": "hello"}
2897            ]
2898        }"#;
2899        let resp: NativeChatResponse = serde_json::from_str(json).unwrap();
2900        let result = AnthropicModelProvider::parse_native_response(resp);
2901        assert!(result.reasoning_content.is_none());
2902    }
2903
2904    #[test]
2905    fn capabilities_returns_vision_and_native_tools() {
2906        let model_provider = AnthropicModelProvider::new("test", Some("test-key"));
2907        let caps = model_provider.capabilities();
2908        assert!(
2909            caps.native_tool_calling,
2910            "Anthropic should support native tool calling"
2911        );
2912        assert!(caps.vision, "Anthropic should support vision");
2913    }
2914
2915    #[test]
2916    fn convert_messages_with_image_marker_data_uri() {
2917        let messages = vec![ChatMessage {
2918            role: "user".to_string(),
2919            content: "Check this image: [IMAGE:data:image/jpeg;base64,/9j/4AAQ] What do you see?"
2920                .to_string(),
2921        }];
2922
2923        let (_, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
2924
2925        assert_eq!(native_msgs.len(), 1);
2926        assert_eq!(native_msgs[0].role, "user");
2927        // Should have 2 content blocks: image + text
2928        assert_eq!(native_msgs[0].content.len(), 2);
2929
2930        // First block should be image
2931        match &native_msgs[0].content[0] {
2932            NativeContentOut::Image { source } => {
2933                assert_eq!(source.source_type, "base64");
2934                assert_eq!(source.media_type, "image/jpeg");
2935                assert_eq!(source.data, "/9j/4AAQ");
2936            }
2937            _ => panic!("Expected Image content block"),
2938        }
2939
2940        // Second block should be text (parse_image_markers may leave extra spaces)
2941        match &native_msgs[0].content[1] {
2942            NativeContentOut::Text { text, .. } => {
2943                // The text may have extra spaces where the marker was removed
2944                assert!(
2945                    text.contains("Check this image:") && text.contains("What do you see?"),
2946                    "Expected text to contain 'Check this image:' and 'What do you see?', got: {}",
2947                    text
2948                );
2949            }
2950            _ => panic!("Expected Text content block"),
2951        }
2952    }
2953
2954    #[test]
2955    fn convert_messages_with_only_image_marker() {
2956        let messages = vec![ChatMessage {
2957            role: "user".to_string(),
2958            content: "[IMAGE:data:image/png;base64,iVBORw0KGgo]".to_string(),
2959        }];
2960
2961        let (_, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
2962
2963        assert_eq!(native_msgs.len(), 1);
2964        assert_eq!(native_msgs[0].content.len(), 2);
2965
2966        // First block should be image
2967        match &native_msgs[0].content[0] {
2968            NativeContentOut::Image { source } => {
2969                assert_eq!(source.media_type, "image/png");
2970            }
2971            _ => panic!("Expected Image content block"),
2972        }
2973
2974        // Second block should be placeholder text
2975        match &native_msgs[0].content[1] {
2976            NativeContentOut::Text { text, .. } => {
2977                assert_eq!(text, "[image]");
2978            }
2979            _ => panic!("Expected Text content block with [image] placeholder"),
2980        }
2981    }
2982
2983    #[test]
2984    fn convert_messages_without_image_marker() {
2985        let messages = vec![ChatMessage {
2986            role: "user".to_string(),
2987            content: "Hello, how are you?".to_string(),
2988        }];
2989
2990        let (_, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
2991
2992        assert_eq!(native_msgs.len(), 1);
2993        assert_eq!(native_msgs[0].content.len(), 1);
2994
2995        match &native_msgs[0].content[0] {
2996            NativeContentOut::Text { text, .. } => {
2997                assert_eq!(text, "Hello, how are you?");
2998            }
2999            _ => panic!("Expected Text content block"),
3000        }
3001    }
3002
3003    #[test]
3004    fn image_content_serializes_correctly() {
3005        let content = NativeContentOut::Image {
3006            source: ImageSource {
3007                source_type: "base64".to_string(),
3008                media_type: "image/jpeg".to_string(),
3009                data: "testdata".to_string(),
3010            },
3011        };
3012        let json = serde_json::to_string(&content).unwrap();
3013        // The outer "type" is the enum tag, inner "type" (source_type) is renamed
3014        assert!(json.contains(r#""type":"image""#), "JSON: {}", json);
3015        assert!(json.contains(r#""type":"base64""#), "JSON: {}", json); // source_type is serialized as "type"
3016        assert!(
3017            json.contains(r#""media_type":"image/jpeg""#),
3018            "JSON: {}",
3019            json
3020        );
3021        assert!(json.contains(r#""data":"testdata""#), "JSON: {}", json);
3022    }
3023
3024    #[test]
3025    fn convert_messages_merges_consecutive_tool_results() {
3026        // Simulate a multi-tool-call turn: assistant with two tool_use blocks
3027        // followed by two separate tool result messages.
3028        let messages = vec![
3029            ChatMessage {
3030                role: "system".to_string(),
3031                content: "You are helpful.".to_string(),
3032            },
3033            ChatMessage {
3034                role: "user".to_string(),
3035                content: "Do two things.".to_string(),
3036            },
3037            ChatMessage {
3038                role: "assistant".to_string(),
3039                content: serde_json::json!({
3040                    "content": "",
3041                    "tool_calls": [
3042                        {"id": "call_1", "name": "shell", "arguments": "{\"command\":\"ls\"}"},
3043                        {"id": "call_2", "name": "shell", "arguments": "{\"command\":\"pwd\"}"}
3044                    ]
3045                })
3046                .to_string(),
3047            },
3048            ChatMessage {
3049                role: "tool".to_string(),
3050                content: serde_json::json!({
3051                    "tool_call_id": "call_1",
3052                    "content": "file1.txt\nfile2.txt"
3053                })
3054                .to_string(),
3055            },
3056            ChatMessage {
3057                role: "tool".to_string(),
3058                content: serde_json::json!({
3059                    "tool_call_id": "call_2",
3060                    "content": "/home/user"
3061                })
3062                .to_string(),
3063            },
3064        ];
3065
3066        let (system, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
3067
3068        assert!(system.is_some());
3069        // Should be: user, assistant, user (merged tool results)
3070        // NOT: user, assistant, user, user (which Anthropic rejects)
3071        assert_eq!(
3072            native_msgs.len(),
3073            3,
3074            "Expected 3 messages (user, assistant, merged tool results), got {}.\nRoles: {:?}",
3075            native_msgs.len(),
3076            native_msgs.iter().map(|m| &m.role).collect::<Vec<_>>()
3077        );
3078        assert_eq!(native_msgs[0].role, "user");
3079        assert_eq!(native_msgs[1].role, "assistant");
3080        assert_eq!(native_msgs[2].role, "user");
3081        // The merged user message should contain both tool results
3082        assert_eq!(
3083            native_msgs[2].content.len(),
3084            2,
3085            "Expected 2 tool_result blocks in merged message"
3086        );
3087    }
3088
3089    #[test]
3090    fn convert_messages_backfills_orphaned_tool_use() {
3091        // A turn interrupted mid-flight: assistant emitted a tool_use but the
3092        // matching tool_result was never persisted, and a new user message
3093        // follows. Sending this raw is a hard 400. The converter must
3094        // synthesize a stub tool_result so the history stays well-formed.
3095        let messages = vec![
3096            ChatMessage {
3097                role: "user".to_string(),
3098                content: "Do a thing.".to_string(),
3099            },
3100            ChatMessage {
3101                role: "assistant".to_string(),
3102                content: serde_json::json!({
3103                    "content": "",
3104                    "tool_calls": [
3105                        {"id": "orphan_1", "name": "shell", "arguments": "{\"command\":\"ls\"}"}
3106                    ]
3107                })
3108                .to_string(),
3109            },
3110            ChatMessage {
3111                role: "user".to_string(),
3112                content: "Actually, never mind.".to_string(),
3113            },
3114        ];
3115
3116        let (_, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
3117
3118        let assistant_idx = native_msgs
3119            .iter()
3120            .position(|m| m.role == "assistant")
3121            .expect("assistant message present");
3122        let next = native_msgs
3123            .get(assistant_idx + 1)
3124            .expect("a message must follow the tool_use");
3125
3126        let has_stub = next.content.iter().any(|block| {
3127            matches!(
3128                block,
3129                NativeContentOut::ToolResult { tool_use_id, .. } if tool_use_id == "orphan_1"
3130            )
3131        });
3132        assert!(
3133            has_stub,
3134            "orphaned tool_use should be answered by a synthesized tool_result"
3135        );
3136
3137        // The tool_result must lead its message (before sibling text).
3138        assert!(
3139            matches!(
3140                next.content.first(),
3141                Some(NativeContentOut::ToolResult { .. })
3142            ),
3143            "tool_result must precede any text in the user message"
3144        );
3145    }
3146
3147    #[test]
3148    fn convert_messages_backfills_trailing_orphaned_tool_use() {
3149        // The interrupted tool_use is the very last thing in history with no
3150        // following message at all. A tool_result message must be appended.
3151        let messages = vec![
3152            ChatMessage {
3153                role: "user".to_string(),
3154                content: "Do a thing.".to_string(),
3155            },
3156            ChatMessage {
3157                role: "assistant".to_string(),
3158                content: serde_json::json!({
3159                    "content": "",
3160                    "tool_calls": [
3161                        {"id": "trailing_1", "name": "shell", "arguments": "{}"}
3162                    ]
3163                })
3164                .to_string(),
3165            },
3166        ];
3167
3168        let (_, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
3169
3170        let last = native_msgs.last().expect("messages present");
3171        assert_eq!(last.role, "user");
3172        assert!(
3173            last.content.iter().any(|block| matches!(
3174                block,
3175                NativeContentOut::ToolResult { tool_use_id, .. } if tool_use_id == "trailing_1"
3176            )),
3177            "trailing orphaned tool_use should get an appended tool_result message"
3178        );
3179    }
3180
3181    #[test]
3182    fn convert_messages_no_adjacent_same_role() {
3183        // Verify that convert_messages never produces adjacent messages with the
3184        // same role, regardless of input ordering.
3185        let messages = vec![
3186            ChatMessage {
3187                role: "user".to_string(),
3188                content: "Hello".to_string(),
3189            },
3190            ChatMessage {
3191                role: "assistant".to_string(),
3192                content: serde_json::json!({
3193                    "content": "I'll run a command",
3194                    "tool_calls": [
3195                        {"id": "tc1", "name": "shell", "arguments": "{\"command\":\"echo hi\"}"}
3196                    ]
3197                })
3198                .to_string(),
3199            },
3200            ChatMessage {
3201                role: "tool".to_string(),
3202                content: serde_json::json!({
3203                    "tool_call_id": "tc1",
3204                    "content": "hi"
3205                })
3206                .to_string(),
3207            },
3208            ChatMessage {
3209                role: "user".to_string(),
3210                content: "Thanks!".to_string(),
3211            },
3212        ];
3213
3214        let (_system, native_msgs) = AnthropicModelProvider::convert_messages(&messages);
3215
3216        for window in native_msgs.windows(2) {
3217            assert_ne!(
3218                window[0].role, window[1].role,
3219                "Adjacent messages must not share the same role: found two '{}' messages in a row",
3220                window[0].role
3221            );
3222        }
3223    }
3224}