1use std::borrow::Cow;
4
5use anyhow::{Context, Result, bail};
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::process::{Child, Command};
8use tokio::sync::{Mutex, Notify, oneshot};
9use tokio::time::{Duration, timeout};
10use tokio_stream::StreamExt;
11
12use crate::mcp_protocol::{INTERNAL_ERROR, JsonRpcError, JsonRpcRequest, JsonRpcResponse};
13use zeroclaw_config::schema::{McpServerConfig, McpTransport};
14
/// Largest stdout line, in bytes, that `StdioTransport::recv_raw` accepts before bailing.
const MAX_LINE_BYTES: usize = 4 * 1024 * 1024;
/// Seconds allowed for a stdio response, and for SSE reads of non-`tools/call`
/// requests on the HTTP transport.
const RECV_TIMEOUT_SECS: u64 = 30;

/// Whole-request HTTP timeout, in seconds, for every method except `tools/call`.
const DEFAULT_HTTP_REQUEST_TIMEOUT_SECS: u64 = 120;

/// JSON-RPC method whose timeouts come from the configured `tool_timeout_secs`.
const TOOLS_CALL_METHOD: &str = "tools/call";

/// `Accept` value sent when the configured headers do not contain one.
const MCP_STREAMABLE_ACCEPT: &str = "application/json, text/event-stream";

/// `Content-Type` value sent when the configured headers do not contain one.
const MCP_JSON_CONTENT_TYPE: &str = "application/json";
/// Header used to read a session id from responses and echo it on later requests.
const MCP_SESSION_ID_HEADER: &str = "Mcp-Session-Id";
34
35fn http_request_timeout_secs(
36 request: &JsonRpcRequest,
37 tool_timeout_secs: Option<u64>,
38) -> Option<u64> {
39 if request.method == TOOLS_CALL_METHOD {
40 tool_timeout_secs
41 } else {
42 Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS)
43 }
44}
45
46fn http_sse_read_timeout_secs(
47 request: &JsonRpcRequest,
48 tool_timeout_secs: Option<u64>,
49) -> Option<u64> {
50 if request.method == TOOLS_CALL_METHOD {
51 tool_timeout_secs
52 } else {
53 Some(RECV_TIMEOUT_SECS)
54 }
55}
56
57fn apply_request_timeout(
58 req: reqwest::RequestBuilder,
59 timeout_secs: Option<u64>,
60) -> reqwest::RequestBuilder {
61 if let Some(timeout_secs) = timeout_secs {
62 req.timeout(Duration::from_secs(timeout_secs))
63 } else {
64 req
65 }
66}
67
/// A connection to one MCP server over some transport.
///
/// `create_transport` hands callers a boxed implementation of this trait, so
/// they do not depend on the concrete transport type.
#[async_trait::async_trait]
pub trait McpTransportConn: Send + Sync {
    /// Sends `request` and returns the server's JSON-RPC response.
    ///
    /// The implementations in this file return a placeholder response with
    /// `id`, `result` and `error` all `None` when `request` has no `id`.
    async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse>;

