Skip to main content

zeroclaw_tools/
mcp_transport.rs

1//! MCP transport abstraction — supports stdio, SSE, and HTTP transports.
2
3use std::borrow::Cow;
4
5use anyhow::{Context, Result, bail};
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::sync::{Mutex, Notify, oneshot};
9use tokio::time::{Duration, timeout};
10use tokio_stream::StreamExt;
11
12use crate::mcp_protocol::{INTERNAL_ERROR, JsonRpcError, JsonRpcRequest, JsonRpcResponse};
13use zeroclaw_config::schema::{McpServerConfig, McpTransport};
14
/// Maximum bytes accepted for a single JSON-RPC response line.
///
/// The stdio reader compares a line's length against this limit after the
/// line has been read, so it rejects oversized responses but does not bound
/// how much is buffered while reading.
const MAX_LINE_BYTES: usize = 4 * 1024 * 1024; // 4 MB

/// Receive timeout, in seconds.
///
/// Used as the deadline for every stdio response wait and as the SSE read
/// timeout for streamable-HTTP requests other than `tools/call`.
const RECV_TIMEOUT_SECS: u64 = 30;

/// Legacy default HTTP request timeout for non-tool MCP HTTP/SSE requests.
const DEFAULT_HTTP_REQUEST_TIMEOUT_SECS: u64 = 120;

/// JSON-RPC method name for MCP tool calls.
const TOOLS_CALL_METHOD: &str = "tools/call";

/// Streamable HTTP Accept header required by MCP HTTP transport.
///
/// Sent only when the configured headers do not already contain `Accept`.
const MCP_STREAMABLE_ACCEPT: &str = "application/json, text/event-stream";

/// Default media type for MCP JSON-RPC request bodies.
///
/// Sent only when the configured headers do not already contain `Content-Type`.
const MCP_JSON_CONTENT_TYPE: &str = "application/json";
/// Streamable HTTP session header used to preserve MCP server state.
const MCP_SESSION_ID_HEADER: &str = "Mcp-Session-Id";
34
35fn http_request_timeout_secs(
36    request: &JsonRpcRequest,
37    tool_timeout_secs: Option<u64>,
38) -> Option<u64> {
39    if request.method == TOOLS_CALL_METHOD {
40        tool_timeout_secs
41    } else {
42        Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS)
43    }
44}
45
46fn http_sse_read_timeout_secs(
47    request: &JsonRpcRequest,
48    tool_timeout_secs: Option<u64>,
49) -> Option<u64> {
50    if request.method == TOOLS_CALL_METHOD {
51        tool_timeout_secs
52    } else {
53        Some(RECV_TIMEOUT_SECS)
54    }
55}
56
57fn apply_request_timeout(
58    req: reqwest::RequestBuilder,
59    timeout_secs: Option<u64>,
60) -> reqwest::RequestBuilder {
61    if let Some(timeout_secs) = timeout_secs {
62        req.timeout(Duration::from_secs(timeout_secs))
63    } else {
64        req
65    }
66}
67
68// ── Transport Trait ──────────────────────────────────────────────────────
69
/// Abstract transport for MCP communication.
///
/// Implemented in this file by the stdio, HTTP, and SSE transports.
#[async_trait::async_trait]
pub trait McpTransportConn: Send + Sync {
    /// Send a JSON-RPC request and receive the response.
    ///
    /// For a request without an `id` (a notification), the stdio and HTTP
    /// implementations send it and return a placeholder response whose `id`,
    /// `result`, and `error` are all `None` instead of waiting for a reply.
    async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse>;

