Skip to main content

zeroclaw_runtime/rpc/
local.rs

1//! Local IPC transport for the RPC layer.
2//!
3//! On Unix this binds a `SOCK_STREAM` AF_UNIX socket at
4//! `<config.data_dir>/daemon.sock`; on Windows it creates a per-user named
5//! pipe whose name is derived from the data_dir so each `--data-dir` gets
6//! its own endpoint. `$ZEROCLAW_SOCKET` overrides the endpoint path on
7//! both platforms.
8
9use super::context::RpcContext;
10use super::dispatch::RpcDispatcher;
11use super::transport::RpcTransport;
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::sync::mpsc;
19use tokio_util::sync::CancellationToken;
20use zeroclaw_config::schema::Config;
21
22use platform::LocalStream;
23
24/// Resolve the local-IPC endpoint path.
25///
26/// Returns `$ZEROCLAW_SOCKET` when set, otherwise a per-`data_dir`
27/// platform-native endpoint:
28/// - Unix: `<data_dir>/daemon.sock` (filesystem path)
29/// - Windows: `\\.\pipe\zeroclaw-<hash>` where `<hash>` is derived from
30///   `data_dir` so each data directory gets its own pipe
31pub fn socket_path(config: &Config) -> PathBuf {
32    if let Ok(p) = std::env::var("ZEROCLAW_SOCKET") {
33        return PathBuf::from(p);
34    }
35    platform::default_endpoint(&config.data_dir)
36}
37
38// ── Transport ────────────────────────────────────────────────────
39
40/// Platform-neutral half-write type produced by `tokio::io::split`.
41type LocalWriteHalf = tokio::io::WriteHalf<LocalStream>;
42/// Platform-neutral half-read type produced by `tokio::io::split`.
43type LocalReadHalf = tokio::io::ReadHalf<LocalStream>;
44
45pub struct LocalTransport {
46    reader: BufReader<LocalReadHalf>,
47    writer_tx: mpsc::Sender<String>,
48    peer_label: String,
49}
50
51impl LocalTransport {
52    pub fn new(stream: LocalStream) -> Self {
53        let peer_label = platform::peer_label_from(&stream);
54        let (read_half, write_half) = tokio::io::split(stream);
55
56        let (writer_tx, mut writer_rx) = mpsc::channel::<String>(64);
57        zeroclaw_spawn::spawn!(async move {
58            let mut writer: LocalWriteHalf = write_half;
59            while let Some(mut line) = writer_rx.recv().await {
60                if !line.ends_with('\n') {
61                    line.push('\n');
62                }
63                if writer.write_all(line.as_bytes()).await.is_err() {
64                    break;
65                }
66            }
67        });
68
69        Self {
70            reader: BufReader::new(read_half),
71            writer_tx,
72            peer_label,
73        }
74    }
75}
76
77#[async_trait]
78impl RpcTransport for LocalTransport {
79    fn writer(&self) -> mpsc::Sender<String> {
80        self.writer_tx.clone()
81    }
82
83    async fn next_frame(&mut self) -> Option<String> {
84        let mut line = String::new();
85        match self.reader.read_line(&mut line).await {
86            Ok(0) => None,
87            Ok(_) => Some(line),
88            Err(_) => None,
89        }
90    }
91
92    fn peer_label(&self) -> String {
93        self.peer_label.clone()
94    }
95}
96
97// ── Listener ─────────────────────────────────────────────────────
98
99/// Run the local IPC RPC listener as a daemon subsystem.
100///
101/// `client_count` is incremented on connect, decremented on disconnect.
102/// The daemon uses it for `--ephemeral` shutdown logic.
103pub async fn run_local_listener(
104    ctx: Arc<RpcContext>,
105    cancel: CancellationToken,
106    client_count: Arc<AtomicUsize>,
107) -> Result<()> {
108    let path = {
109        let config = ctx.config.read();
110        socket_path(&config)
111    };
112
113    platform::prepare_parent(&path).await?;
114    platform::remove_stale(&path).await?;
115
116    let mut listener = platform::bind(&path).context("binding local IPC endpoint")?;
117
118    platform::secure_endpoint(&path).await;
119
120    ::zeroclaw_log::record!(
121        INFO,
122        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
123            .with_attrs(::serde_json::json!({"path": path.display().to_string()})),
124        "RPC local IPC listening"
125    );
126
127    loop {
128        tokio::select! {
129            _ = cancel.cancelled() => {
130                ::zeroclaw_log::record!(
131                    INFO,
132                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
133                    "RPC local IPC shutting down"
134                );
135                break;
136            }
137            accept = platform::accept(&mut listener, &path) => {
138                let stream = match accept {
139                    Ok(v) => v,
140                    Err(e) => {
141                        ::zeroclaw_log::record!(
142                            WARN,
143                            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
144                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
145                            &format!("local IPC accept error: {e}")
146                        );
147                        continue;
148                    }
149                };
150
151                let ctx = ctx.clone();
152                let count = client_count.clone();
153
154                count.fetch_add(1, Ordering::Relaxed);
155
156                zeroclaw_spawn::spawn!(async move {
157                    let mut transport = LocalTransport::new(stream);
158                    let peer = transport.peer_label();
159                    let writer_tx = transport.writer();
160                    let mut dispatcher = RpcDispatcher::new(ctx.clone(), writer_tx, peer);
161                    dispatcher.run(&mut transport).await;
162
163                    if let Some(tui_id) = dispatcher.tui_id() {
164                        ctx.tui_registry.unregister(tui_id);
165                        use ::zeroclaw_log::Instrument as _;
166                        let span = ::zeroclaw_log::info_span!(
167                            target: "zeroclaw_log_internal_scope",
168                            "zeroclaw_scope",
169                            owner_tui_id = %tui_id,
170                            channel = "rpc",
171                        );
172                        async {
173                            ::zeroclaw_log::record!(
174                                INFO,
175                                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
176                                    .with_category(::zeroclaw_log::EventCategory::Agent),
177                                "TUI disconnected; sessions retained (persistent)"
178                            );
179                        }
180                        .instrument(span)
181                        .await;
182                    }
183
184                    count.fetch_sub(1, Ordering::Relaxed);
185                });
186            }
187        }
188    }
189
190    platform::cleanup(&path).await;
191    Ok(())
192}
193
194// ── Platform shims ───────────────────────────────────────────────
195
196#[cfg(unix)]
197mod platform {
198    use anyhow::{Context, Result};
199    use std::path::{Path, PathBuf};
200    use tokio::net::{UnixListener, UnixStream};
201
202    pub type LocalListener = UnixListener;
203    pub type LocalStream = UnixStream;
204
205    pub fn default_endpoint(data_dir: &Path) -> PathBuf {
206        data_dir.join("daemon.sock")
207    }
208
209    pub async fn prepare_parent(path: &Path) -> Result<()> {
210        if let Some(parent) = path.parent() {
211            tokio::fs::create_dir_all(parent).await?;
212            use std::os::unix::fs::PermissionsExt;
213            tokio::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))
214                .await
215                .ok();
216        }
217        Ok(())
218    }
219
220    pub async fn remove_stale(path: &Path) -> Result<()> {
221        if path.exists() {
222            tokio::fs::remove_file(path)
223                .await
224                .context("removing stale socket")?;
225        }
226        Ok(())
227    }
228
229    pub fn bind(path: &Path) -> Result<LocalListener> {
230        UnixListener::bind(path).context("binding unix socket")
231    }
232
233    pub async fn secure_endpoint(path: &Path) {
234        use std::os::unix::fs::PermissionsExt;
235        tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))
236            .await
237            .ok();
238    }
239
240    pub async fn accept(listener: &mut LocalListener, _path: &Path) -> Result<LocalStream> {
241        let (stream, _addr) = listener
242            .accept()
243            .await
244            .context("accepting local connection")?;
245        Ok(stream)
246    }
247
248    pub async fn cleanup(path: &Path) {
249        tokio::fs::remove_file(path).await.ok();
250    }
251
252    pub fn peer_label_from(stream: &LocalStream) -> String {
253        #[cfg(target_os = "linux")]
254        {
255            if let Ok(cred) = stream.peer_cred() {
256                return format!("unix:pid={},uid={}", cred.pid().unwrap_or(0), cred.uid());
257            }
258        }
259        let _ = stream;
260        "unix:unknown".to_string()
261    }
262}
263
264#[cfg(windows)]
265mod platform {
266    use anyhow::{Context, Result};
267    use std::path::{Path, PathBuf};
268    use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
269
270    /// On Windows the "listener" is a single pending server instance. After
271    /// each accept the caller creates a new pending instance for the next
272    /// client; see `accept`.
273    pub type LocalListener = NamedPipeServer;
274    pub type LocalStream = NamedPipeServer;
275
276    pub fn default_endpoint(data_dir: &Path) -> PathBuf {
277        use std::collections::hash_map::DefaultHasher;
278        use std::hash::{Hash, Hasher};
279        let mut hasher = DefaultHasher::new();
280        data_dir.hash(&mut hasher);
281        PathBuf::from(format!(r"\\.\pipe\zeroclaw-{:x}", hasher.finish()))
282    }
283
284    pub async fn prepare_parent(_path: &Path) -> Result<()> {
285        // Named pipes live in the kernel object namespace, not the
286        // filesystem — no parent directory to create.
287        Ok(())
288    }
289
290    pub async fn remove_stale(_path: &Path) -> Result<()> {
291        // Named pipes are cleaned up when the last handle closes; there is
292        // no "stale" file equivalent.
293        Ok(())
294    }
295
296    pub fn bind(path: &Path) -> Result<LocalListener> {
297        let name = path_to_pipe_name(path);
298        ServerOptions::new()
299            .first_pipe_instance(true)
300            .create(&name)
301            .with_context(|| format!("creating named pipe {name}"))
302    }
303
304    pub async fn secure_endpoint(_path: &Path) {
305        // The default ServerOptions ACL grants access to the creating user
306        // and SYSTEM, matching the spirit of Unix 0o600. Stricter SDDL is
307        // a separate hardening pass.
308    }
309
310    pub async fn accept(listener: &mut LocalListener, path: &Path) -> Result<LocalStream> {
311        listener
312            .connect()
313            .await
314            .context("awaiting named-pipe client")?;
315        // Take the now-connected pipe and replace `listener` with a fresh
316        // pending instance so the next accept call can wait on it.
317        let next = ServerOptions::new()
318            .create(path_to_pipe_name(path))
319            .context("creating next named-pipe instance")?;
320        let connected = std::mem::replace(listener, next);
321        Ok(connected)
322    }
323
324    pub async fn cleanup(_path: &Path) {
325        // Pipe handles drop with the server instance; nothing to remove.
326    }
327
328    pub fn peer_label_from(_stream: &LocalStream) -> String {
329        "pipe:local".to_string()
330    }
331
332    fn path_to_pipe_name(path: &Path) -> String {
333        path.to_string_lossy().into_owned()
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    use crate::rpc::dispatch::Method;
341    use crate::rpc::session::SessionStore;
342    use crate::rpc::types::InitializeParams;
343    #[cfg(unix)]
344    use crate::rpc::types::StatusResult;
345    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
346    use zeroclaw_api::jsonrpc::{JSONRPC_VERSION, JsonRpcRequest};
347    use zeroclaw_infra::session_queue::SessionActorQueue;
348
349    fn test_ctx(tmp: &std::path::Path) -> Arc<RpcContext> {
350        let config = Config {
351            data_dir: tmp.to_path_buf(),
352            config_path: tmp.join("config.toml"),
353            ..Config::default()
354        };
355        let session_queue = Arc::new(SessionActorQueue::new(4, 10, 60));
356        let sessions = Arc::new(SessionStore::new(64, session_queue));
357        RpcContext::minimal(config, sessions)
358    }
359
360    fn test_client_count() -> Arc<AtomicUsize> {
361        Arc::new(AtomicUsize::new(0))
362    }
363
364    fn rpc_request<T: serde::Serialize>(method: Method, params: &T, id: u64) -> String {
365        let req = JsonRpcRequest::new(
366            method.wire_name(),
367            serde_json::to_value(params).unwrap(),
368            serde_json::Value::Number(id.into()),
369        );
370        let mut s = serde_json::to_string(&req).unwrap();
371        s.push('\n');
372        s
373    }
374
375    #[cfg(unix)]
376    async fn read_result<T: serde::de::DeserializeOwned>(
377        reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
378    ) -> (serde_json::Value, T) {
379        let mut line = String::new();
380        reader.read_line(&mut line).await.unwrap();
381        let frame: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
382        assert!(frame["error"].is_null(), "unexpected RPC error: {frame}");
383        let result: T = serde_json::from_value(frame["result"].clone()).unwrap();
384        (frame, result)
385    }
386
387    #[cfg(unix)]
388    async fn do_initialize(
389        sock_path: &std::path::Path,
390    ) -> (
391        tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
392        tokio::net::unix::OwnedWriteHalf,
393    ) {
394        let stream = tokio::net::UnixStream::connect(sock_path).await.unwrap();
395        let (read_half, mut writer) = stream.into_split();
396        let mut reader = tokio::io::BufReader::new(read_half);
397
398        let params = InitializeParams {
399            protocol_version: 1,
400            tui_id: None,
401            tui_sig: None,
402            env: Default::default(),
403        };
404        writer
405            .write_all(rpc_request(Method::Initialize, &params, 1).as_bytes())
406            .await
407            .unwrap();
408
409        let (_frame, _result): (_, serde_json::Value) = read_result(&mut reader).await;
410        (reader, writer)
411    }
412
413    #[cfg(unix)]
414    async fn wait_for_socket(path: &std::path::Path) {
415        for _ in 0..50 {
416            if path.exists() {
417                return;
418            }
419            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
420        }
421        panic!("socket never appeared at {}", path.display());
422    }
423
424    #[cfg(unix)]
425    #[tokio::test]
426    async fn socket_initialize_handshake() {
427        let tmp = tempfile::tempdir().unwrap();
428        let ctx = test_ctx(tmp.path());
429        let sock_path = ctx.config.read().data_dir.join("daemon.sock");
430        let cancel = CancellationToken::new();
431
432        let server_cancel = cancel.clone();
433        let server_ctx = ctx.clone();
434        let handle = zeroclaw_spawn::spawn!(async move {
435            run_local_listener(server_ctx, server_cancel, test_client_count()).await
436        });
437
438        wait_for_socket(&sock_path).await;
439
440        let stream = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
441        let (read_half, mut writer) = stream.into_split();
442        let mut reader = tokio::io::BufReader::new(read_half);
443
444        let init_params = InitializeParams {
445            protocol_version: 1,
446            tui_id: None,
447            tui_sig: None,
448            env: Default::default(),
449        };
450        writer
451            .write_all(rpc_request(Method::Initialize, &init_params, 1).as_bytes())
452            .await
453            .unwrap();
454
455        let (frame, init_result): (_, crate::rpc::types::InitializeResult) =
456            read_result(&mut reader).await;
457
458        assert_eq!(frame["jsonrpc"], JSONRPC_VERSION);
459        assert_eq!(frame["id"], 1);
460        assert_eq!(init_result.protocol_version, 1);
461        assert!(!init_result.server_version.is_empty());
462
463        writer
464            .write_all(rpc_request(Method::Status, &serde_json::json!({}), 2).as_bytes())
465            .await
466            .unwrap();
467
468        let (_frame2, status): (_, StatusResult) = read_result(&mut reader).await;
469        assert_eq!(status.active_sessions, 0);
470
471        cancel.cancel();
472        drop(writer);
473        let _ = handle.await;
474    }
475
476    #[cfg(unix)]
477    #[tokio::test]
478    async fn socket_rejects_before_initialize() {
479        let tmp = tempfile::tempdir().unwrap();
480        let ctx = test_ctx(tmp.path());
481        let sock_path = ctx.config.read().data_dir.join("daemon.sock");
482        let cancel = CancellationToken::new();
483
484        let server_cancel = cancel.clone();
485        let server_ctx = ctx.clone();
486        zeroclaw_spawn::spawn!(async move {
487            let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
488        });
489
490        wait_for_socket(&sock_path).await;
491
492        let stream = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
493        let (reader, mut writer) = stream.into_split();
494        let mut reader = tokio::io::BufReader::new(reader);
495
496        writer
497            .write_all(rpc_request(Method::Status, &serde_json::json!({}), 1).as_bytes())
498            .await
499            .unwrap();
500
501        let mut line = String::new();
502        reader.read_line(&mut line).await.unwrap();
503        let resp: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
504
505        assert!(resp["error"].is_object());
506        assert_eq!(
507            resp["error"]["code"],
508            zeroclaw_api::jsonrpc::error_codes::AUTH_REQUIRED
509        );
510
511        cancel.cancel();
512    }
513
514    #[cfg(unix)]
515    #[tokio::test]
516    async fn socket_permissions() {
517        let tmp = tempfile::tempdir().unwrap();
518        let ctx = test_ctx(tmp.path());
519        let sock_path = ctx.config.read().data_dir.join("daemon.sock");
520        let cancel = CancellationToken::new();
521
522        let server_cancel = cancel.clone();
523        let server_ctx = ctx.clone();
524        zeroclaw_spawn::spawn!(async move {
525            let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
526        });
527
528        wait_for_socket(&sock_path).await;
529
530        use std::os::unix::fs::PermissionsExt;
531        let meta = std::fs::metadata(&sock_path).unwrap();
532        let mode = meta.permissions().mode() & 0o777;
533        assert_eq!(
534            mode, 0o600,
535            "socket should be owner-only (0o600), got {mode:#o}"
536        );
537
538        cancel.cancel();
539    }
540
541    #[cfg(unix)]
542    #[tokio::test]
543    async fn stale_socket_cleanup() {
544        let tmp = tempfile::tempdir().unwrap();
545        let ctx = test_ctx(tmp.path());
546        let sock_path = ctx.config.read().data_dir.join("daemon.sock");
547
548        std::fs::create_dir_all(tmp.path()).unwrap();
549        std::fs::write(&sock_path, b"stale").unwrap();
550        assert!(sock_path.exists());
551
552        let cancel = CancellationToken::new();
553        let server_cancel = cancel.clone();
554        let server_ctx = ctx.clone();
555        zeroclaw_spawn::spawn!(async move {
556            let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
557        });
558
559        for _ in 0..50 {
560            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
561            if tokio::net::UnixStream::connect(&sock_path).await.is_ok() {
562                break;
563            }
564        }
565
566        let stream = tokio::net::UnixStream::connect(&sock_path).await;
567        assert!(
568            stream.is_ok(),
569            "should be able to connect after stale cleanup"
570        );
571
572        cancel.cancel();
573    }
574
575    #[cfg(unix)]
576    #[tokio::test]
577    async fn session_approve_resolves_pending_approval() {
578        let tmp = tempfile::tempdir().unwrap();
579        let ctx = test_ctx(tmp.path());
580        let sock_path = ctx.config.read().data_dir.join("daemon.sock");
581        let cancel = CancellationToken::new();
582
583        let server_cancel = cancel.clone();
584        let server_ctx = ctx.clone();
585        zeroclaw_spawn::spawn!(async move {
586            let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
587        });
588        wait_for_socket(&sock_path).await;
589
590        let (mut reader, mut writer) = do_initialize(&sock_path).await;
591
592        let (pending_tx, mut pending_rx) =
593            tokio::sync::oneshot::channel::<zeroclaw_api::channel::ChannelApprovalResponse>();
594        ctx.approval_pending
595            .insert("test-req-1".to_string(), pending_tx);
596
597        let approve_params = serde_json::json!({
598            "session_id": "unused",
599            "request_id": "test-req-1",
600            "decision": "allow_once",
601        });
602        writer
603            .write_all(rpc_request(Method::SessionApprove, &approve_params, 10).as_bytes())
604            .await
605            .unwrap();
606
607        let (_frame, result): (_, serde_json::Value) = read_result(&mut reader).await;
608        assert_eq!(result["acknowledged"], true);
609
610        let decision = pending_rx.try_recv().expect("decision should be resolved");
611        assert_eq!(
612            decision,
613            zeroclaw_api::channel::ChannelApprovalResponse::Approve
614        );
615
616        cancel.cancel();
617    }
618
619    #[cfg(unix)]
620    #[tokio::test]
621    async fn client_count_tracks_connections() {
622        let tmp = tempfile::tempdir().unwrap();
623        let ctx = test_ctx(tmp.path());
624        let sock_path = ctx.config.read().data_dir.join("daemon.sock");
625        let cancel = CancellationToken::new();
626        let count = Arc::new(AtomicUsize::new(0));
627
628        let server_cancel = cancel.clone();
629        let server_ctx = ctx.clone();
630        let server_count = count.clone();
631        zeroclaw_spawn::spawn!(async move {
632            let _ = run_local_listener(server_ctx, server_cancel, server_count).await;
633        });
634
635        wait_for_socket(&sock_path).await;
636
637        assert_eq!(count.load(Ordering::Relaxed), 0);
638
639        let s1 = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
640        let s2 = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
641        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
642        assert_eq!(count.load(Ordering::Relaxed), 2);
643
644        drop(s1);
645        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
646        assert_eq!(count.load(Ordering::Relaxed), 1);
647
648        drop(s2);
649        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
650        assert_eq!(count.load(Ordering::Relaxed), 0);
651
652        cancel.cancel();
653    }
654
655    #[cfg(windows)]
656    #[tokio::test]
657    async fn pipe_initialize_handshake() {
658        use tokio::net::windows::named_pipe::ClientOptions;
659        use tokio::time::{Duration, sleep};
660
661        let tmp = tempfile::tempdir().unwrap();
662        let ctx = test_ctx(tmp.path());
663        let pipe_path = socket_path(&ctx.config.read());
664        let cancel = CancellationToken::new();
665
666        let server_cancel = cancel.clone();
667        let server_ctx = ctx.clone();
668        let handle = zeroclaw_spawn::spawn!(async move {
669            run_local_listener(server_ctx, server_cancel, test_client_count()).await
670        });
671
672        // Poll-connect until the server creates its pending instance.
673        let pipe_name = pipe_path.to_string_lossy().into_owned();
674        let mut client = None;
675        for _ in 0..50 {
676            match ClientOptions::new().open(&pipe_name) {
677                Ok(c) => {
678                    client = Some(c);
679                    break;
680                }
681                Err(_) => sleep(Duration::from_millis(20)).await,
682            }
683        }
684        let mut client = client.expect("named pipe never accepted a client");
685        let (read_half, mut write_half) = tokio::io::split(&mut client);
686        let mut reader = tokio::io::BufReader::new(read_half);
687
688        let init_params = InitializeParams {
689            protocol_version: 1,
690            tui_id: None,
691            tui_sig: None,
692            env: Default::default(),
693        };
694        write_half
695            .write_all(rpc_request(Method::Initialize, &init_params, 1).as_bytes())
696            .await
697            .unwrap();
698
699        let mut line = String::new();
700        reader.read_line(&mut line).await.unwrap();
701        let frame: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
702        assert!(frame["error"].is_null(), "unexpected RPC error: {frame}");
703        assert_eq!(frame["jsonrpc"], JSONRPC_VERSION);
704        assert_eq!(frame["id"], 1);
705
706        cancel.cancel();
707        drop(write_half);
708        drop(reader);
709        drop(client);
710        let _ = handle.await;
711    }
712}