    /// Releases whatever the transport holds open (pipes, background tasks).
    async fn close(&mut self) -> Result<()>;
}
79
/// Transport that talks newline-delimited JSON-RPC to a spawned child process.
pub struct StdioTransport {
    /// Handle kept so the child lives as long as the transport; it is spawned
    /// with `kill_on_drop(true)`.
    _child: Child,
    /// Write end of the child's stdin; requests are written here one per line.
    stdin: tokio::process::ChildStdin,
    /// Line reader over the child's stdout; responses are read from here.
    stdout_lines: tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
}
88
89impl StdioTransport {
90 pub fn new(config: &McpServerConfig) -> Result<Self> {
91 let mut child = Command::new(&config.command)
92 .args(&config.args)
93 .envs(&config.env)
94 .stdin(std::process::Stdio::piped())
95 .stdout(std::process::Stdio::piped())
96 .stderr(std::process::Stdio::inherit())
97 .kill_on_drop(true)
98 .spawn()
99 .with_context(|| format!("failed to spawn MCP server `{}`", config.name))?;
100
101 let stdin = child.stdin.take().ok_or_else(|| {
102 ::zeroclaw_log::record!(
103 ERROR,
104 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
105 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
106 .with_attrs(::serde_json::json!({
107 "mcp_server": &config.name,
108 "missing": "stdin",
109 })),
110 "mcp_transport: no stdin on spawned MCP server"
111 );
112 anyhow::Error::msg(format!("no stdin on MCP server `{}`", config.name))
113 })?;
114 let stdout = child.stdout.take().ok_or_else(|| {
115 ::zeroclaw_log::record!(
116 ERROR,
117 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
118 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
119 .with_attrs(::serde_json::json!({
120 "mcp_server": &config.name,
121 "missing": "stdout",
122 })),
123 "mcp_transport: no stdout on spawned MCP server"
124 );
125 anyhow::Error::msg(format!("no stdout on MCP server `{}`", config.name))
126 })?;
127 let stdout_lines = BufReader::new(stdout).lines();
128
129 Ok(Self {
130 _child: child,
131 stdin,
132 stdout_lines,
133 })
134 }
135
136 async fn send_raw(&mut self, line: &str) -> Result<()> {
137 self.stdin
138 .write_all(line.as_bytes())
139 .await
140 .context("failed to write to MCP server stdin")?;
141 self.stdin
142 .write_all(b"\n")
143 .await
144 .context("failed to write newline to MCP server stdin")?;
145 self.stdin.flush().await.context("failed to flush stdin")?;
146 Ok(())
147 }
148
149 async fn recv_raw(&mut self) -> Result<String> {
150 let line = self.stdout_lines.next_line().await?.ok_or_else(|| {
151 ::zeroclaw_log::record!(
152 ERROR,
153 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
154 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
155 "mcp_transport: MCP server closed stdout"
156 );
157 anyhow::Error::msg("MCP server closed stdout")
158 })?;
159 if line.len() > MAX_LINE_BYTES {
160 bail!("MCP response too large: {} bytes", line.len());
161 }
162 Ok(line)
163 }
164}
165
166#[async_trait::async_trait]
167impl McpTransportConn for StdioTransport {
168 async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
169 let line = serde_json::to_string(request)?;
170 self.send_raw(&line).await?;
171 if request.id.is_none() {
172 return Ok(JsonRpcResponse {
173 jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
174 id: None,
175 result: None,
176 error: None,
177 });
178 }
179 let deadline = std::time::Instant::now() + Duration::from_secs(RECV_TIMEOUT_SECS);
180 loop {
181 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
182 if remaining.is_zero() {
183 bail!("timeout waiting for MCP response");
184 }
185 let resp_line = timeout(remaining, self.recv_raw())
186 .await
187 .context("timeout waiting for MCP response")??;
188 let resp: JsonRpcResponse = serde_json::from_str(&resp_line)
189 .with_context(|| format!("invalid JSON-RPC response: {}", resp_line))?;
190 if resp.id.is_none() {
191 ::zeroclaw_log::record!(
194 DEBUG,
195 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
196 "MCP stdio: skipping server notification while waiting for response"
197 );
198 continue;
199 }
200 return Ok(resp);
201 }
202 }
203
204 async fn close(&mut self) -> Result<()> {
205 let _ = self.stdin.shutdown().await;
206 Ok(())
207 }
208}
209
/// Transport that POSTs each JSON-RPC request to a single HTTP endpoint and
/// reads the reply from either a JSON body or an SSE body.
pub struct HttpTransport {
    /// Endpoint every request is POSTed to.
    url: String,
    /// Configured timeout for `tools/call` requests; `None` applies no timeout here.
    tool_timeout_secs: Option<u64>,
    /// HTTP client used for all requests of this transport.
    client: reqwest::Client,
    /// Extra headers from the server configuration, sent on every request.
    headers: std::collections::HashMap<String, String>,
    /// Most recent non-empty `Mcp-Session-Id` seen on a successful response.
    session_id: Option<String>,
}
224
225impl HttpTransport {
226 pub fn new(config: &McpServerConfig) -> Result<Self> {
227 let url = config
228 .url
229 .as_ref()
230 .ok_or_else(|| {
231 ::zeroclaw_log::record!(
232 WARN,
233 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
234 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
235 .with_attrs(::serde_json::json!({
236 "mcp_server": &config.name,
237 "transport": "http",
238 })),
239 "mcp_transport: HTTP transport requires URL"
240 );
241 anyhow::Error::msg("URL required for HTTP transport")
242 })?
243 .clone();
244
245 let client = reqwest::Client::builder()
246 .build()
247 .context("failed to build HTTP client")?;
248
249 Ok(Self {
250 url,
251 tool_timeout_secs: config.tool_timeout_secs,
252 client,
253 headers: config.headers.clone(),
254 session_id: None,
255 })
256 }
257
258 fn apply_session_header(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
259 if let Some(session_id) = self.session_id.as_deref() {
260 req.header(MCP_SESSION_ID_HEADER, session_id)
261 } else {
262 req
263 }
264 }
265
266 fn update_session_id_from_headers(&mut self, headers: &reqwest::header::HeaderMap) {
267 if let Some(session_id) = headers
268 .get(MCP_SESSION_ID_HEADER)
269 .and_then(|v| v.to_str().ok())
270 .map(str::trim)
271 .filter(|v| !v.is_empty())
272 {
273 self.session_id = Some(session_id.to_string());
274 }
275 }
276}
277
278#[async_trait::async_trait]
279impl McpTransportConn for HttpTransport {
280 async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
281 let body = serde_json::to_string(request)?;
282
283 let has_accept = self
284 .headers
285 .keys()
286 .any(|k| k.eq_ignore_ascii_case("Accept"));
287 let has_content_type = self
288 .headers
289 .keys()
290 .any(|k| k.eq_ignore_ascii_case("Content-Type"));
291
292 let mut req = apply_request_timeout(
293 self.client.post(&self.url).body(body),
294 http_request_timeout_secs(request, self.tool_timeout_secs),
295 );
296 if !has_content_type {
297 req = req.header("Content-Type", MCP_JSON_CONTENT_TYPE);
298 }
299 for (key, value) in &self.headers {
300 req = req.header(key, value);
301 }
302 req = self.apply_session_header(req);
303 if !has_accept {
304 req = req.header("Accept", MCP_STREAMABLE_ACCEPT);
305 }
306
307 let resp = req
308 .send()
309 .await
310 .context("HTTP request to MCP server failed")?;
311
312 if !resp.status().is_success() {
313 bail!("MCP server returned HTTP {}", resp.status());
314 }
315
316 self.update_session_id_from_headers(resp.headers());
317
318 if request.id.is_none() {
319 return Ok(JsonRpcResponse {
320 jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
321 id: None,
322 result: None,
323 error: None,
324 });
325 }
326
327 let is_sse = resp
328 .headers()
329 .get(reqwest::header::CONTENT_TYPE)
330 .and_then(|v| v.to_str().ok())
331 .is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
332 if is_sse {
333 let read_response = read_first_jsonrpc_from_sse_response(resp);
334 let maybe_resp = if let Some(sse_timeout) =
335 http_sse_read_timeout_secs(request, self.tool_timeout_secs)
336 {
337 timeout(Duration::from_secs(sse_timeout), read_response)
338 .await
339 .context("timeout waiting for MCP response from streamable HTTP SSE stream")??
340 } else {
341 read_response.await?
342 };
343 return maybe_resp.ok_or_else(|| {
344 ::zeroclaw_log::record!(
345 ERROR,
346 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
347 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
348 "mcp_transport: MCP server returned no response in SSE stream"
349 );
350 anyhow::Error::msg("MCP server returned no response in SSE stream")
351 });
352 }
353
354 let resp_text = resp.text().await.context("failed to read HTTP response")?;
355 parse_jsonrpc_response_text(&resp_text)
356 }
357
358 async fn close(&mut self) -> Result<()> {
359 Ok(())
360 }
361}
362
/// What is known about the server's long-lived SSE GET stream.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum SseStreamState {
    /// No connection attempt has reached a conclusion yet.
    Unknown,
    /// A reader task has been started on an event stream.
    Connected,
    /// The GET returned 404/405 or a non-event-stream body; no stream is used.
    Unsupported,
}
372
/// Transport that POSTs requests and, when the server offers one, receives
/// responses on a long-lived SSE stream read by a background task.
pub struct SseTransport {
    /// URL of the SSE stream; also used as a POST target.
    sse_url: String,
    /// Server name from the configuration, used in log messages.
    server_name: String,
    /// Configured timeout for `tools/call` POSTs; `None` applies no timeout here.
    tool_timeout_secs: Option<u64>,
    /// HTTP client used for the stream GET and all POSTs.
    client: reqwest::Client,
    /// Extra headers from the server configuration, sent on every request.
    headers: std::collections::HashMap<String, String>,
    /// Outcome of the most recent attempt to open the SSE stream.
    stream_state: SseStreamState,
    /// State shared with the reader task: message URL and pending responses.
    shared: std::sync::Arc<Mutex<SseSharedState>>,
    /// Signalled by the reader task when an endpoint event updates the message URL.
    notify: std::sync::Arc<Notify>,
    /// Sender that tells the reader task to stop.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Handle of the background task reading the SSE stream.
    reader_task: Option<tokio::task::JoinHandle<()>>,
}
385
386impl SseTransport {
387 pub fn new(config: &McpServerConfig) -> Result<Self> {
388 let sse_url = config
389 .url
390 .as_ref()
391 .ok_or_else(|| {
392 ::zeroclaw_log::record!(
393 WARN,
394 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
395 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
396 .with_attrs(::serde_json::json!({
397 "mcp_server": &config.name,
398 "transport": "sse",
399 })),
400 "mcp_transport: SSE transport requires URL"
401 );
402 anyhow::Error::msg("URL required for SSE transport")
403 })?
404 .clone();
405
406 let client = reqwest::Client::builder()
407 .build()
408 .context("failed to build HTTP client")?;
409
410 Ok(Self {
411 sse_url,
412 server_name: config.name.clone(),
413 tool_timeout_secs: config.tool_timeout_secs,
414 client,
415 headers: config.headers.clone(),
416 stream_state: SseStreamState::Unknown,
417 shared: std::sync::Arc::new(Mutex::new(SseSharedState::default())),
418 notify: std::sync::Arc::new(Notify::new()),
419 shutdown_tx: None,
420 reader_task: None,
421 })
422 }
423
424 async fn ensure_connected(&mut self) -> Result<()> {
425 if self.stream_state == SseStreamState::Unsupported {
426 return Ok(());
427 }
428 if let Some(task) = &self.reader_task
429 && !task.is_finished()
430 {
431 self.stream_state = SseStreamState::Connected;
432 return Ok(());
433 }
434
435 let has_accept = self
436 .headers
437 .keys()
438 .any(|k| k.eq_ignore_ascii_case("Accept"));
439
440 let mut req = self
441 .client
442 .get(&self.sse_url)
443 .header("Cache-Control", "no-cache");
444 for (key, value) in &self.headers {
445 req = req.header(key, value);
446 }
447 if !has_accept {
448 req = req.header("Accept", MCP_STREAMABLE_ACCEPT);
449 }
450
451 let resp = req.send().await.context("SSE GET to MCP server failed")?;
452 if resp.status() == reqwest::StatusCode::NOT_FOUND
453 || resp.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED
454 {
455 self.stream_state = SseStreamState::Unsupported;
456 return Ok(());
457 }
458 if !resp.status().is_success() {
459 let status = resp.status();
460 ::zeroclaw_log::record!(
461 ERROR,
462 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
463 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
464 .with_attrs(::serde_json::json!({"status": status.as_u16()})),
465 "mcp_transport: MCP server returned non-success HTTP"
466 );
467 return Err(anyhow::Error::msg(format!(
468 "MCP server returned HTTP {}",
469 status
470 )));
471 }
472 let is_event_stream = resp
473 .headers()
474 .get(reqwest::header::CONTENT_TYPE)
475 .and_then(|v| v.to_str().ok())
476 .is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
477 if !is_event_stream {
478 self.stream_state = SseStreamState::Unsupported;
479 return Ok(());
480 }
481
482 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
483 self.shutdown_tx = Some(shutdown_tx);
484
485 let shared = self.shared.clone();
486 let notify = self.notify.clone();
487 let sse_url = self.sse_url.clone();
488 let server_name = self.server_name.clone();
489
490 self.reader_task = Some(tokio::spawn(async move {
491 let stream = resp
492 .bytes_stream()
493 .map(|item| item.map_err(std::io::Error::other));
494 let reader = tokio_util::io::StreamReader::new(stream);
495 let mut lines = BufReader::new(reader).lines();
496
497 let mut cur_event: Option<String> = None;
498 let mut cur_id: Option<String> = None;
499 let mut cur_data: Vec<String> = Vec::new();
500
501 loop {
502 tokio::select! {
503 _ = &mut shutdown_rx => {
504 break;
505 }
506 line = lines.next_line() => {
507 let Ok(line_opt) = line else { break; };
508 let Some(mut line) = line_opt else { break; };
509 if line.ends_with('\r') {
510 line.pop();
511 }
512 if line.is_empty() {
513 if cur_event.is_none() && cur_id.is_none() && cur_data.is_empty() {
514 continue;
515 }
516 let event = cur_event.take();
517 let data = cur_data.join("\n");
518 cur_data.clear();
519 let id = cur_id.take();
520 handle_sse_event(&server_name, &sse_url, &shared, ¬ify, event.as_deref(), id.as_deref(), data).await;
521 continue;
522 }
523
524 if line.starts_with(':') {
525 continue;
526 }
527
528 if let Some(rest) = line.strip_prefix("event:") {
529 cur_event = Some(rest.trim().to_string());
530 }
531 if let Some(rest) = line.strip_prefix("data:") {
532 let rest = rest.strip_prefix(' ').unwrap_or(rest);
533 cur_data.push(rest.to_string());
534 }
535 if let Some(rest) = line.strip_prefix("id:") {
536 cur_id = Some(rest.trim().to_string());
537 }
538 }
539 }
540 }
541
542 let pending = {
543 let mut guard = shared.lock().await;
544 std::mem::take(&mut guard.pending)
545 };
546 for (_, tx) in pending {
547 let _ = tx.send(JsonRpcResponse {
548 jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
549 id: None,
550 result: None,
551 error: Some(JsonRpcError {
552 code: INTERNAL_ERROR,
553 message: "SSE connection closed".to_string(),
554 data: None,
555 }),
556 });
557 }
558 }));
559 self.stream_state = SseStreamState::Connected;
560
561 Ok(())
562 }
563
564 async fn get_message_url(&self) -> Result<(String, bool)> {
565 let guard = self.shared.lock().await;
566 if let Some(url) = &guard.message_url {
567 return Ok((url.clone(), guard.message_url_from_endpoint));
568 }
569 drop(guard);
570
571 let derived = derive_message_url(&self.sse_url, "messages")
572 .or_else(|| derive_message_url(&self.sse_url, "message"))
573 .ok_or_else(|| {
574 ::zeroclaw_log::record!(
575 WARN,
576 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
577 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
578 .with_attrs(::serde_json::json!({"sse_url": &self.sse_url})),
579 "mcp_transport: invalid SSE URL"
580 );
581 anyhow::Error::msg("invalid SSE URL")
582 })?;
583 let mut guard = self.shared.lock().await;
584 if guard.message_url.is_none() {
585 guard.message_url = Some(derived.clone());
586 guard.message_url_from_endpoint = false;
587 }
588 Ok((derived, false))
589 }
590}
591
/// State shared between an `SseTransport` and its stream reader task.
#[derive(Default)]
struct SseSharedState {
    /// URL messages are POSTed to, once announced by the server or derived.
    message_url: Option<String>,
    /// `true` when `message_url` came from an endpoint event on the stream.
    message_url_from_endpoint: bool,
    /// Senders for in-flight requests, keyed by numeric JSON-RPC id.
    pending: std::collections::HashMap<u64, oneshot::Sender<JsonRpcResponse>>,
}
598
599fn derive_message_url(sse_url: &str, message_path: &str) -> Option<String> {
600 let url = reqwest::Url::parse(sse_url).ok()?;
601 let mut segments: Vec<&str> = url.path_segments()?.collect();
602 if segments.is_empty() {
603 return None;
604 }
605 if segments.last().copied() == Some("sse") {
606 segments.pop();
607 segments.push(message_path);
608 let mut new_url = url.clone();
609 new_url.set_path(&format!("/{}", segments.join("/")));
610 return Some(new_url.to_string());
611 }
612 let mut new_url = url.clone();
613 let mut path = url.path().trim_end_matches('/').to_string();
614 path.push('/');
615 path.push_str(message_path);
616 new_url.set_path(&path);
617 Some(new_url.to_string())
618}
619
620async fn handle_sse_event(
621 server_name: &str,
622 sse_url: &str,
623 shared: &std::sync::Arc<Mutex<SseSharedState>>,
624 notify: &std::sync::Arc<Notify>,
625 event: Option<&str>,
626 _id: Option<&str>,
627 data: String,
628) {
629 let event = event.unwrap_or("message");
630 let trimmed = data.trim();
631 if trimmed.is_empty() {
632 return;
633 }
634
635 if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint") {
636 if let Some(url) = parse_endpoint_from_data(sse_url, trimmed) {
637 let mut guard = shared.lock().await;
638 guard.message_url = Some(url);
639 guard.message_url_from_endpoint = true;
640 drop(guard);
641 notify.notify_waiters();
642 }
643 return;
644 }
645
646 if !event.eq_ignore_ascii_case("message") {
647 return;
648 }
649
650 let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed) else {
651 return;
652 };
653
654 let Ok(resp) = serde_json::from_value::<JsonRpcResponse>(value.clone()) else {
655 let _ = serde_json::from_value::<JsonRpcRequest>(value);
656 return;
657 };
658
659 let Some(id_val) = resp.id.clone() else {
660 return;
661 };
662 let id = match id_val.as_u64() {
663 Some(v) => v,
664 None => return,
665 };
666
667 let tx = {
668 let mut guard = shared.lock().await;
669 guard.pending.remove(&id)
670 };
671 if let Some(tx) = tx {
672 let _ = tx.send(resp);
673 } else {
674 ::zeroclaw_log::record!(
675 DEBUG,
676 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
677 &format!(
678 "MCP SSE `{}` received response for unknown id {}",
679 server_name, id
680 )
681 );
682 }
683}
684
685fn parse_endpoint_from_data(sse_url: &str, data: &str) -> Option<String> {
686 if data.starts_with('{') {
687 let v: serde_json::Value = serde_json::from_str(data).ok()?;
688 let endpoint = v.get("endpoint")?.as_str()?;
689 return parse_endpoint_from_data(sse_url, endpoint);
690 }
691 if data.starts_with("http://") || data.starts_with("https://") {
692 return Some(data.to_string());
693 }
694 let base = reqwest::Url::parse(sse_url).ok()?;
695 base.join(data).ok().map(|u| u.to_string())
696}
697
/// Pulls the data payload of the last event that has data out of SSE text.
///
/// Leading BOM characters are removed first. Each line is stripped of a
/// trailing `\r` and of leading whitespace; `data:` lines (minus one optional
/// leading space in the value) are collected per event, and a blank line ends
/// the event. The final event does not need a terminating blank line.
///
/// Returns the trimmed payload, with multiple `data:` lines joined by `\n`.
/// When no `data:` line exists at all, the trimmed input text is returned.
fn extract_json_from_sse_text(resp_text: &str) -> Cow<'_, str> {
    let body = resp_text.trim_start_matches('\u{feff}');
    let mut open_event: Vec<&str> = Vec::new();
    let mut latest_event: Vec<&str> = Vec::new();

    let cleaned_lines = body
        .lines()
        .map(|raw| raw.trim_end_matches('\r').trim_start());
    for line in cleaned_lines {
        if line.is_empty() {
            // Event boundary: keep the event only if it carried data.
            if !open_event.is_empty() {
                latest_event = std::mem::take(&mut open_event);
            }
        } else if let Some(value) = line.strip_prefix("data:") {
            // Comment lines (`:`) and other fields never match this prefix.
            open_event.push(value.strip_prefix(' ').unwrap_or(value));
        }
    }

    // An unterminated trailing event still counts.
    if !open_event.is_empty() {
        latest_event = open_event;
    }

    match latest_event.as_slice() {
        [] => Cow::Borrowed(body.trim()),
        [only] => Cow::Borrowed(only.trim()),
        several => Cow::Owned(several.join("\n").trim().to_string()),
    }
}
737
738fn parse_jsonrpc_response_text(resp_text: &str) -> Result<JsonRpcResponse> {
739 let trimmed = resp_text.trim();
740 if trimmed.is_empty() {
741 bail!("MCP server returned no response");
742 }
743
744 let json_text = if looks_like_sse_text(trimmed) {
745 extract_json_from_sse_text(trimmed)
746 } else {
747 Cow::Borrowed(trimmed)
748 };
749
750 let mcp_resp: JsonRpcResponse = serde_json::from_str(json_text.as_ref())
751 .with_context(|| format!("invalid JSON-RPC response: {}", resp_text))?;
752 Ok(mcp_resp)
753}
754
/// Reports whether `text` has a line that begins with `data:` or `event:`.
///
/// Lines are delimited by `\n` only, and the field name must sit at the very
/// start of the line (no leading whitespace).
fn looks_like_sse_text(text: &str) -> bool {
    text.split('\n')
        .any(|line| line.starts_with("data:") || line.starts_with("event:"))
}
761
762async fn read_first_jsonrpc_from_sse_response(
763 resp: reqwest::Response,
764) -> Result<Option<JsonRpcResponse>> {
765 let stream = resp
766 .bytes_stream()
767 .map(|item| item.map_err(std::io::Error::other));
768 let reader = tokio_util::io::StreamReader::new(stream);
769 let mut lines = BufReader::new(reader).lines();
770
771 let mut cur_event: Option<String> = None;
772 let mut cur_data: Vec<String> = Vec::new();
773
774 while let Ok(line_opt) = lines.next_line().await {
775 let Some(mut line) = line_opt else { break };
776 if line.ends_with('\r') {
777 line.pop();
778 }
779 if line.is_empty() {
780 if cur_event.is_none() && cur_data.is_empty() {
781 continue;
782 }
783 let event = cur_event.take();
784 let data = cur_data.join("\n");
785 cur_data.clear();
786
787 let event = event.unwrap_or_else(|| "message".to_string());
788 if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint")
789 {
790 continue;
791 }
792 if !event.eq_ignore_ascii_case("message") {
793 continue;
794 }
795
796 let trimmed = data.trim();
797 if trimmed.is_empty() {
798 continue;
799 }
800 let json_str = extract_json_from_sse_text(trimmed);
801 if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(json_str.as_ref()) {
802 return Ok(Some(resp));
803 }
804 continue;
805 }
806
807 if line.starts_with(':') {
808 continue;
809 }
810 if let Some(rest) = line.strip_prefix("event:") {
811 cur_event = Some(rest.trim().to_string());
812 }
813 if let Some(rest) = line.strip_prefix("data:") {
814 let rest = rest.strip_prefix(' ').unwrap_or(rest);
815 cur_data.push(rest.to_string());
816 }
817 }
818
819 Ok(None)
820}
821
822#[async_trait::async_trait]
823impl McpTransportConn for SseTransport {
824 async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
825 self.ensure_connected().await?;
826
827 let id = request.id.as_ref().and_then(|v| v.as_u64());
828 let body = serde_json::to_string(request)?;
829
830 let (mut message_url, mut from_endpoint) = self.get_message_url().await?;
831 if self.stream_state == SseStreamState::Connected && !from_endpoint {
832 for _ in 0..3 {
833 {
834 let guard = self.shared.lock().await;
835 if guard.message_url_from_endpoint
836 && let Some(url) = &guard.message_url
837 {
838 message_url = url.clone();
839 from_endpoint = true;
840 break;
841 }
842 }
843 let _ = timeout(Duration::from_millis(300), self.notify.notified()).await;
844 }
845 }
846 let primary_url = if from_endpoint {
847 message_url.clone()
848 } else {
849 self.sse_url.clone()
850 };
851 let secondary_url = if message_url == self.sse_url {
852 None
853 } else if primary_url == message_url {
854 Some(self.sse_url.clone())
855 } else {
856 Some(message_url.clone())
857 };
858 let has_secondary = secondary_url.is_some();
859
860 let mut rx = None;
861 if let Some(id) = id
862 && self.stream_state == SseStreamState::Connected
863 {
864 let (tx, ch) = oneshot::channel();
865 {
866 let mut guard = self.shared.lock().await;
867 guard.pending.insert(id, tx);
868 }
869 rx = Some((id, ch));
870 }
871
872 let mut got_direct = None;
873 let mut last_status = None;
874
875 for (i, url) in std::iter::once(primary_url)
876 .chain(secondary_url)
877 .enumerate()
878 {
879 let has_accept = self
880 .headers
881 .keys()
882 .any(|k| k.eq_ignore_ascii_case("Accept"));
883 let has_content_type = self
884 .headers
885 .keys()
886 .any(|k| k.eq_ignore_ascii_case("Content-Type"));
887 let mut req = apply_request_timeout(
888 self.client.post(&url).body(body.clone()),
889 http_request_timeout_secs(request, self.tool_timeout_secs),
890 );
891 if !has_content_type {
892 req = req.header("Content-Type", MCP_JSON_CONTENT_TYPE);
893 }
894 for (key, value) in &self.headers {
895 req = req.header(key, value);
896 }
897 if !has_accept {
898 req = req.header("Accept", MCP_STREAMABLE_ACCEPT);
899 }
900
901 let resp = req.send().await.context("SSE POST to MCP server failed")?;
902 let status = resp.status();
903 last_status = Some(status);
904
905 if (status == reqwest::StatusCode::NOT_FOUND
906 || status == reqwest::StatusCode::METHOD_NOT_ALLOWED)
907 && i == 0
908 {
909 continue;
910 }
911
912 if !status.is_success() {
913 break;
914 }
915
916 if request.id.is_none() {
917 got_direct = Some(JsonRpcResponse {
918 jsonrpc: crate::mcp_protocol::JSONRPC_VERSION.to_string(),
919 id: None,
920 result: None,
921 error: None,
922 });
923 break;
924 }
925
926 let is_sse = resp
927 .headers()
928 .get(reqwest::header::CONTENT_TYPE)
929 .and_then(|v| v.to_str().ok())
930 .is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
931
932 if is_sse {
933 if i == 0 && has_secondary {
934 match timeout(
935 Duration::from_secs(3),
936 read_first_jsonrpc_from_sse_response(resp),
937 )
938 .await
939 {
940 Ok(res) => {
941 if let Some(resp) = res? {
942 got_direct = Some(resp);
943 }
944 break;
945 }
946 Err(_) => continue,
947 }
948 }
949 if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? {
950 got_direct = Some(resp);
951 }
952 break;
953 }
954
955 let text = if i == 0 && has_secondary {
956 match timeout(Duration::from_secs(3), resp.text()).await {
957 Ok(Ok(t)) => t,
958 Ok(Err(_)) => String::new(),
959 Err(_) => continue,
960 }
961 } else {
962 resp.text().await.unwrap_or_default()
963 };
964 let trimmed = text.trim();
965 if !trimmed.is_empty() {
966 let json_str = if trimmed.contains("\ndata:") || trimmed.starts_with("data:") {
967 extract_json_from_sse_text(trimmed)
968 } else {
969 Cow::Borrowed(trimmed)
970 };
971 if let Ok(mcp_resp) = serde_json::from_str::<JsonRpcResponse>(json_str.as_ref()) {
972 got_direct = Some(mcp_resp);
973 }
974 }
975 break;
976 }
977
978 if let Some((id, _)) = rx.as_ref() {
979 if got_direct.is_some() {
980 let mut guard = self.shared.lock().await;
981 guard.pending.remove(id);
982 } else if let Some(status) = last_status
983 && !status.is_success()
984 {
985 let mut guard = self.shared.lock().await;
986 guard.pending.remove(id);
987 }
988 }
989
990 if let Some(resp) = got_direct {
991 return Ok(resp);
992 }
993
994 if let Some(status) = last_status {
995 if !status.is_success() {
996 bail!("MCP server returned HTTP {}", status);
997 }
998 } else {
999 bail!("MCP request not sent");
1000 }
1001
1002 let Some((_id, rx)) = rx else {
1003 bail!("MCP server returned no response");
1004 };
1005
1006 rx.await.map_err(|_| {
1007 ::zeroclaw_log::record!(
1008 ERROR,
1009 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
1010 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
1011 "mcp_transport: SSE response channel closed"
1012 );
1013 anyhow::Error::msg("SSE response channel closed")
1014 })
1015 }
1016
1017 async fn close(&mut self) -> Result<()> {
1018 if let Some(tx) = self.shutdown_tx.take() {
1019 let _ = tx.send(());
1020 }
1021 if let Some(task) = self.reader_task.take() {
1022 task.abort();
1023 }
1024 Ok(())
1025 }
1026}
1027
1028pub fn create_transport(config: &McpServerConfig) -> Result<Box<dyn McpTransportConn>> {
1032 match config.transport {
1033 McpTransport::Stdio => Ok(Box::new(StdioTransport::new(config)?)),
1034 McpTransport::Http => Ok(Box::new(HttpTransport::new(config)?)),
1035 McpTransport::Sse => Ok(Box::new(SseTransport::new(config)?)),
1036 }
1037}
1038
1039#[cfg(test)]
1042mod tests {
1043 use super::*;
1044
1045 #[test]
1046 fn test_transport_default_is_stdio() {
1047 let config = McpServerConfig::default();
1048 assert_eq!(config.transport, McpTransport::Stdio);
1049 }
1050
1051 #[test]
1052 fn test_http_transport_requires_url() {
1053 let config = McpServerConfig {
1054 name: "test".into(),
1055 transport: McpTransport::Http,
1056 ..Default::default()
1057 };
1058 assert!(HttpTransport::new(&config).is_err());
1059 }
1060
1061 #[test]
1062 fn test_sse_transport_requires_url() {
1063 let config = McpServerConfig {
1064 name: "test".into(),
1065 transport: McpTransport::Sse,
1066 ..Default::default()
1067 };
1068 assert!(SseTransport::new(&config).is_err());
1069 }
1070
1071 #[test]
1072 fn http_request_timeout_defaults_non_tool_requests_to_legacy_value() {
1073 let request = JsonRpcRequest::new(1, "initialize", serde_json::json!({}));
1074 assert_eq!(
1075 http_request_timeout_secs(&request, None),
1076 Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS)
1077 );
1078 }
1079
1080 #[test]
1081 fn http_request_timeout_does_not_shorten_non_tool_requests_from_tool_config() {
1082 let request = JsonRpcRequest::new(1, "tools/list", serde_json::json!({}));
1083 assert_eq!(
1084 http_request_timeout_secs(&request, Some(5)),
1085 Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS)
1086 );
1087 }
1088
1089 #[test]
1090 fn http_request_timeout_honors_configured_tool_call_timeout_above_legacy_value() {
1091 let request = JsonRpcRequest::new(1, TOOLS_CALL_METHOD, serde_json::json!({}));
1092 assert_eq!(
1093 http_request_timeout_secs(&request, Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60)),
1094 Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60)
1095 );
1096 }
1097
1098 #[test]
1099 fn http_request_timeout_leaves_default_tool_call_budget_to_client_wrapper() {
1100 let request = JsonRpcRequest::new(1, TOOLS_CALL_METHOD, serde_json::json!({}));
1101 assert_eq!(http_request_timeout_secs(&request, None), None);
1102 }
1103
1104 #[test]
1105 fn http_sse_read_timeout_defaults_non_tool_requests_to_recv_timeout() {
1106 let request = JsonRpcRequest::new(1, "initialize", serde_json::json!({}));
1107 assert_eq!(
1108 http_sse_read_timeout_secs(&request, None),
1109 Some(RECV_TIMEOUT_SECS)
1110 );
1111 }
1112
1113 #[test]
1114 fn http_sse_read_timeout_honors_configured_tool_call_timeout() {
1115 let request = JsonRpcRequest::new(1, TOOLS_CALL_METHOD, serde_json::json!({}));
1116 assert_eq!(
1117 http_sse_read_timeout_secs(&request, Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60)),
1118 Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60)
1119 );
1120 }
1121
1122 #[test]
1123 fn http_sse_read_timeout_leaves_default_tool_call_budget_to_client_wrapper() {
1124 let request = JsonRpcRequest::new(1, TOOLS_CALL_METHOD, serde_json::json!({}));
1125 assert_eq!(http_sse_read_timeout_secs(&request, None), None);
1126 }
1127
1128 #[test]
1129 fn http_transport_stores_configured_tool_timeout() {
1130 let config = McpServerConfig {
1131 name: "test-http".into(),
1132 transport: McpTransport::Http,
1133 url: Some("http://localhost/mcp".into()),
1134 tool_timeout_secs: Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60),
1135 ..Default::default()
1136 };
1137 let transport = HttpTransport::new(&config).expect("build transport");
1138 assert_eq!(
1139 transport.tool_timeout_secs,
1140 Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60)
1141 );
1142 }
1143
/// `SseTransport::new` keeps the `tool_timeout_secs` value from the server config.
#[test]
fn sse_transport_stores_configured_tool_timeout() {
    let configured = Some(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS + 60);
    let server = McpServerConfig {
        tool_timeout_secs: configured,
        url: Some("http://localhost/sse".into()),
        transport: McpTransport::Sse,
        name: "test-sse".into(),
        ..Default::default()
    };
    let sse = SseTransport::new(&server).expect("build transport");
    assert_eq!(sse.tool_timeout_secs, configured);
}
1159
/// A `data:` field with no space after the colon still yields parseable JSON-RPC.
#[test]
fn test_extract_json_from_sse_data_no_space() {
    let frame = "data:{\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
    let json = extract_json_from_sse_text(frame);
    let _response = serde_json::from_str::<JsonRpcResponse>(json.as_ref()).unwrap();
}
1166
/// `id:` and `event:` fields preceding the `data:` line do not break extraction.
#[test]
fn test_extract_json_from_sse_with_event_and_id() {
    let frame = concat!(
        "id: 1\n",
        "event: message\n",
        "data: {\"jsonrpc\":\"2.0\",\"result\":{}}\n",
        "\n",
    );
    let json = extract_json_from_sse_text(frame);
    let _response = serde_json::from_str::<JsonRpcResponse>(json.as_ref()).unwrap();
}
1173
/// A JSON object split across several `data:` lines is extracted into parseable JSON-RPC.
#[test]
fn test_extract_json_from_sse_multiline_data() {
    let frame = concat!(
        "event: message\n",
        "data: {\n",
        "data: \"jsonrpc\": \"2.0\",\n",
        "data: \"result\": {}\n",
        "data: }\n",
        "\n",
    );
    let json = extract_json_from_sse_text(frame);
    let _response = serde_json::from_str::<JsonRpcResponse>(json.as_ref()).unwrap();
}
1180
/// A leading BOM, blank lines and indentation before `data:` do not prevent extraction.
#[test]
fn test_extract_json_from_sse_skips_bom_and_leading_whitespace() {
    let frame = "\u{feff}\n\n data: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
    let json = extract_json_from_sse_text(frame);
    let _response = serde_json::from_str::<JsonRpcResponse>(json.as_ref()).unwrap();
}
1187
/// A keep-alive comment event followed by a data-bearing event yields that event's JSON.
#[test]
fn test_extract_json_from_sse_uses_last_event_with_data() {
    let stream = concat!(
        ": keep-alive\n\n",
        "id: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n",
    );
    let json = extract_json_from_sse_text(stream);
    let _response = serde_json::from_str::<JsonRpcResponse>(json.as_ref()).unwrap();
}
1195
/// A bare JSON body parses into a response with the expected id and no error.
#[test]
fn test_parse_jsonrpc_response_text_handles_plain_json() {
    let body = r#"{"jsonrpc":"2.0","id":1,"result":{}}"#;
    let response = parse_jsonrpc_response_text(body).expect("plain JSON response should parse");
    assert_eq!(response.id, Some(serde_json::json!(1)));
    assert!(response.error.is_none());
}
1203
/// An SSE-framed body parses into a response whose id and `result.ok` match the payload.
#[test]
fn test_parse_jsonrpc_response_text_handles_sse_framed_json() {
    let framed = concat!(
        "event: message\n",
        "data: {\"jsonrpc\":\"2.0\",\"id\":2,\"result\":{\"ok\":true}}\n",
        "\n",
    );
    let response =
        parse_jsonrpc_response_text(framed).expect("SSE-framed JSON response should parse");
    assert_eq!(response.id, Some(serde_json::json!(2)));

    let ok_flag = response
        .result
        .as_ref()
        .and_then(|result| result.get("ok"))
        .and_then(|flag| flag.as_bool());
    assert_eq!(ok_flag, Some(true));
}
1220
/// Both a zero-length payload and a whitespace-only payload must be rejected.
///
/// The zero-length case is what distinguishes this test from
/// `parse_jsonrpc_response_rejects_whitespace_only`, which covers only the
/// whitespace-only input.
#[test]
fn test_parse_jsonrpc_response_text_rejects_empty_payload() {
    for payload in ["", " \n\t "] {
        assert!(
            parse_jsonrpc_response_text(payload).is_err(),
            "payload {payload:?} should be rejected"
        );
    }
}
1225
/// An `mcp-session-id` response header value is stored as the transport's session id.
#[test]
fn http_transport_updates_session_id_from_response_headers() {
    let server = McpServerConfig {
        url: Some("http://localhost/mcp".into()),
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let mut http = HttpTransport::new(&server).expect("build transport");

    let header_name = reqwest::header::HeaderName::from_static("mcp-session-id");
    let header_value = reqwest::header::HeaderValue::from_static("session-abc");
    let mut response_headers = reqwest::header::HeaderMap::new();
    response_headers.insert(header_name, header_value);

    http.update_session_id_from_headers(&response_headers);
    assert_eq!(http.session_id.as_deref(), Some("session-abc"));
}
1244
/// When a session id is set, `apply_session_header` adds it to the outgoing request.
#[test]
fn http_transport_injects_session_id_header_when_available() {
    let server = McpServerConfig {
        url: Some("http://localhost/mcp".into()),
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let mut http = HttpTransport::new(&server).expect("build transport");
    http.session_id = Some(String::from("session-xyz"));

    let builder = reqwest::Client::new().post("http://localhost/mcp");
    let request = http
        .apply_session_header(builder)
        .build()
        .expect("build request");

    let session_header = request
        .headers()
        .get(MCP_SESSION_ID_HEADER)
        .and_then(|value| value.to_str().ok());
    assert_eq!(session_header, Some("session-xyz"));
}
1267
/// A trailing `/sse` path segment is swapped for the requested message path.
#[test]
fn derive_message_url_replaces_sse_segment_with_messages() {
    let derived = derive_message_url("http://localhost:3000/mcp/sse", "messages");
    assert_eq!(derived.as_deref(), Some("http://localhost:3000/mcp/messages"));
}
1275
/// Without a trailing `/sse` segment, the message path is appended to the base URL.
#[test]
fn derive_message_url_appends_when_no_sse_segment() {
    let derived = derive_message_url("http://localhost:3000/mcp", "messages");
    assert_eq!(derived.as_deref(), Some("http://localhost:3000/mcp/messages"));
}
1281
/// An input that is not a URL produces no derived message URL.
#[test]
fn derive_message_url_returns_none_for_invalid_url() {
    assert_eq!(derive_message_url("not-a-url", "messages"), None);
}
1287
/// The singular `message` path is substituted for `/sse` just like `messages`.
#[test]
fn derive_message_url_message_path_variant() {
    let derived = derive_message_url("http://localhost:3000/mcp/sse", "message");
    assert_eq!(derived.as_deref(), Some("http://localhost:3000/mcp/message"));
}
1293
/// An absolute `http://` endpoint is returned verbatim, ignoring the base URL.
#[test]
fn parse_endpoint_absolute_http_url_returned_as_is() {
    let endpoint = parse_endpoint_from_data("http://base/sse", "http://other/messages");
    assert_eq!(endpoint.as_deref(), Some("http://other/messages"));
}
1301
/// An absolute `https://` endpoint is returned verbatim, ignoring the base URL.
#[test]
fn parse_endpoint_absolute_https_url_returned_as_is() {
    let endpoint = parse_endpoint_from_data("https://base/sse", "https://other/messages");
    assert_eq!(endpoint.as_deref(), Some("https://other/messages"));
}
1307
/// A root-relative endpoint path is resolved against the base URL's origin.
#[test]
fn parse_endpoint_relative_path_resolved_against_base() {
    let endpoint = parse_endpoint_from_data("http://localhost:3000/sse", "/messages");
    assert_eq!(endpoint.as_deref(), Some("http://localhost:3000/messages"));
}
1313
/// A JSON object carrying an `endpoint` key is unwrapped and resolved against the base URL.
#[test]
fn parse_endpoint_json_object_with_endpoint_key() {
    let endpoint =
        parse_endpoint_from_data("http://localhost:3000/sse", "{\"endpoint\":\"/messages\"}");
    assert_eq!(endpoint.as_deref(), Some("http://localhost:3000/messages"));
}
1320
/// Text that starts with a `data:` field is classified as SSE.
#[test]
fn looks_like_sse_text_detects_data_prefix() {
    let sample = r#"data:{"jsonrpc":"2.0"}"#;
    let detected = looks_like_sse_text(sample);
    assert!(detected);
}
1327
/// Text that starts with an `event:` field is classified as SSE.
#[test]
fn looks_like_sse_text_detects_event_prefix() {
    let sample = "event: message\ndata: {}";
    let detected = looks_like_sse_text(sample);
    assert!(detected);
}
1332
/// A `data:` field on a later line (after `id:`) is still enough to classify text as SSE.
#[test]
fn looks_like_sse_text_detects_embedded_data_line() {
    let sample = "id: 1\ndata:{\"x\":1}";
    let detected = looks_like_sse_text(sample);
    assert!(detected);
}
1337
/// A bare JSON-RPC object is not classified as SSE.
#[test]
fn looks_like_sse_text_plain_json_is_not_sse() {
    let sample = r#"{"jsonrpc":"2.0","id":1,"result":{}}"#;
    let detected = looks_like_sse_text(sample);
    assert!(!detected);
}
1344
/// A leading `:` comment line is ignored and the `data:` payload is extracted.
#[test]
fn extract_json_skips_comment_lines() {
    let frame = ": keep-alive\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
    let json = extract_json_from_sse_text(frame);
    let payload = serde_json::from_str::<serde_json::Value>(json.as_ref()).unwrap();
    assert_eq!(
        payload.get("jsonrpc").and_then(|field| field.as_str()),
        Some("2.0")
    );
}
1354
/// Whitespace-only input yields output that is empty once trimmed.
#[test]
fn extract_json_empty_input_returns_empty_trimmed() {
    let extracted = extract_json_from_sse_text(" ");
    let trimmed = extracted.as_ref().trim();
    assert!(trimmed.is_empty());
}
1360
/// Plain JSON without SSE framing comes back exactly as it went in.
#[test]
fn extract_json_plain_json_returned_unchanged() {
    let plain = r#"{"jsonrpc":"2.0","result":{}}"#;
    let extracted = extract_json_from_sse_text(plain);
    let unchanged: &str = extracted.as_ref();
    assert_eq!(unchanged, plain);
}
1368
/// A payload made up only of whitespace is rejected with an error.
#[test]
fn parse_jsonrpc_response_rejects_whitespace_only() {
    let outcome = parse_jsonrpc_response_text(" \n\t ");
    assert!(outcome.is_err());
}
1375
/// A JSON-RPC error object is parsed into the response's `error` field with its code intact.
#[test]
fn parse_jsonrpc_response_with_error_result() {
    let body = r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32601,"message":"not found"}}"#;
    let response = parse_jsonrpc_response_text(body).unwrap();
    // A single comparison covers both "error is present" and "code matches".
    assert_eq!(response.error.map(|err| err.code), Some(-32601));
}
1383
/// Creating a stdio transport for a command path that does not exist returns an error.
#[test]
fn create_transport_stdio_fails_without_valid_command() {
    let server = McpServerConfig {
        command: "/usr/bin/zeroclaw_nonexistent_binary_abc123".into(),
        transport: McpTransport::Stdio,
        name: "test-stdio".into(),
        ..Default::default()
    };
    assert!(create_transport(&server).is_err());
}
1398
/// An HTTP transport config that sets no `url` is rejected.
#[test]
fn create_transport_http_without_url_fails() {
    let server = McpServerConfig {
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let outcome = create_transport(&server);
    assert!(outcome.is_err());
}
1408
/// An SSE transport config that sets no `url` is rejected.
#[test]
fn create_transport_sse_without_url_fails() {
    let server = McpServerConfig {
        transport: McpTransport::Sse,
        name: "test-sse".into(),
        ..Default::default()
    };
    let outcome = create_transport(&server);
    assert!(outcome.is_err());
}
1418
/// An HTTP transport config with a `url` builds successfully.
#[test]
fn create_transport_http_with_url_succeeds() {
    let server = McpServerConfig {
        url: Some("http://localhost:9999/mcp".into()),
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let outcome = create_transport(&server);
    assert!(outcome.is_ok());
}
1430
/// An SSE transport config with a `url` builds successfully.
#[test]
fn create_transport_sse_with_url_succeeds() {
    let server = McpServerConfig {
        url: Some("http://localhost:9999/sse".into()),
        transport: McpTransport::Sse,
        name: "test-sse".into(),
        ..Default::default()
    };
    let outcome = create_transport(&server);
    assert!(outcome.is_ok());
}
1441
/// A blank `mcp-session-id` header value does not set a session id on the transport.
#[test]
fn http_transport_ignores_empty_session_id_header() {
    let server = McpServerConfig {
        url: Some("http://localhost/mcp".into()),
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let mut http = HttpTransport::new(&server).expect("build transport");

    let header_name = reqwest::header::HeaderName::from_static("mcp-session-id");
    let blank_value = reqwest::header::HeaderValue::from_static(" ");
    let mut response_headers = reqwest::header::HeaderMap::new();
    response_headers.insert(header_name, blank_value);

    http.update_session_id_from_headers(&response_headers);
    assert_eq!(http.session_id, None);
}
1462
/// A freshly built HTTP transport has no session id.
#[test]
fn http_transport_no_session_header_leaves_none() {
    let server = McpServerConfig {
        url: Some("http://localhost/mcp".into()),
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let fresh = HttpTransport::new(&server).expect("build transport");
    assert_eq!(fresh.session_id, None);
}
1474
/// Without a stored session id, `apply_session_header` adds no session header to the request.
#[test]
fn http_transport_apply_session_header_noop_when_no_session() {
    let server = McpServerConfig {
        url: Some("http://localhost/mcp".into()),
        transport: McpTransport::Http,
        name: "test-http".into(),
        ..Default::default()
    };
    let http = HttpTransport::new(&server).expect("build transport");

    let builder = reqwest::Client::new().post("http://localhost/mcp");
    let request = http
        .apply_session_header(builder)
        .build()
        .expect("build request");

    let session_header = request.headers().get(MCP_SESSION_ID_HEADER);
    assert!(session_header.is_none());
}
1490}