1use super::context::RpcContext;
10use super::dispatch::RpcDispatcher;
11use super::transport::RpcTransport;
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::sync::mpsc;
19use tokio_util::sync::CancellationToken;
20use zeroclaw_config::schema::Config;
21
22use platform::LocalStream;
23
24pub fn socket_path(config: &Config) -> PathBuf {
32 if let Ok(p) = std::env::var("ZEROCLAW_SOCKET") {
33 return PathBuf::from(p);
34 }
35 platform::default_endpoint(&config.data_dir)
36}
37
38type LocalWriteHalf = tokio::io::WriteHalf<LocalStream>;
42type LocalReadHalf = tokio::io::ReadHalf<LocalStream>;
44
45pub struct LocalTransport {
46 reader: BufReader<LocalReadHalf>,
47 writer_tx: mpsc::Sender<String>,
48 peer_label: String,
49}
50
51impl LocalTransport {
52 pub fn new(stream: LocalStream) -> Self {
53 let peer_label = platform::peer_label_from(&stream);
54 let (read_half, write_half) = tokio::io::split(stream);
55
56 let (writer_tx, mut writer_rx) = mpsc::channel::<String>(64);
57 zeroclaw_spawn::spawn!(async move {
58 let mut writer: LocalWriteHalf = write_half;
59 while let Some(mut line) = writer_rx.recv().await {
60 if !line.ends_with('\n') {
61 line.push('\n');
62 }
63 if writer.write_all(line.as_bytes()).await.is_err() {
64 break;
65 }
66 }
67 });
68
69 Self {
70 reader: BufReader::new(read_half),
71 writer_tx,
72 peer_label,
73 }
74 }
75}
76
77#[async_trait]
78impl RpcTransport for LocalTransport {
79 fn writer(&self) -> mpsc::Sender<String> {
80 self.writer_tx.clone()
81 }
82
83 async fn next_frame(&mut self) -> Option<String> {
84 let mut line = String::new();
85 match self.reader.read_line(&mut line).await {
86 Ok(0) => None,
87 Ok(_) => Some(line),
88 Err(_) => None,
89 }
90 }
91
92 fn peer_label(&self) -> String {
93 self.peer_label.clone()
94 }
95}
96
97pub async fn run_local_listener(
104 ctx: Arc<RpcContext>,
105 cancel: CancellationToken,
106 client_count: Arc<AtomicUsize>,
107) -> Result<()> {
108 let path = {
109 let config = ctx.config.read();
110 socket_path(&config)
111 };
112
113 platform::prepare_parent(&path).await?;
114 platform::remove_stale(&path).await?;
115
116 let mut listener = platform::bind(&path).context("binding local IPC endpoint")?;
117
118 platform::secure_endpoint(&path).await;
119
120 ::zeroclaw_log::record!(
121 INFO,
122 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
123 .with_attrs(::serde_json::json!({"path": path.display().to_string()})),
124 "RPC local IPC listening"
125 );
126
127 loop {
128 tokio::select! {
129 _ = cancel.cancelled() => {
130 ::zeroclaw_log::record!(
131 INFO,
132 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
133 "RPC local IPC shutting down"
134 );
135 break;
136 }
137 accept = platform::accept(&mut listener, &path) => {
138 let stream = match accept {
139 Ok(v) => v,
140 Err(e) => {
141 ::zeroclaw_log::record!(
142 WARN,
143 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
144 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
145 &format!("local IPC accept error: {e}")
146 );
147 continue;
148 }
149 };
150
151 let ctx = ctx.clone();
152 let count = client_count.clone();
153
154 count.fetch_add(1, Ordering::Relaxed);
155
156 zeroclaw_spawn::spawn!(async move {
157 let mut transport = LocalTransport::new(stream);
158 let peer = transport.peer_label();
159 let writer_tx = transport.writer();
160 let mut dispatcher = RpcDispatcher::new(ctx.clone(), writer_tx, peer);
161 dispatcher.run(&mut transport).await;
162
163 if let Some(tui_id) = dispatcher.tui_id() {
164 ctx.tui_registry.unregister(tui_id);
165 use ::zeroclaw_log::Instrument as _;
166 let span = ::zeroclaw_log::info_span!(
167 target: "zeroclaw_log_internal_scope",
168 "zeroclaw_scope",
169 owner_tui_id = %tui_id,
170 channel = "rpc",
171 );
172 async {
173 ::zeroclaw_log::record!(
174 INFO,
175 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
176 .with_category(::zeroclaw_log::EventCategory::Agent),
177 "TUI disconnected; sessions retained (persistent)"
178 );
179 }
180 .instrument(span)
181 .await;
182 }
183
184 count.fetch_sub(1, Ordering::Relaxed);
185 });
186 }
187 }
188 }
189
190 platform::cleanup(&path).await;
191 Ok(())
192}
193
194#[cfg(unix)]
197mod platform {
198 use anyhow::{Context, Result};
199 use std::path::{Path, PathBuf};
200 use tokio::net::{UnixListener, UnixStream};
201
202 pub type LocalListener = UnixListener;
203 pub type LocalStream = UnixStream;
204
205 pub fn default_endpoint(data_dir: &Path) -> PathBuf {
206 data_dir.join("daemon.sock")
207 }
208
209 pub async fn prepare_parent(path: &Path) -> Result<()> {
210 if let Some(parent) = path.parent() {
211 tokio::fs::create_dir_all(parent).await?;
212 use std::os::unix::fs::PermissionsExt;
213 tokio::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))
214 .await
215 .ok();
216 }
217 Ok(())
218 }
219
220 pub async fn remove_stale(path: &Path) -> Result<()> {
221 if path.exists() {
222 tokio::fs::remove_file(path)
223 .await
224 .context("removing stale socket")?;
225 }
226 Ok(())
227 }
228
229 pub fn bind(path: &Path) -> Result<LocalListener> {
230 UnixListener::bind(path).context("binding unix socket")
231 }
232
233 pub async fn secure_endpoint(path: &Path) {
234 use std::os::unix::fs::PermissionsExt;
235 tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))
236 .await
237 .ok();
238 }
239
240 pub async fn accept(listener: &mut LocalListener, _path: &Path) -> Result<LocalStream> {
241 let (stream, _addr) = listener
242 .accept()
243 .await
244 .context("accepting local connection")?;
245 Ok(stream)
246 }
247
248 pub async fn cleanup(path: &Path) {
249 tokio::fs::remove_file(path).await.ok();
250 }
251
252 pub fn peer_label_from(stream: &LocalStream) -> String {
253 #[cfg(target_os = "linux")]
254 {
255 if let Ok(cred) = stream.peer_cred() {
256 return format!("unix:pid={},uid={}", cred.pid().unwrap_or(0), cred.uid());
257 }
258 }
259 let _ = stream;
260 "unix:unknown".to_string()
261 }
262}
263
264#[cfg(windows)]
265mod platform {
266 use anyhow::{Context, Result};
267 use std::path::{Path, PathBuf};
268 use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
269
270 pub type LocalListener = NamedPipeServer;
274 pub type LocalStream = NamedPipeServer;
275
276 pub fn default_endpoint(data_dir: &Path) -> PathBuf {
277 use std::collections::hash_map::DefaultHasher;
278 use std::hash::{Hash, Hasher};
279 let mut hasher = DefaultHasher::new();
280 data_dir.hash(&mut hasher);
281 PathBuf::from(format!(r"\\.\pipe\zeroclaw-{:x}", hasher.finish()))
282 }
283
284 pub async fn prepare_parent(_path: &Path) -> Result<()> {
285 Ok(())
288 }
289
290 pub async fn remove_stale(_path: &Path) -> Result<()> {
291 Ok(())
294 }
295
296 pub fn bind(path: &Path) -> Result<LocalListener> {
297 let name = path_to_pipe_name(path);
298 ServerOptions::new()
299 .first_pipe_instance(true)
300 .create(&name)
301 .with_context(|| format!("creating named pipe {name}"))
302 }
303
304 pub async fn secure_endpoint(_path: &Path) {
305 }
309
310 pub async fn accept(listener: &mut LocalListener, path: &Path) -> Result<LocalStream> {
311 listener
312 .connect()
313 .await
314 .context("awaiting named-pipe client")?;
315 let next = ServerOptions::new()
318 .create(path_to_pipe_name(path))
319 .context("creating next named-pipe instance")?;
320 let connected = std::mem::replace(listener, next);
321 Ok(connected)
322 }
323
324 pub async fn cleanup(_path: &Path) {
325 }
327
328 pub fn peer_label_from(_stream: &LocalStream) -> String {
329 "pipe:local".to_string()
330 }
331
332 fn path_to_pipe_name(path: &Path) -> String {
333 path.to_string_lossy().into_owned()
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use crate::rpc::dispatch::Method;
341 use crate::rpc::session::SessionStore;
342 use crate::rpc::types::InitializeParams;
343 #[cfg(unix)]
344 use crate::rpc::types::StatusResult;
345 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
346 use zeroclaw_api::jsonrpc::{JSONRPC_VERSION, JsonRpcRequest};
347 use zeroclaw_infra::session_queue::SessionActorQueue;
348
349 fn test_ctx(tmp: &std::path::Path) -> Arc<RpcContext> {
350 let config = Config {
351 data_dir: tmp.to_path_buf(),
352 config_path: tmp.join("config.toml"),
353 ..Config::default()
354 };
355 let session_queue = Arc::new(SessionActorQueue::new(4, 10, 60));
356 let sessions = Arc::new(SessionStore::new(64, session_queue));
357 RpcContext::minimal(config, sessions)
358 }
359
360 fn test_client_count() -> Arc<AtomicUsize> {
361 Arc::new(AtomicUsize::new(0))
362 }
363
364 fn rpc_request<T: serde::Serialize>(method: Method, params: &T, id: u64) -> String {
365 let req = JsonRpcRequest::new(
366 method.wire_name(),
367 serde_json::to_value(params).unwrap(),
368 serde_json::Value::Number(id.into()),
369 );
370 let mut s = serde_json::to_string(&req).unwrap();
371 s.push('\n');
372 s
373 }
374
375 #[cfg(unix)]
376 async fn read_result<T: serde::de::DeserializeOwned>(
377 reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
378 ) -> (serde_json::Value, T) {
379 let mut line = String::new();
380 reader.read_line(&mut line).await.unwrap();
381 let frame: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
382 assert!(frame["error"].is_null(), "unexpected RPC error: {frame}");
383 let result: T = serde_json::from_value(frame["result"].clone()).unwrap();
384 (frame, result)
385 }
386
387 #[cfg(unix)]
388 async fn do_initialize(
389 sock_path: &std::path::Path,
390 ) -> (
391 tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
392 tokio::net::unix::OwnedWriteHalf,
393 ) {
394 let stream = tokio::net::UnixStream::connect(sock_path).await.unwrap();
395 let (read_half, mut writer) = stream.into_split();
396 let mut reader = tokio::io::BufReader::new(read_half);
397
398 let params = InitializeParams {
399 protocol_version: 1,
400 tui_id: None,
401 tui_sig: None,
402 env: Default::default(),
403 };
404 writer
405 .write_all(rpc_request(Method::Initialize, ¶ms, 1).as_bytes())
406 .await
407 .unwrap();
408
409 let (_frame, _result): (_, serde_json::Value) = read_result(&mut reader).await;
410 (reader, writer)
411 }
412
413 #[cfg(unix)]
414 async fn wait_for_socket(path: &std::path::Path) {
415 for _ in 0..50 {
416 if path.exists() {
417 return;
418 }
419 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
420 }
421 panic!("socket never appeared at {}", path.display());
422 }
423
424 #[cfg(unix)]
425 #[tokio::test]
426 async fn socket_initialize_handshake() {
427 let tmp = tempfile::tempdir().unwrap();
428 let ctx = test_ctx(tmp.path());
429 let sock_path = ctx.config.read().data_dir.join("daemon.sock");
430 let cancel = CancellationToken::new();
431
432 let server_cancel = cancel.clone();
433 let server_ctx = ctx.clone();
434 let handle = zeroclaw_spawn::spawn!(async move {
435 run_local_listener(server_ctx, server_cancel, test_client_count()).await
436 });
437
438 wait_for_socket(&sock_path).await;
439
440 let stream = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
441 let (read_half, mut writer) = stream.into_split();
442 let mut reader = tokio::io::BufReader::new(read_half);
443
444 let init_params = InitializeParams {
445 protocol_version: 1,
446 tui_id: None,
447 tui_sig: None,
448 env: Default::default(),
449 };
450 writer
451 .write_all(rpc_request(Method::Initialize, &init_params, 1).as_bytes())
452 .await
453 .unwrap();
454
455 let (frame, init_result): (_, crate::rpc::types::InitializeResult) =
456 read_result(&mut reader).await;
457
458 assert_eq!(frame["jsonrpc"], JSONRPC_VERSION);
459 assert_eq!(frame["id"], 1);
460 assert_eq!(init_result.protocol_version, 1);
461 assert!(!init_result.server_version.is_empty());
462
463 writer
464 .write_all(rpc_request(Method::Status, &serde_json::json!({}), 2).as_bytes())
465 .await
466 .unwrap();
467
468 let (_frame2, status): (_, StatusResult) = read_result(&mut reader).await;
469 assert_eq!(status.active_sessions, 0);
470
471 cancel.cancel();
472 drop(writer);
473 let _ = handle.await;
474 }
475
476 #[cfg(unix)]
477 #[tokio::test]
478 async fn socket_rejects_before_initialize() {
479 let tmp = tempfile::tempdir().unwrap();
480 let ctx = test_ctx(tmp.path());
481 let sock_path = ctx.config.read().data_dir.join("daemon.sock");
482 let cancel = CancellationToken::new();
483
484 let server_cancel = cancel.clone();
485 let server_ctx = ctx.clone();
486 zeroclaw_spawn::spawn!(async move {
487 let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
488 });
489
490 wait_for_socket(&sock_path).await;
491
492 let stream = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
493 let (reader, mut writer) = stream.into_split();
494 let mut reader = tokio::io::BufReader::new(reader);
495
496 writer
497 .write_all(rpc_request(Method::Status, &serde_json::json!({}), 1).as_bytes())
498 .await
499 .unwrap();
500
501 let mut line = String::new();
502 reader.read_line(&mut line).await.unwrap();
503 let resp: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
504
505 assert!(resp["error"].is_object());
506 assert_eq!(
507 resp["error"]["code"],
508 zeroclaw_api::jsonrpc::error_codes::AUTH_REQUIRED
509 );
510
511 cancel.cancel();
512 }
513
514 #[cfg(unix)]
515 #[tokio::test]
516 async fn socket_permissions() {
517 let tmp = tempfile::tempdir().unwrap();
518 let ctx = test_ctx(tmp.path());
519 let sock_path = ctx.config.read().data_dir.join("daemon.sock");
520 let cancel = CancellationToken::new();
521
522 let server_cancel = cancel.clone();
523 let server_ctx = ctx.clone();
524 zeroclaw_spawn::spawn!(async move {
525 let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
526 });
527
528 wait_for_socket(&sock_path).await;
529
530 use std::os::unix::fs::PermissionsExt;
531 let meta = std::fs::metadata(&sock_path).unwrap();
532 let mode = meta.permissions().mode() & 0o777;
533 assert_eq!(
534 mode, 0o600,
535 "socket should be owner-only (0o600), got {mode:#o}"
536 );
537
538 cancel.cancel();
539 }
540
541 #[cfg(unix)]
542 #[tokio::test]
543 async fn stale_socket_cleanup() {
544 let tmp = tempfile::tempdir().unwrap();
545 let ctx = test_ctx(tmp.path());
546 let sock_path = ctx.config.read().data_dir.join("daemon.sock");
547
548 std::fs::create_dir_all(tmp.path()).unwrap();
549 std::fs::write(&sock_path, b"stale").unwrap();
550 assert!(sock_path.exists());
551
552 let cancel = CancellationToken::new();
553 let server_cancel = cancel.clone();
554 let server_ctx = ctx.clone();
555 zeroclaw_spawn::spawn!(async move {
556 let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
557 });
558
559 for _ in 0..50 {
560 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
561 if tokio::net::UnixStream::connect(&sock_path).await.is_ok() {
562 break;
563 }
564 }
565
566 let stream = tokio::net::UnixStream::connect(&sock_path).await;
567 assert!(
568 stream.is_ok(),
569 "should be able to connect after stale cleanup"
570 );
571
572 cancel.cancel();
573 }
574
575 #[cfg(unix)]
576 #[tokio::test]
577 async fn session_approve_resolves_pending_approval() {
578 let tmp = tempfile::tempdir().unwrap();
579 let ctx = test_ctx(tmp.path());
580 let sock_path = ctx.config.read().data_dir.join("daemon.sock");
581 let cancel = CancellationToken::new();
582
583 let server_cancel = cancel.clone();
584 let server_ctx = ctx.clone();
585 zeroclaw_spawn::spawn!(async move {
586 let _ = run_local_listener(server_ctx, server_cancel, test_client_count()).await;
587 });
588 wait_for_socket(&sock_path).await;
589
590 let (mut reader, mut writer) = do_initialize(&sock_path).await;
591
592 let (pending_tx, mut pending_rx) =
593 tokio::sync::oneshot::channel::<zeroclaw_api::channel::ChannelApprovalResponse>();
594 ctx.approval_pending
595 .insert("test-req-1".to_string(), pending_tx);
596
597 let approve_params = serde_json::json!({
598 "session_id": "unused",
599 "request_id": "test-req-1",
600 "decision": "allow_once",
601 });
602 writer
603 .write_all(rpc_request(Method::SessionApprove, &approve_params, 10).as_bytes())
604 .await
605 .unwrap();
606
607 let (_frame, result): (_, serde_json::Value) = read_result(&mut reader).await;
608 assert_eq!(result["acknowledged"], true);
609
610 let decision = pending_rx.try_recv().expect("decision should be resolved");
611 assert_eq!(
612 decision,
613 zeroclaw_api::channel::ChannelApprovalResponse::Approve
614 );
615
616 cancel.cancel();
617 }
618
619 #[cfg(unix)]
620 #[tokio::test]
621 async fn client_count_tracks_connections() {
622 let tmp = tempfile::tempdir().unwrap();
623 let ctx = test_ctx(tmp.path());
624 let sock_path = ctx.config.read().data_dir.join("daemon.sock");
625 let cancel = CancellationToken::new();
626 let count = Arc::new(AtomicUsize::new(0));
627
628 let server_cancel = cancel.clone();
629 let server_ctx = ctx.clone();
630 let server_count = count.clone();
631 zeroclaw_spawn::spawn!(async move {
632 let _ = run_local_listener(server_ctx, server_cancel, server_count).await;
633 });
634
635 wait_for_socket(&sock_path).await;
636
637 assert_eq!(count.load(Ordering::Relaxed), 0);
638
639 let s1 = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
640 let s2 = tokio::net::UnixStream::connect(&sock_path).await.unwrap();
641 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
642 assert_eq!(count.load(Ordering::Relaxed), 2);
643
644 drop(s1);
645 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
646 assert_eq!(count.load(Ordering::Relaxed), 1);
647
648 drop(s2);
649 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
650 assert_eq!(count.load(Ordering::Relaxed), 0);
651
652 cancel.cancel();
653 }
654
655 #[cfg(windows)]
656 #[tokio::test]
657 async fn pipe_initialize_handshake() {
658 use tokio::net::windows::named_pipe::ClientOptions;
659 use tokio::time::{Duration, sleep};
660
661 let tmp = tempfile::tempdir().unwrap();
662 let ctx = test_ctx(tmp.path());
663 let pipe_path = socket_path(&ctx.config.read());
664 let cancel = CancellationToken::new();
665
666 let server_cancel = cancel.clone();
667 let server_ctx = ctx.clone();
668 let handle = zeroclaw_spawn::spawn!(async move {
669 run_local_listener(server_ctx, server_cancel, test_client_count()).await
670 });
671
672 let pipe_name = pipe_path.to_string_lossy().into_owned();
674 let mut client = None;
675 for _ in 0..50 {
676 match ClientOptions::new().open(&pipe_name) {
677 Ok(c) => {
678 client = Some(c);
679 break;
680 }
681 Err(_) => sleep(Duration::from_millis(20)).await,
682 }
683 }
684 let mut client = client.expect("named pipe never accepted a client");
685 let (read_half, mut write_half) = tokio::io::split(&mut client);
686 let mut reader = tokio::io::BufReader::new(read_half);
687
688 let init_params = InitializeParams {
689 protocol_version: 1,
690 tui_id: None,
691 tui_sig: None,
692 env: Default::default(),
693 };
694 write_half
695 .write_all(rpc_request(Method::Initialize, &init_params, 1).as_bytes())
696 .await
697 .unwrap();
698
699 let mut line = String::new();
700 reader.read_line(&mut line).await.unwrap();
701 let frame: serde_json::Value = serde_json::from_str(line.trim()).unwrap();
702 assert!(frame["error"].is_null(), "unexpected RPC error: {frame}");
703 assert_eq!(frame["jsonrpc"], JSONRPC_VERSION);
704 assert_eq!(frame["id"], 1);
705
706 cancel.cancel();
707 drop(write_half);
708 drop(reader);
709 drop(client);
710 let _ = handle.await;
711 }
712}