    /// Close the connection.
    async fn close(&mut self) -> Result<()>;
}
79
80// ── Stdio Transport ──────────────────────────────────────────────────────
81
/// Stdio-based transport (spawn local process).
///
/// Requests are written to the child's stdin as newline-terminated JSON and
/// responses are read line by line from its stdout.
pub struct StdioTransport {
    /// Handle to the spawned server. It is never read; holding it keeps the
    /// child owned by the transport (it is spawned with `kill_on_drop(true)`).
    _child: Child,
    /// Write end of the child's stdin pipe.
    stdin: tokio::process::ChildStdin,
    /// Buffered line reader over the child's stdout pipe.
    stdout_lines: tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
}
88
89impl StdioTransport {
90    pub fn new(config: &McpServerConfig) -> Result<Self> {
91        let mut child = Command::new(&config.command)
92            .args(&config.args)
93            .envs(&config.env)
94            .stdin(std::process::Stdio::piped())
95            .stdout(std::process::Stdio::piped())
96            .stderr(std::process::Stdio::inherit())
97            .kill_on_drop(true)
98            .spawn()
99            .with_context(|| format!("failed to spawn MCP server `{}`", config.name))?;
100
101        let stdin = child.stdin.take().ok_or_else(|| {
102            ::zeroclaw_log::record!(
103                ERROR,
104                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
105                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
106                    .with_attrs(::serde_json::json!({
107                        "mcp_server": &config.name,
108                        "missing": "stdin",
109                    })),
110                "mcp_transport: no stdin on spawned MCP server"
111            );
112            anyhow::Error::msg(format!("no stdin on MCP server `{}`", config.name))
113        })?;
114        let stdout = child.stdout.take().ok_or_else(|| {
115            ::zeroclaw_log::record!(
116                ERROR,
117                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
118                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
119                    .with_attrs(::serde_json::json!({
120                        "mcp_server": &config.name,
121                        "missing": "stdout",
122                    })),
123                "mcp_transport: no stdout on spawned MCP server"
124            );
125            anyhow::Error::msg(format!("no stdout on MCP server `{}`", config.name))
126        })?;
127        let stdout_lines = BufReader::new(stdout).lines();
128
129        Ok(Self {
130            _child: child,
131            stdin,
132            stdout_lines,
133        })
134    }
135
136    async fn send_raw(&mut self, line: &str) -> Result<()> {
137        self.stdin
138            .write_all(line.as_bytes())
139            .await
140            .context("failed to write to MCP server stdin")?;
141        self.stdin
142            .write_all(b"\n")
143            .await
144            .context("failed to write newline to MCP server stdin")?;
145        self.stdin.flush().await.context("failed to flush stdin")?;
146        Ok(())
147    }
148
149    async fn recv_raw(&mut self) -> Result<String> {
150        let line = self.stdout_lines.next_line().await?.ok_or_else(|| {
151            ::zeroclaw_log::record!(
152                ERROR,
153                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
154                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
155                "mcp_transport: MCP server closed stdout"
156            );
157            anyhow::Error::msg("MCP server closed stdout")
158        })?;
159        if line.len() > MAX_LINE_BYTES {
160            bail!("MCP response too large: {} bytes", line.len());
161        }
162        Ok(line)
163    }
164}
165
166#[async_trait::async_trait]
167impl McpTransportConn for StdioTransport {
168    async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
169        let line = serde_json::to_string(request)?;
170        self.send_raw(&line).await?;
171        if request.id.is_none() {
172            return Ok(JsonRpcResponse {
173                jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
174                id: None,
175                result: None,
176                error: None,
177            });
178        }
179        let deadline = std::time::Instant::now() + Duration::from_secs(RECV_TIMEOUT_SECS);
180        loop {
181            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
182            if remaining.is_zero() {
183                bail!("timeout waiting for MCP response");
184            }
185            let resp_line = timeout(remaining, self.recv_raw())
186                .await
187                .context("timeout waiting for MCP response")??;
188            let resp: JsonRpcResponse = serde_json::from_str(&resp_line)
189                .with_context(|| format!("invalid JSON-RPC response: {}", resp_line))?;
190            if resp.id.is_none() {
191                // Server-sent notification (e.g. `notifications/initialized`) — skip and
192                // keep waiting for the actual response to our request.
193                ::zeroclaw_log::record!(
194                    DEBUG,
195                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
196                    "MCP stdio: skipping server notification while waiting for response"
197                );
198                continue;
199            }
200            return Ok(resp);
201        }
202    }
203
204    async fn close(&mut self) -> Result<()> {
205        let _ = self.stdin.shutdown().await;
206        Ok(())
207    }
208}
209
210// ── HTTP Transport ───────────────────────────────────────────────────────
211
/// HTTP-based transport (POST requests).
///
/// Each request is POSTed to `url`; the reply is read either from a plain
/// response body or from a `text/event-stream` response body.
pub struct HttpTransport {
    /// Endpoint that every request is POSTed to.
    url: String,
    /// Per-server tool-call timeout, from `McpServerConfig.tool_timeout_secs`.
    /// Non-tool requests keep the legacy HTTP request timeout and short SSE
    /// read timeout. Tool calls use the configured budget when present; when
    /// absent, the client layer's outer tool-call timeout owns the budget.
    tool_timeout_secs: Option<u64>,
    /// HTTP client used for every request on this transport.
    client: reqwest::Client,
    /// Headers from the server configuration, added to every request.
    headers: std::collections::HashMap<String, String>,
    /// Most recent non-empty `Mcp-Session-Id` value returned by the server;
    /// echoed back on later requests once set.
    session_id: Option<String>,
}
224
225impl HttpTransport {
226    pub fn new(config: &McpServerConfig) -> Result<Self> {
227        let url = config
228            .url
229            .as_ref()
230            .ok_or_else(|| {
231                ::zeroclaw_log::record!(
232                    WARN,
233                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
234                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
235                        .with_attrs(::serde_json::json!({
236                            "mcp_server": &config.name,
237                            "transport": "http",
238                        })),
239                    "mcp_transport: HTTP transport requires URL"
240                );
241                anyhow::Error::msg("URL required for HTTP transport")
242            })?
243            .clone();
244
245        let client = reqwest::Client::builder()
246            .build()
247            .context("failed to build HTTP client")?;
248
249        Ok(Self {
250            url,
251            tool_timeout_secs: config.tool_timeout_secs,
252            client,
253            headers: config.headers.clone(),
254            session_id: None,
255        })
256    }
257
258    fn apply_session_header(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
259        if let Some(session_id) = self.session_id.as_deref() {
260            req.header(MCP_SESSION_ID_HEADER, session_id)
261        } else {
262            req
263        }
264    }
265
266    fn update_session_id_from_headers(&mut self, headers: &reqwest::header::HeaderMap) {
267        if let Some(session_id) = headers
268            .get(MCP_SESSION_ID_HEADER)
269            .and_then(|v| v.to_str().ok())
270            .map(str::trim)
271            .filter(|v| !v.is_empty())
272        {
273            self.session_id = Some(session_id.to_string());
274        }
275    }
276}
277
278#[async_trait::async_trait]
279impl McpTransportConn for HttpTransport {
280    async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
281        let body = serde_json::to_string(request)?;
282
283        let has_accept = self
284            .headers
285            .keys()
286            .any(|k| k.eq_ignore_ascii_case("Accept"));
287        let has_content_type = self
288            .headers
289            .keys()
290            .any(|k| k.eq_ignore_ascii_case("Content-Type"));
291
292        let mut req = apply_request_timeout(
293            self.client.post(&self.url).body(body),
294            http_request_timeout_secs(request, self.tool_timeout_secs),
295        );
296        if !has_content_type {
297            req = req.header("Content-Type", MCP_JSON_CONTENT_TYPE);
298        }
299        for (key, value) in &self.headers {
300            req = req.header(key, value);
301        }
302        req = self.apply_session_header(req);
303        if !has_accept {
304            req = req.header("Accept", MCP_STREAMABLE_ACCEPT);
305        }
306
307        let resp = req
308            .send()
309            .await
310            .context("HTTP request to MCP server failed")?;
311
312        if !resp.status().is_success() {
313            bail!("MCP server returned HTTP {}", resp.status());
314        }
315
316        self.update_session_id_from_headers(resp.headers());
317
318        if request.id.is_none() {
319            return Ok(JsonRpcResponse {
320                jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
321                id: None,
322                result: None,
323                error: None,
324            });
325        }
326
327        let is_sse = resp
328            .headers()
329            .get(reqwest::header::CONTENT_TYPE)
330            .and_then(|v| v.to_str().ok())
331            .is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
332        if is_sse {
333            let read_response = read_first_jsonrpc_from_sse_response(resp);
334            let maybe_resp = if let Some(sse_timeout) =
335                http_sse_read_timeout_secs(request, self.tool_timeout_secs)
336            {
337                timeout(Duration::from_secs(sse_timeout), read_response)
338                    .await
339                    .context("timeout waiting for MCP response from streamable HTTP SSE stream")??
340            } else {
341                read_response.await?
342            };
343            return maybe_resp.ok_or_else(|| {
344                ::zeroclaw_log::record!(
345                    ERROR,
346                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
347                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
348                    "mcp_transport: MCP server returned no response in SSE stream"
349                );
350                anyhow::Error::msg("MCP server returned no response in SSE stream")
351            });
352        }
353
354        let resp_text = resp.text().await.context("failed to read HTTP response")?;
355        parse_jsonrpc_response_text(&resp_text)
356    }
357
358    async fn close(&mut self) -> Result<()> {
359        Ok(())
360    }
361}
362
363// ── SSE Transport ─────────────────────────────────────────────────────────
364
/// Whether the server-to-client SSE stream of an `SseTransport` is available.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum SseStreamState {
    /// Initial state: no SSE GET has settled yet.
    Unknown,
    /// A reader task has been started on the SSE stream (or is still running).
    Connected,
    /// The SSE GET answered 404, 405, or with a content type other than
    /// `text/event-stream`; no further connection attempt is made.
    Unsupported,
}
372
/// SSE-based transport (HTTP POST for requests, SSE for responses).
pub struct SseTransport {
    /// URL the SSE stream is opened on with GET.
    sse_url: String,
    /// Configured server name, used in log messages.
    server_name: String,
    /// Per-server tool-call timeout, copied from `McpServerConfig.tool_timeout_secs`.
    tool_timeout_secs: Option<u64>,
    /// HTTP client used for the SSE GET and for request POSTs.
    client: reqwest::Client,
    /// Headers from the server configuration, added to every HTTP request.
    headers: std::collections::HashMap<String, String>,
    /// Outcome of the most recent attempt to open the SSE stream.
    stream_state: SseStreamState,
    /// State shared with the background reader task.
    shared: std::sync::Arc<Mutex<SseSharedState>>,
    /// Signalled by the reader task when the server announces a message endpoint.
    notify: std::sync::Arc<Notify>,
    /// Sender whose signal makes the current reader task leave its loop.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Handle of the background task that reads the SSE stream.
    reader_task: Option<tokio::task::JoinHandle<()>>,
}
385
386impl SseTransport {
387    pub fn new(config: &McpServerConfig) -> Result<Self> {
388        let sse_url = config
389            .url
390            .as_ref()
391            .ok_or_else(|| {
392                ::zeroclaw_log::record!(
393                    WARN,
394                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
395                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
396                        .with_attrs(::serde_json::json!({
397                            "mcp_server": &config.name,
398                            "transport": "sse",
399                        })),
400                    "mcp_transport: SSE transport requires URL"
401                );
402                anyhow::Error::msg("URL required for SSE transport")
403            })?
404            .clone();
405
406        let client = reqwest::Client::builder()
407            .build()
408            .context("failed to build HTTP client")?;
409
410        Ok(Self {
411            sse_url,
412            server_name: config.name.clone(),
413            tool_timeout_secs: config.tool_timeout_secs,
414            client,
415            headers: config.headers.clone(),
416            stream_state: SseStreamState::Unknown,
417            shared: std::sync::Arc::new(Mutex::new(SseSharedState::default())),
418            notify: std::sync::Arc::new(Notify::new()),
419            shutdown_tx: None,
420            reader_task: None,
421        })
422    }
423
424    async fn ensure_connected(&mut self) -> Result<()> {
425        if self.stream_state == SseStreamState::Unsupported {
426            return Ok(());
427        }
428        if let Some(task) = &self.reader_task
429            && !task.is_finished()
430        {
431            self.stream_state = SseStreamState::Connected;
432            return Ok(());
433        }
434
435        let has_accept = self
436            .headers
437            .keys()
438            .any(|k| k.eq_ignore_ascii_case("Accept"));
439
440        let mut req = self
441            .client
442            .get(&self.sse_url)
443            .header("Cache-Control", "no-cache");
444        for (key, value) in &self.headers {
445            req = req.header(key, value);
446        }
447        if !has_accept {
448            req = req.header("Accept", MCP_STREAMABLE_ACCEPT);
449        }
450
451        let resp = req.send().await.context("SSE GET to MCP server failed")?;
452        if resp.status() == reqwest::StatusCode::NOT_FOUND
453            || resp.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED
454        {
455            self.stream_state = SseStreamState::Unsupported;
456            return Ok(());
457        }
458        if !resp.status().is_success() {
459            let status = resp.status();
460            ::zeroclaw_log::record!(
461                ERROR,
462                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
463                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
464                    .with_attrs(::serde_json::json!({"status": status.as_u16()})),
465                "mcp_transport: MCP server returned non-success HTTP"
466            );
467            return Err(anyhow::Error::msg(format!(
468                "MCP server returned HTTP {}",
469                status
470            )));
471        }
472        let is_event_stream = resp
473            .headers()
474            .get(reqwest::header::CONTENT_TYPE)
475            .and_then(|v| v.to_str().ok())
476            .is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
477        if !is_event_stream {
478            self.stream_state = SseStreamState::Unsupported;
479            return Ok(());
480        }
481
482        let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
483        self.shutdown_tx = Some(shutdown_tx);
484
485        let shared = self.shared.clone();
486        let notify = self.notify.clone();
487        let sse_url = self.sse_url.clone();
488        let server_name = self.server_name.clone();
489
490        self.reader_task = Some(tokio::spawn(async move {
491            let stream = resp
492                .bytes_stream()
493                .map(|item| item.map_err(std::io::Error::other));
494            let reader = tokio_util::io::StreamReader::new(stream);
495            let mut lines = BufReader::new(reader).lines();
496
497            let mut cur_event: Option<String> = None;
498            let mut cur_id: Option<String> = None;
499            let mut cur_data: Vec<String> = Vec::new();
500
501            loop {
502                tokio::select! {
503                    _ = &mut shutdown_rx => {
504                        break;
505                    }
506                    line = lines.next_line() => {
507                        let Ok(line_opt) = line else { break; };
508                        let Some(mut line) = line_opt else { break; };
509                        if line.ends_with('\r') {
510                            line.pop();
511                        }
512                        if line.is_empty() {
513                            if cur_event.is_none() && cur_id.is_none() && cur_data.is_empty() {
514                                continue;
515                            }
516                            let event = cur_event.take();
517                            let data = cur_data.join("\n");
518                            cur_data.clear();
519                            let id = cur_id.take();
520                            handle_sse_event(&server_name, &sse_url, &shared, &notify, event.as_deref(), id.as_deref(), data).await;
521                            continue;
522                        }
523
524                        if line.starts_with(':') {
525                            continue;
526                        }
527
528                        if let Some(rest) = line.strip_prefix("event:") {
529                            cur_event = Some(rest.trim().to_string());
530                        }
531                        if let Some(rest) = line.strip_prefix("data:") {
532                            let rest = rest.strip_prefix(' ').unwrap_or(rest);
533                            cur_data.push(rest.to_string());
534                        }
535                        if let Some(rest) = line.strip_prefix("id:") {
536                            cur_id = Some(rest.trim().to_string());
537                        }
538                    }
539                }
540            }
541
542            let pending = {
543                let mut guard = shared.lock().await;
544                std::mem::take(&mut guard.pending)
545            };
546            for (_, tx) in pending {
547                let _ = tx.send(JsonRpcResponse {
548                    jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
549                    id: None,
550                    result: None,
551                    error: Some(JsonRpcError {
552                        code: INTERNAL_ERROR,
553                        message: "SSE connection closed".to_string(),
554                        data: None,
555                    }),
556                });
557            }
558        }));
559        self.stream_state = SseStreamState::Connected;
560
561        Ok(())
562    }
563
564    async fn get_message_url(&self) -> Result<(String, bool)> {
565        let guard = self.shared.lock().await;
566        if let Some(url) = &guard.message_url {
567            return Ok((url.clone(), guard.message_url_from_endpoint));
568        }
569        drop(guard);
570
571        let derived = derive_message_url(&self.sse_url, "messages")
572            .or_else(|| derive_message_url(&self.sse_url, "message"))
573            .ok_or_else(|| {
574                ::zeroclaw_log::record!(
575                    WARN,
576                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
577                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
578                        .with_attrs(::serde_json::json!({"sse_url": &self.sse_url})),
579                    "mcp_transport: invalid SSE URL"
580                );
581                anyhow::Error::msg("invalid SSE URL")
582            })?;
583        let mut guard = self.shared.lock().await;
584        if guard.message_url.is_none() {
585            guard.message_url = Some(derived.clone());
586            guard.message_url_from_endpoint = false;
587        }
588        Ok((derived, false))
589    }
590}
591
/// State shared between an `SseTransport` and its background reader task.
#[derive(Default)]
struct SseSharedState {
    /// URL that requests are POSTed to, once announced by the server or derived
    /// from the SSE URL.
    message_url: Option<String>,
    /// `true` when `message_url` came from a server `endpoint` event rather
    /// than being derived locally.
    message_url_from_endpoint: bool,
    /// Requests awaiting a response on the SSE stream, keyed by numeric
    /// JSON-RPC id.
    pending: std::collections::HashMap<u64, oneshot::Sender<JsonRpcResponse>>,
}
598
599fn derive_message_url(sse_url: &str, message_path: &str) -> Option<String> {
600    let url = reqwest::Url::parse(sse_url).ok()?;
601    let mut segments: Vec<&str> = url.path_segments()?.collect();
602    if segments.is_empty() {
603        return None;
604    }
605    if segments.last().copied() == Some("sse") {
606        segments.pop();
607        segments.push(message_path);
608        let mut new_url = url.clone();
609        new_url.set_path(&format!("/{}", segments.join("/")));
610        return Some(new_url.to_string());
611    }
612    let mut new_url = url.clone();
613    let mut path = url.path().trim_end_matches('/').to_string();
614    path.push('/');
615    path.push_str(message_path);
616    new_url.set_path(&path);
617    Some(new_url.to_string())
618}
619
620async fn handle_sse_event(
621    server_name: &str,
622    sse_url: &str,
623    shared: &std::sync::Arc<Mutex<SseSharedState>>,
624    notify: &std::sync::Arc<Notify>,
625    event: Option<&str>,
626    _id: Option<&str>,
627    data: String,
628) {
629    let event = event.unwrap_or("message");
630    let trimmed = data.trim();
631    if trimmed.is_empty() {
632        return;
633    }
634
635    if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint") {
636        if let Some(url) = parse_endpoint_from_data(sse_url, trimmed) {
637            let mut guard = shared.lock().await;
638            guard.message_url = Some(url);
639            guard.message_url_from_endpoint = true;
640            drop(guard);
641            notify.notify_waiters();
642        }
643        return;
644    }
645
646    if !event.eq_ignore_ascii_case("message") {
647        return;
648    }
649
650    let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed) else {
651        return;
652    };
653
654    let Ok(resp) = serde_json::from_value::<JsonRpcResponse>(value.clone()) else {
655        let _ = serde_json::from_value::<JsonRpcRequest>(value);
656        return;
657    };
658
659    let Some(id_val) = resp.id.clone() else {
660        return;
661    };
662    let id = match id_val.as_u64() {
663        Some(v) => v,
664        None => return,
665    };
666
667    let tx = {
668        let mut guard = shared.lock().await;
669        guard.pending.remove(&id)
670    };
671    if let Some(tx) = tx {
672        let _ = tx.send(resp);
673    } else {
674        ::zeroclaw_log::record!(
675            DEBUG,
676            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
677            &format!(
678                "MCP SSE `{}` received response for unknown id {}",
679                server_name, id
680            )
681        );
682    }
683}
684
685fn parse_endpoint_from_data(sse_url: &str, data: &str) -> Option<String> {
686    if data.starts_with('{') {
687        let v: serde_json::Value = serde_json::from_str(data).ok()?;
688        let endpoint = v.get("endpoint")?.as_str()?;
689        return parse_endpoint_from_data(sse_url, endpoint);
690    }
691    if data.starts_with("http://") || data.starts_with("https://") {
692        return Some(data.to_string());
693    }
694    let base = reqwest::Url::parse(sse_url).ok()?;
695    base.join(data).ok().map(|u| u.to_string())
696}
697
/// Pulls the payload of the last data-carrying event out of SSE-formatted text.
///
/// A leading byte-order mark is removed first. Each line is stripped of a
/// trailing `\r` and of leading whitespace; blank lines end an event, lines
/// starting with `:` are comments, and `data:` lines contribute their value
/// (minus one optional leading space). Other fields are ignored. Data that is
/// not followed by a blank line still counts as an event.
///
/// Returns the trimmed data of the last event that had any, with multiple
/// data lines joined by `\n`. When no `data:` line exists at all, the trimmed
/// input is returned. Only the multi-line case allocates.
fn extract_json_from_sse_text(resp_text: &str) -> Cow<'_, str> {
    let text = resp_text.trim_start_matches('\u{feff}');
    let mut pending: Vec<&str> = Vec::new();
    let mut completed: Vec<&str> = Vec::new();

    let cleaned_lines = text
        .lines()
        .map(|raw| raw.trim_end_matches('\r').trim_start());
    for line in cleaned_lines {
        if line.is_empty() {
            // Event boundary: keep the data only if this event had some.
            if !pending.is_empty() {
                completed = std::mem::take(&mut pending);
            }
        } else if !line.starts_with(':') {
            if let Some(value) = line.strip_prefix("data:") {
                pending.push(value.strip_prefix(' ').unwrap_or(value));
            }
        }
    }

    // Tolerate a final event that was not terminated by a blank line.
    if !pending.is_empty() {
        completed = pending;
    }

    match completed.as_slice() {
        [] => Cow::Borrowed(text.trim()),
        [only] => Cow::Borrowed((*only).trim()),
        several => Cow::Owned(several.join("\n").trim().to_string()),
    }
}
737
738fn parse_jsonrpc_response_text(resp_text: &str) -> Result<JsonRpcResponse> {
739    let trimmed = resp_text.trim();
740    if trimmed.is_empty() {
741        bail!("MCP server returned no response");
742    }
743
744    let json_text = if looks_like_sse_text(trimmed) {
745        extract_json_from_sse_text(trimmed)
746    } else {
747        Cow::Borrowed(trimmed)
748    };
749
750    let mcp_resp: JsonRpcResponse = serde_json::from_str(json_text.as_ref())
751        .with_context(|| format!("invalid JSON-RPC response: {}", resp_text))?;
752    Ok(mcp_resp)
753}
754
/// Heuristic: does `text` look like an SSE stream rather than bare JSON?
///
/// True when the text begins with a `data:` or `event:` field, or when one
/// of those fields appears directly after a newline.
fn looks_like_sse_text(text: &str) -> bool {
    const AT_START: [&str; 2] = ["data:", "event:"];
    const AFTER_NEWLINE: [&str; 2] = ["\ndata:", "\nevent:"];

    AT_START.iter().any(|&field| text.starts_with(field))
        || AFTER_NEWLINE.iter().any(|&field| text.contains(field))
}
761
762async fn read_first_jsonrpc_from_sse_response(
763    resp: reqwest::Response,
764) -> Result<Option<JsonRpcResponse>> {
765    let stream = resp
766        .bytes_stream()
767        .map(|item| item.map_err(std::io::Error::other));
768    let reader = tokio_util::io::StreamReader::new(stream);
769    let mut lines = BufReader::new(reader).lines();
770
771    let mut cur_event: Option<String> = None;
772    let mut cur_data: Vec<String> = Vec::new();
773
774    while let Ok(line_opt) = lines.next_line().await {
775        let Some(mut line) = line_opt else { break };
776        if line.ends_with('\r') {
777            line.pop();
778        }
779        if line.is_empty() {
780            if cur_event.is_none() && cur_data.is_empty() {
781                continue;
782            }
783            let event = cur_event.take();
784            let data = cur_data.join("\n");
785            cur_data.clear();
786
787            let event = event.unwrap_or_else(|| "message".to_string());
788            if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint")
789            {
790                continue;
791            }
792            if !event.eq_ignore_ascii_case("message") {
793                continue;
794            }
795
796            let trimmed = data.trim();
797            if trimmed.is_empty() {
798                continue;
799            }
800            let json_str = extract_json_from_sse_text(trimmed);
801            if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(json_str.as_ref()) {
802                return Ok(Some(resp));
803            }
804            continue;
805        }
806
807        if line.starts_with(':') {
808            continue;
809        }
810        if let Some(rest) = line.strip_prefix("event:") {
811            cur_event = Some(rest.trim().to_string());
812        }
813        if let Some(rest) = line.strip_prefix("data:") {
814            let rest = rest.strip_prefix(' ').unwrap_or(rest);
815            cur_data.push(rest.to_string());
816        }
817    }
818
819    Ok(None)
820}
821
822#[async_trait::async_trait]
823impl McpTransportConn for SseTransport {
824    async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
825        self.ensure_connected().await?;
826
827        let id = request.id.as_ref().and_then(|v| v.as_u64());
828        let body = serde_json::to_string(request)?;
829
830        let (mut message_url, mut from_endpoint) = self.get_message_url().await?;
831        if self.stream_state == SseStreamState::Connected && !from_endpoint {
832            for _ in 0..3 {
833                {
834                    let guard = self.shared.lock().await;
835                    if guard.message_url_from_endpoint
836                        && let Some(url) = &guard.message_url
837                    {
838                        message_url = url.clone();
839                        from_endpoint = true;
840                        break;
841                    }
842                }
843                let _ = timeout(Duration::from_millis(300), self.notify.notified()).await;
844            }
845        }
846        let primary_url = if from_endpoint {
847            message_url.clone()
848        } else {
849            self.sse_url.clone()
850        };
851        let secondary_url = if message_url == self.sse_url {
852            None
853        } else if primary_url == message_url {
854            Some(self.sse_url.clone())
855        } else {
856            Some(message_url.clone())
857        };
858        let has_secondary = secondary_url.is_some();
859
860        let mut rx = None;
861        if let Some(id) = id
862            && self.stream_state == SseStreamState::Connected
863        {
864            let (tx, ch) = oneshot::channel();
865            {
866                let mut guard = self.shared.lock().await;
867                guard.pending.insert(id, tx);
868            }
869            rx = Some((id, ch));
870        }
871
872        let mut got_direct = None;
873        let mut last_status = None;
874
875        for (i, url) in std::iter::once(primary_url)
876            .chain(secondary_url)
877            .enumerate()
878        {
879            let has_accept = self
880                .headers
881                .keys()
882                .any(|k| k.eq_ignore_ascii_case("Accept"));
883            let has_content_type = self
884                .headers
885                .keys()
886                .any(|k| k.eq_ignore_ascii_case("Content-Type"));
887            let mut req = apply_request_timeout(
888                self.client.post(&url).body(body.clone()),
889                http_request_timeout_secs(request, self.tool_timeout_secs),
890            );
891            if !has_content_type {
892                req = req.header("Content-Type", MCP_JSON_CONTENT_TYPE);
893            }
894            for (key, value) in &self.headers {
895                req = req.header(key, value);
896            }
897            if !has_accept {
898                req = req.header("Accept", MCP_STREAMABLE_ACCEPT);
899            }
900
901            let resp = req.send().await.context("SSE POST to MCP server failed")?;
902            let status = resp.status();
903            last_status = Some(status);
904
905            if (status == reqwest::StatusCode::NOT_FOUND
906                || status == reqwest::StatusCode::METHOD_NOT_ALLOWED)
907                && i == 0
908            {
909                continue;
910            }
911
912            if !status.is_success() {
913                break;
914            }
915
916            if request.id.is_none() {
917                got_direct = Some(JsonRpcResponse {
918                    jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
919                    id: None,
920                    result: None,
921                    error: None,
922                });
923                break;
924            }
925
926            let is_sse = resp
927                .headers()
928                .get(reqwest::header::CONTENT_TYPE)
929                .and_then(|v| v.to_str().ok())
930                .is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
931
932            if is_sse {
933                if i == 0 && has_secondary {
934                    match timeout(
935                        Duration::from_secs(3),
936                        read_first_jsonrpc_from_sse_response(resp),
937                    )
938                    .await
939                    {
940                        Ok(res) => {
941                            if let Some(resp) = res? {
942                                got_direct = Some(resp);
943                            }
944                            break;
945                        }
946                        Err(_) => continue,
947                    }
948                }
949                if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? {
950                    got_direct = Some(resp);
951                }
952                break;
953            }
954
955            let text = if i == 0 && has_secondary {
956                match timeout(Duration::from_secs(3), resp.text()).await {
957                    Ok(Ok(t)) => t,
958                    Ok(Err(_)) => String::new(),
959                    Err(_) => continue,
960                }
961            } else {
962                resp.text().await.unwrap_or_default()
963            };
964            let trimmed = text.trim();
965            if !trimmed.is_empty() {
966                let json_str = if trimmed.contains("\ndata:") || trimmed.starts_with("data:") {
967                    extract_json_from_sse_text(trimmed)
968                } else {
969                    Cow::Borrowed(trimmed)
970                };
971                if let Ok(mcp_resp) = serde_json::from_str::<JsonRpcResponse>(json_str.as_ref()) {
972                    got_direct = Some(mcp_resp);
973                }
974            }
975            break;
976        }
977
978        if let Some((id, _)) = rx.as_ref() {
979            if got_direct.is_some() {
980                let mut guard = self.shared.lock().await;
981                guard.pending.remove(id);
982            } else if let Some(status) = last_status
983                && !status.is_success()
984            {
985                let mut guard = self.shared.lock().await;
986                guard.pending.remove(id);
987            }
988        }
989
990        if let Some(resp) = got_direct {
991            return Ok(resp);
992        }
993
994        if let Some(status) = last_status {
995            if !status.is_success() {
996                bail!("MCP server returned HTTP {}", status);
997            }
998        } else {
999            bail!("MCP request not sent");
1000        }
1001
1002        let Some((_id, rx)) = rx else {
1003            bail!("MCP server returned no response");
1004        };
1005
1006        rx.await.map_err(|_| {
1007            ::zeroclaw_log::record!(
1008                ERROR,
1009                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1010                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1011                "mcp_transport: SSE response channel closed"
1012            );
1013            anyhow::Error::msg("SSE response channel closed")
1014        })
1015    }
1016
1017    async fn close(&mut self) -> Result<()> {
1018        if let Some(tx) = self.shutdown_tx.take() {
1019            let _ = tx.send(());
1020        }
1021        if let Some(task) = self.reader_task.take() {
1022            task.abort();
1023        }
1024        Ok(())
1025    }
1026}
1027
1028// ── Factory ──────────────────────────────────────────────────────────────
1029
1030/// Create a transport based on config.
1031pub fn create_transport(config: &McpServerConfig) -> Result<Box<dyn McpTransportConn>> {
1032    match config.transport {
1033        McpTransport::Stdio => Ok(Box::new(StdioTransport::new(config)?)),
1034        McpTransport::Http => Ok(Box::new(HttpTransport::new(config)?)),
1035        McpTransport::Sse => Ok(Box::new(SseTransport::new(config)?)),
1036    }
1037}
1038
1039// ── Tests ─────────────────────────────────────────────────────────────────
1040
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// Tool timeout above the legacy HTTP default, used to prove the configured value wins.
    const EXTENDED_TIMEOUT_SECS: u64 = DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60;

    /// Placeholder endpoint for HTTP transports that are built but never contacted.
    const LOCAL_MCP_URL: &str = "http://localhost/mcp";

    /// Builds a config with the given name and transport; `url` is only set when provided,
    /// every other field keeps its default.
    fn config_for(
        name: &'static str,
        transport: McpTransport,
        url: Option<&'static str>,
    ) -> McpServerConfig {
        let mut config = McpServerConfig {
            name: name.into(),
            transport,
            ..Default::default()
        };
        if let Some(url) = url {
            config.url = Some(url.into());
        }
        config
    }

    /// Builds a JSON-RPC request with id 1, the given method, and empty params.
    fn request_for(method: &'static str) -> JsonRpcRequest {
        JsonRpcRequest::new(1, method, serde_json::json!({}))
    }

    /// Builds an `HttpTransport` pointed at the local placeholder endpoint.
    fn local_http_transport() -> HttpTransport {
        let config = config_for("test-http", McpTransport::Http, Some(LOCAL_MCP_URL));
        HttpTransport::new(&config).expect("build transport")
    }

    /// Builds a header map holding only an `mcp-session-id` header with `value`.
    fn session_headers(value: &'static str) -> reqwest::header::HeaderMap {
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(
            reqwest::header::HeaderName::from_static("mcp-session-id"),
            reqwest::header::HeaderValue::from_static(value),
        );
        headers
    }

    /// Asserts that the JSON extracted from `sse` deserializes as a JSON-RPC response.
    fn assert_extracts_jsonrpc(sse: &str) {
        let extracted = extract_json_from_sse_text(sse);
        let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
    }

    // ── Transport construction ────────────────────────────────────────────────

    #[test]
    fn test_transport_default_is_stdio() {
        assert_eq!(McpServerConfig::default().transport, McpTransport::Stdio);
    }

    #[test]
    fn test_http_transport_requires_url() {
        let config = config_for("test", McpTransport::Http, None);
        assert!(HttpTransport::new(&config).is_err());
    }

    #[test]
    fn test_sse_transport_requires_url() {
        let config = config_for("test", McpTransport::Sse, None);
        assert!(SseTransport::new(&config).is_err());
    }

    // ── Timeout selection ─────────────────────────────────────────────────────

    #[test]
    fn http_request_timeout_defaults_non_tool_requests_to_legacy_value() {
        let timeout_secs = http_request_timeout_secs(&request_for("initialize"), None);
        assert_eq!(timeout_secs, Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS));
    }

    #[test]
    fn http_request_timeout_does_not_shorten_non_tool_requests_from_tool_config() {
        let timeout_secs = http_request_timeout_secs(&request_for("tools/list"), Some(5));
        assert_eq!(timeout_secs, Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS));
    }

    #[test]
    fn http_request_timeout_honors_configured_tool_call_timeout_above_legacy_value() {
        let timeout_secs = http_request_timeout_secs(
            &request_for(TOOLS_CALL_METHOD),
            Some(EXTENDED_TIMEOUT_SECS),
        );
        assert_eq!(timeout_secs, Some(EXTENDED_TIMEOUT_SECS));
    }

    #[test]
    fn http_request_timeout_leaves_default_tool_call_budget_to_client_wrapper() {
        let timeout_secs = http_request_timeout_secs(&request_for(TOOLS_CALL_METHOD), None);
        assert_eq!(timeout_secs, None);
    }

    #[test]
    fn http_sse_read_timeout_defaults_non_tool_requests_to_recv_timeout() {
        let timeout_secs = http_sse_read_timeout_secs(&request_for("initialize"), None);
        assert_eq!(timeout_secs, Some(RECV_TIMEOUT_SECS));
    }

    #[test]
    fn http_sse_read_timeout_honors_configured_tool_call_timeout() {
        let timeout_secs = http_sse_read_timeout_secs(
            &request_for(TOOLS_CALL_METHOD),
            Some(EXTENDED_TIMEOUT_SECS),
        );
        assert_eq!(timeout_secs, Some(EXTENDED_TIMEOUT_SECS));
    }

    #[test]
    fn http_sse_read_timeout_leaves_default_tool_call_budget_to_client_wrapper() {
        let timeout_secs = http_sse_read_timeout_secs(&request_for(TOOLS_CALL_METHOD), None);
        assert_eq!(timeout_secs, None);
    }

    #[test]
    fn http_transport_stores_configured_tool_timeout() {
        let config = McpServerConfig {
            tool_timeout_secs: Some(EXTENDED_TIMEOUT_SECS),
            ..config_for("test-http", McpTransport::Http, Some(LOCAL_MCP_URL))
        };
        let transport = HttpTransport::new(&config).expect("build transport");
        assert_eq!(transport.tool_timeout_secs, Some(EXTENDED_TIMEOUT_SECS));
    }

    #[test]
    fn sse_transport_stores_configured_tool_timeout() {
        let config = McpServerConfig {
            tool_timeout_secs: Some(EXTENDED_TIMEOUT_SECS),
            ..config_for("test-sse", McpTransport::Sse, Some("http://localhost/sse"))
        };
        let transport = SseTransport::new(&config).expect("build transport");
        assert_eq!(transport.tool_timeout_secs, Some(EXTENDED_TIMEOUT_SECS));
    }

    // ── extract_json_from_sse_text ────────────────────────────────────────────

    #[test]
    fn test_extract_json_from_sse_data_no_space() {
        assert_extracts_jsonrpc("data:{\"jsonrpc\":\"2.0\",\"result\":{}}\n\n");
    }

    #[test]
    fn test_extract_json_from_sse_with_event_and_id() {
        assert_extracts_jsonrpc(
            "id: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n",
        );
    }

    #[test]
    fn test_extract_json_from_sse_multiline_data() {
        assert_extracts_jsonrpc(
            "event: message\ndata: {\ndata:   \"jsonrpc\": \"2.0\",\ndata:   \"result\": {}\ndata: }\n\n",
        );
    }

    #[test]
    fn test_extract_json_from_sse_skips_bom_and_leading_whitespace() {
        assert_extracts_jsonrpc("\u{feff}\n\n  data: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n");
    }

    #[test]
    fn test_extract_json_from_sse_uses_last_event_with_data() {
        assert_extracts_jsonrpc(
            ": keep-alive\n\nid: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n",
        );
    }

    // ── parse_jsonrpc_response_text ───────────────────────────────────────────

    #[test]
    fn test_parse_jsonrpc_response_text_handles_plain_json() {
        let plain = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{}}";
        let parsed = parse_jsonrpc_response_text(plain).expect("plain JSON response should parse");
        assert_eq!(parsed.id, Some(serde_json::json!(1)));
        assert!(parsed.error.is_none());
    }

    #[test]
    fn test_parse_jsonrpc_response_text_handles_sse_framed_json() {
        let framed =
            "event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":2,\"result\":{\"ok\":true}}\n\n";
        let parsed =
            parse_jsonrpc_response_text(framed).expect("SSE-framed JSON response should parse");
        assert_eq!(parsed.id, Some(serde_json::json!(2)));

        let ok_flag = parsed
            .result
            .as_ref()
            .and_then(|v| v.get("ok"))
            .and_then(|v| v.as_bool());
        assert_eq!(ok_flag, Some(true));
    }

    #[test]
    fn test_parse_jsonrpc_response_text_rejects_empty_payload() {
        let outcome = parse_jsonrpc_response_text(" \n\t ");
        assert!(outcome.is_err());
    }

    // ── HTTP session id handling ──────────────────────────────────────────────

    #[test]
    fn http_transport_updates_session_id_from_response_headers() {
        let mut transport = local_http_transport();
        transport.update_session_id_from_headers(&session_headers("session-abc"));
        assert_eq!(transport.session_id.as_deref(), Some("session-abc"));
    }

    #[test]
    fn http_transport_injects_session_id_header_when_available() {
        let mut transport = local_http_transport();
        transport.session_id = Some("session-xyz".to_string());

        let builder = reqwest::Client::new().post(LOCAL_MCP_URL);
        let req = transport
            .apply_session_header(builder)
            .build()
            .expect("build request");
        let sent_session = req
            .headers()
            .get(MCP_SESSION_ID_HEADER)
            .and_then(|v| v.to_str().ok());
        assert_eq!(sent_session, Some("session-xyz"));
    }

    // ── derive_message_url tests ──────────────────────────────────────────────

    #[test]
    fn derive_message_url_replaces_sse_segment_with_messages() {
        let derived = derive_message_url("http://localhost:3000/mcp/sse", "messages");
        assert_eq!(derived.as_deref(), Some("http://localhost:3000/mcp/messages"));
    }

    #[test]
    fn derive_message_url_appends_when_no_sse_segment() {
        let derived = derive_message_url("http://localhost:3000/mcp", "messages");
        assert_eq!(derived.as_deref(), Some("http://localhost:3000/mcp/messages"));
    }

    #[test]
    fn derive_message_url_returns_none_for_invalid_url() {
        assert!(derive_message_url("not-a-url", "messages").is_none());
    }

    #[test]
    fn derive_message_url_message_path_variant() {
        let derived = derive_message_url("http://localhost:3000/mcp/sse", "message");
        assert_eq!(derived.as_deref(), Some("http://localhost:3000/mcp/message"));
    }

    // ── parse_endpoint_from_data tests ───────────────────────────────────────

    #[test]
    fn parse_endpoint_absolute_http_url_returned_as_is() {
        let endpoint = parse_endpoint_from_data("http://base/sse", "http://other/messages");
        assert_eq!(endpoint.as_deref(), Some("http://other/messages"));
    }

    #[test]
    fn parse_endpoint_absolute_https_url_returned_as_is() {
        let endpoint = parse_endpoint_from_data("https://base/sse", "https://other/messages");
        assert_eq!(endpoint.as_deref(), Some("https://other/messages"));
    }

    #[test]
    fn parse_endpoint_relative_path_resolved_against_base() {
        let endpoint = parse_endpoint_from_data("http://localhost:3000/sse", "/messages");
        assert_eq!(endpoint.as_deref(), Some("http://localhost:3000/messages"));
    }

    #[test]
    fn parse_endpoint_json_object_with_endpoint_key() {
        let endpoint =
            parse_endpoint_from_data("http://localhost:3000/sse", r#"{"endpoint":"/messages"}"#);
        assert_eq!(endpoint.as_deref(), Some("http://localhost:3000/messages"));
    }

    // ── looks_like_sse_text tests ─────────────────────────────────────────────

    #[test]
    fn looks_like_sse_text_detects_data_prefix() {
        let text = "data:{\"jsonrpc\":\"2.0\"}";
        assert!(looks_like_sse_text(text));
    }

    #[test]
    fn looks_like_sse_text_detects_event_prefix() {
        let text = "event: message\ndata: {}";
        assert!(looks_like_sse_text(text));
    }

    #[test]
    fn looks_like_sse_text_detects_embedded_data_line() {
        let text = "id: 1\ndata:{\"x\":1}";
        assert!(looks_like_sse_text(text));
    }

    #[test]
    fn looks_like_sse_text_plain_json_is_not_sse() {
        let text = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{}}";
        assert!(!looks_like_sse_text(text));
    }

    // ── extract_json_from_sse_text edge cases ─────────────────────────────────

    #[test]
    fn extract_json_skips_comment_lines() {
        let extracted =
            extract_json_from_sse_text(": keep-alive\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n");
        let value: serde_json::Value = serde_json::from_str(extracted.as_ref()).unwrap();
        assert_eq!(value["jsonrpc"], "2.0");
    }

    #[test]
    fn extract_json_empty_input_returns_empty_trimmed() {
        let extracted = extract_json_from_sse_text("   ");
        assert!(extracted.as_ref().trim().is_empty());
    }

    #[test]
    fn extract_json_plain_json_returned_unchanged() {
        let plain = "{\"jsonrpc\":\"2.0\",\"result\":{}}";
        // Input without SSE framing must come back exactly as given.
        assert_eq!(extract_json_from_sse_text(plain).as_ref(), plain);
    }

    // ── parse_jsonrpc_response_text edge cases ────────────────────────────────

    #[test]
    fn parse_jsonrpc_response_rejects_whitespace_only() {
        let outcome = parse_jsonrpc_response_text("   \n\t  ");
        assert!(outcome.is_err());
    }

    #[test]
    fn parse_jsonrpc_response_with_error_result() {
        let payload = r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32601,"message":"not found"}}"#;
        let resp = parse_jsonrpc_response_text(payload).unwrap();
        assert!(resp.error.is_some());
        assert_eq!(resp.error.unwrap().code, -32601);
    }

    // ── create_transport factory ──────────────────────────────────────────────

    #[test]
    fn create_transport_stdio_fails_without_valid_command() {
        // A command that does not exist on disk cannot be spawned.
        let config = McpServerConfig {
            command: "/usr/bin/zeroclaw_nonexistent_binary_abc123".into(),
            ..config_for("test-stdio", McpTransport::Stdio, None)
        };
        assert!(create_transport(&config).is_err());
    }

    #[test]
    fn create_transport_http_without_url_fails() {
        let config = config_for("test-http", McpTransport::Http, None);
        assert!(create_transport(&config).is_err());
    }

    #[test]
    fn create_transport_sse_without_url_fails() {
        let config = config_for("test-sse", McpTransport::Sse, None);
        assert!(create_transport(&config).is_err());
    }

    #[test]
    fn create_transport_http_with_url_succeeds() {
        let config = config_for(
            "test-http",
            McpTransport::Http,
            Some("http://localhost:9999/mcp"),
        );
        // Construction must not require a reachable server.
        assert!(create_transport(&config).is_ok());
    }

    #[test]
    fn create_transport_sse_with_url_succeeds() {
        let config = config_for(
            "test-sse",
            McpTransport::Sse,
            Some("http://localhost:9999/sse"),
        );
        assert!(create_transport(&config).is_ok());
    }

    // ── HTTP session id whitespace handling ───────────────────────────────────

    #[test]
    fn http_transport_ignores_empty_session_id_header() {
        let mut transport = local_http_transport();
        transport.update_session_id_from_headers(&session_headers("   "));
        // A whitespace-only session id must not be stored.
        assert!(transport.session_id.is_none());
    }

    #[test]
    fn http_transport_no_session_header_leaves_none() {
        let transport = local_http_transport();
        assert!(transport.session_id.is_none());
    }

    #[test]
    fn http_transport_apply_session_header_noop_when_no_session() {
        let transport = local_http_transport();
        let builder = reqwest::Client::new().post(LOCAL_MCP_URL);
        let req = transport
            .apply_session_header(builder)
            .build()
            .expect("build request");
        assert!(req.headers().get(MCP_SESSION_ID_HEADER).is_none());
    }
}