1use async_trait::async_trait;
22use serde_json::json;
23use std::sync::Arc;
24use std::time::Duration;
25use zeroclaw_api::channel::{
26 Channel, ChannelApprovalRequest, ChannelApprovalResponse, ChannelMessage, SendMessage,
27};
28
29use crate::orchestrator::acp_server::RpcOutbound;
30
/// Channel that forwards traffic to a peer through an [`RpcOutbound`] handle.
///
/// Outgoing messages are sent as `session/update` notifications, while choice
/// and approval prompts are sent as `session/request_permission` requests.
/// Every payload built by this type carries `session_id` as its `sessionId`.
pub struct AcpChannel {
    /// Name returned by `Channel::name` and `Attributable::alias`.
    name: String,
    /// Session identifier placed in the `sessionId` field of every payload.
    session_id: String,
    /// Shared outbound RPC handle used for `notify` and `request` calls.
    rpc: Arc<RpcOutbound>,
    /// Maximum time `request_approval` waits for a reply before failing.
    approval_timeout: Duration,
}
43
44impl AcpChannel {
45 pub fn new(
52 name: impl Into<String>,
53 session_id: impl Into<String>,
54 rpc: Arc<RpcOutbound>,
55 approval_timeout: Duration,
56 ) -> Self {
57 Self {
58 name: name.into(),
59 session_id: session_id.into(),
60 rpc,
61 approval_timeout,
62 }
63 }
64}
65
66impl ::zeroclaw_api::attribution::Attributable for AcpChannel {
67 fn role(&self) -> ::zeroclaw_api::attribution::Role {
68 ::zeroclaw_api::attribution::Role::Channel(
69 ::zeroclaw_api::attribution::ChannelKind::AcpChannel,
70 )
71 }
72 fn alias(&self) -> &str {
73 &self.name
74 }
75}
76
/// Maps a tool name to the `kind` string used in the permission request.
///
/// `file_edit` and `file_write` are reported as `"edit"`; every other tool
/// name (including the empty string) is reported as `"execute"`.
fn map_approval_kind(tool_name: &str) -> &'static str {
    let modifies_file = matches!(tool_name, "file_edit" | "file_write");
    if modifies_file {
        "edit"
    } else {
        "execute"
    }
}
86
87fn build_approval_raw_input(
93 tool_name: &str,
94 raw_arguments: &Option<serde_json::Value>,
95) -> serde_json::Value {
96 if let Some(args) = raw_arguments {
97 match tool_name {
98 "file_edit" => {
99 let path = args.get("path").cloned().unwrap_or(serde_json::Value::Null);
100 let old_text = args
101 .get("old_string")
102 .cloned()
103 .unwrap_or(serde_json::Value::Null);
104 let new_text = args
105 .get("new_string")
106 .cloned()
107 .unwrap_or(serde_json::Value::Null);
108 return json!({ "path": path, "oldText": old_text, "newText": new_text });
109 }
110 "file_write" => {
111 let path = args.get("path").cloned().unwrap_or(serde_json::Value::Null);
112 let new_text = args
113 .get("content")
114 .cloned()
115 .unwrap_or(serde_json::Value::Null);
116 return json!({ "path": path, "newText": new_text });
117 }
118 _ => {}
119 }
120 }
121 json!({ "tool": tool_name })
122}
123
124fn build_approval_content(
133 tool_name: &str,
134 raw_arguments: &Option<serde_json::Value>,
135 fallback_summary: &str,
136) -> serde_json::Value {
137 if let Some(args) = raw_arguments {
138 match tool_name {
139 "file_edit" => {
140 let path = args.get("path").cloned().unwrap_or(serde_json::Value::Null);
141 let old_text = args
142 .get("old_string")
143 .cloned()
144 .unwrap_or(serde_json::Value::Null);
145 let new_text = args
146 .get("new_string")
147 .cloned()
148 .unwrap_or(serde_json::Value::Null);
149 return json!([{
150 "type": "diff",
151 "path": path,
152 "oldText": old_text,
153 "newText": new_text,
154 }]);
155 }
156 "file_write" => {
157 let path = args.get("path").cloned().unwrap_or(serde_json::Value::Null);
158 let new_text = args
159 .get("content")
160 .cloned()
161 .unwrap_or(serde_json::Value::Null);
162 return json!([{
163 "type": "diff",
164 "path": path,
165 "newText": new_text,
166 }]);
167 }
168 _ => {}
169 }
170 }
171 json!([{
172 "type": "content",
173 "content": {
174 "type": "text",
175 "text": fallback_summary,
176 }
177 }])
178}
179
180#[async_trait]
181impl Channel for AcpChannel {
182 fn name(&self) -> &str {
183 &self.name
184 }
185
186 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
187 self.rpc
191 .notify(
192 "session/update",
193 json!({
194 "sessionId": self.session_id,
195 "update": {
196 "sessionUpdate": "agent_message_chunk",
197 "content": {
198 "type": "text",
199 "text": message.content,
200 }
201 }
202 }),
203 )
204 .await;
205 Ok(())
206 }
207
208 async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
209 anyhow::bail!(
215 "AcpChannel.listen is not supported (free-form ask_user awaits ACP elicitation RFD)"
216 )
217 }
218
219 fn supports_free_form_ask(&self) -> bool {
220 false
221 }
222
223 async fn add_reaction(
224 &self,
225 _channel_id: &str,
226 _message_id: &str,
227 _emoji: &str,
228 ) -> anyhow::Result<()> {
229 anyhow::bail!("AcpChannel does not support reactions")
234 }
235
236 async fn remove_reaction(
237 &self,
238 _channel_id: &str,
239 _message_id: &str,
240 _emoji: &str,
241 ) -> anyhow::Result<()> {
242 anyhow::bail!("AcpChannel does not support reactions")
243 }
244
245 async fn request_choice(
246 &self,
247 question: &str,
248 choices: &[String],
249 timeout: Duration,
250 ) -> anyhow::Result<Option<String>> {
251 if choices.is_empty() {
252 anyhow::bail!("AcpChannel.request_choice requires at least one choice")
256 }
257
258 let mut options = Vec::with_capacity(choices.len());
263 for (i, choice) in choices.iter().enumerate() {
264 let kind = if i == choices.len() - 1 && choices.len() > 1 {
265 "reject_once"
266 } else {
267 "allow_once"
268 };
269 options.push(json!({
270 "optionId": format!("choice-{i}"),
271 "name": choice,
272 "kind": kind,
273 }));
274 }
275
276 let params = json!({
277 "sessionId": self.session_id,
278 "options": options,
279 "toolCall": {
283 "toolCallId": format!("ask-user-{}", uuid::Uuid::new_v4()),
284 "title": question,
285 "kind": "other",
286 "status": "pending",
287 }
288 });
289
290 let call = self.rpc.request("session/request_permission", params);
291 let response = match tokio::time::timeout(timeout, call).await {
292 Ok(Ok(value)) => value,
293 Ok(Err(e)) => {
294 anyhow::bail!("ACP request_permission failed: {} ({})", e.message, e.code)
295 }
296 Err(_) => anyhow::bail!("ACP request_permission timed out after {timeout:?}"),
297 };
298
299 let outcome = response.get("outcome");
301 let kind = outcome
302 .and_then(|o| o.get("outcome"))
303 .and_then(|s| s.as_str())
304 .unwrap_or("");
305 match kind {
306 "selected" => {
307 let option_id = outcome
308 .and_then(|o| o.get("optionId"))
309 .and_then(|s| s.as_str())
310 .unwrap_or("");
311 let idx = option_id
312 .strip_prefix("choice-")
313 .and_then(|s| s.parse::<usize>().ok());
314 match idx.and_then(|i| choices.get(i)) {
315 Some(text) => Ok(Some(text.clone())),
316 None => anyhow::bail!("ACP returned unknown optionId: {option_id}"),
317 }
318 }
319 "cancelled" => Ok(None),
320 other => anyhow::bail!("ACP returned unexpected outcome: {other}"),
321 }
322 }
323
324 async fn request_approval(
325 &self,
326 _recipient: &str,
327 request: &ChannelApprovalRequest,
328 ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
329 let options = [
330 json!({
331 "optionId": "allow-once",
332 "name": "Allow once",
333 "kind": "allow_once",
334 }),
335 json!({
336 "optionId": "allow-always",
337 "name": "Always allow",
338 "kind": "allow_always",
339 }),
340 json!({
341 "optionId": "reject-once",
342 "name": "Reject",
343 "kind": "reject_once",
344 }),
345 ];
346
347 let tool_call_id = format!("approval-{}", uuid::Uuid::new_v4());
348 let title = format!("Approve {}?", request.tool_name);
349 let kind = map_approval_kind(&request.tool_name);
350 let raw_input = build_approval_raw_input(&request.tool_name, &request.raw_arguments);
351 let content = build_approval_content(
352 &request.tool_name,
353 &request.raw_arguments,
354 &request.arguments_summary,
355 );
356 let params = json!({
357 "sessionId": self.session_id,
358 "options": options,
359 "toolCall": {
360 "toolCallId": tool_call_id,
361 "title": title,
362 "kind": kind,
363 "status": "pending",
364 "rawInput": raw_input,
365 "content": content,
366 }
367 });
368
369 let call = self.rpc.request("session/request_permission", params);
370 let response = match tokio::time::timeout(self.approval_timeout, call).await {
371 Ok(Ok(value)) => value,
372 Ok(Err(e)) => {
373 anyhow::bail!("ACP request_permission failed: {} ({})", e.message, e.code)
374 }
375 Err(_) => anyhow::bail!(
376 "ACP request_permission timed out after {:?}",
377 self.approval_timeout
378 ),
379 };
380
381 let outcome = response.get("outcome");
382 let kind = outcome
383 .and_then(|o| o.get("outcome"))
384 .and_then(|s| s.as_str())
385 .unwrap_or("");
386 match kind {
387 "selected" => {
388 let option_id = outcome
389 .and_then(|o| o.get("optionId"))
390 .and_then(|s| s.as_str())
391 .unwrap_or("");
392 match option_id {
393 "allow-once" => Ok(Some(ChannelApprovalResponse::Approve)),
394 "allow-always" => Ok(Some(ChannelApprovalResponse::AlwaysApprove)),
395 "reject-once" | "reject-always" => Ok(Some(ChannelApprovalResponse::Deny)),
396 other => anyhow::bail!("ACP returned unknown permission optionId: {other}"),
397 }
398 }
399 "cancelled" => Ok(Some(ChannelApprovalResponse::Deny)),
400 other => anyhow::bail!("ACP returned unexpected permission outcome: {other}"),
401 }
402 }
403}
404
// Unit tests for `AcpChannel`. Outbound traffic is captured through an
// in-memory mpsc channel, and replies are injected with
// `dispatch_response_for_test`.
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// Creates a test `RpcOutbound` together with the receiver the tests read
    /// outbound lines from (buffer capacity 16).
    fn make_rpc() -> (Arc<RpcOutbound>, mpsc::Receiver<String>) {
        let (tx, rx) = mpsc::channel::<String>(16);
        (Arc::new(RpcOutbound::for_testing(tx)), rx)
    }

    // `name()` echoes the name given to the constructor.
    #[tokio::test]
    async fn name_returns_provided_name() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        assert_eq!(ch.name(), "acp");
    }

    // Free-form asks are reported as unsupported.
    #[tokio::test]
    async fn supports_free_form_ask_is_false() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        assert!(!ch.supports_free_form_ask());
    }

    // `send` emits a `session/update` notification carrying the message text.
    #[tokio::test]
    async fn send_emits_agent_message_chunk_notification() {
        let (rpc, mut rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));

        ch.send(&SendMessage::new("hello", "")).await.unwrap();

        let line = rx.recv().await.unwrap();
        let v: serde_json::Value = serde_json::from_str(&line).unwrap();
        assert_eq!(v["jsonrpc"], "2.0");
        assert_eq!(v["method"], "session/update");
        assert_eq!(v["params"]["sessionId"], "sess-1");
        assert_eq!(
            v["params"]["update"]["sessionUpdate"],
            "agent_message_chunk"
        );
        assert_eq!(v["params"]["update"]["content"]["text"], "hello");
        // Notifications must not carry an `id`.
        assert!(v.get("id").is_none());
    }

    // Adding a reaction is rejected.
    #[tokio::test]
    async fn add_reaction_returns_error() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        let res = ch.add_reaction("chan", "msg", "👍").await;
        assert!(res.is_err());
    }

    // Removing a reaction is rejected.
    #[tokio::test]
    async fn remove_reaction_returns_error() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        let res = ch.remove_reaction("chan", "msg", "👍").await;
        assert!(res.is_err());
    }

    // `listen` is unsupported and returns an error.
    #[tokio::test]
    async fn listen_returns_error() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        let (tx, _) = mpsc::channel(1);
        let res = ch.listen(tx).await;
        assert!(res.is_err());
    }

    // An empty choice list is rejected.
    #[tokio::test]
    async fn request_choice_rejects_empty_choices() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        let res = ch
            .request_choice("Pick one", &[], Duration::from_secs(1))
            .await;
        assert!(res.is_err());
    }

    // A `selected` reply with `choice-1` resolves to the second choice's text.
    #[tokio::test]
    async fn request_choice_emits_request_permission_and_resolves_selection() {
        let (rpc, mut rx) = make_rpc();
        let rpc_for_resp = Arc::clone(&rpc);
        let ch = AcpChannel::new("acp", "sess-1", Arc::clone(&rpc), Duration::from_secs(30));

        let choices = vec![
            "Option A".to_string(),
            "Option B".to_string(),
            "Cancel".to_string(),
        ];

        // Run the request in the background so this test can play the peer.
        let task = tokio::spawn(async move {
            ch.request_choice("Confirm?", &choices, Duration::from_secs(5))
                .await
        });

        let line = rx.recv().await.unwrap();
        let req: serde_json::Value = serde_json::from_str(&line).unwrap();
        assert_eq!(req["method"], "session/request_permission");
        assert_eq!(req["params"]["options"].as_array().unwrap().len(), 3);
        assert_eq!(req["params"]["options"][0]["name"], "Option A");
        // The last of several choices is tagged as the reject option.
        assert_eq!(req["params"]["options"][2]["kind"], "reject_once");
        let id = req["id"].as_str().unwrap().to_string();

        // Reply to the captured request id with a selection.
        rpc_for_resp.dispatch_response_for_test(
            &id,
            Some(json!({"outcome": {"outcome": "selected", "optionId": "choice-1"}})),
            None,
        );

        let result = task.await.unwrap().unwrap();
        assert_eq!(result, Some("Option B".to_string()));
    }

    // A `cancelled` reply resolves to `None`.
    #[tokio::test]
    async fn request_choice_handles_cancel_outcome() {
        let (rpc, mut rx) = make_rpc();
        let rpc_for_resp = Arc::clone(&rpc);
        let ch = AcpChannel::new("acp", "sess-1", Arc::clone(&rpc), Duration::from_secs(30));

        let choices = vec!["Yes".to_string(), "No".to_string()];

        let task = tokio::spawn(async move {
            ch.request_choice("Confirm?", &choices, Duration::from_secs(5))
                .await
        });

        let line = rx.recv().await.unwrap();
        let req: serde_json::Value = serde_json::from_str(&line).unwrap();
        let id = req["id"].as_str().unwrap().to_string();

        rpc_for_resp.dispatch_response_for_test(
            &id,
            Some(json!({"outcome": {"outcome": "cancelled"}})),
            None,
        );

        let result = task.await.unwrap().unwrap();
        assert_eq!(result, None);
    }

    // With no reply dispatched, the per-call timeout produces an error.
    #[tokio::test]
    async fn request_choice_times_out_when_no_response() {
        let (rpc, _rx) = make_rpc();
        let ch = AcpChannel::new("acp", "sess-1", rpc, Duration::from_secs(30));
        let choices = vec!["Yes".to_string(), "No".to_string()];
        let res = ch
            .request_choice("Confirm?", &choices, Duration::from_millis(50))
            .await;
        assert!(res.is_err());
        let msg = format!("{}", res.unwrap_err());
        assert!(msg.contains("timed out"), "unexpected error: {msg}");
    }

    // Approval requests carry the three fixed options, and `allow-once` maps
    // to `Approve`.
    #[tokio::test]
    async fn request_approval_emits_request_permission_and_resolves_approve() {
        let (rpc, mut rx) = make_rpc();
        let rpc_for_resp = Arc::clone(&rpc);
        let ch = AcpChannel::new("acp", "sess-1", Arc::clone(&rpc), Duration::from_secs(30));
        let request = ChannelApprovalRequest {
            tool_name: "git".to_string(),
            arguments_summary: "git status --short".to_string(),
            raw_arguments: None,
        };

        let task = tokio::spawn(async move { ch.request_approval("", &request).await });

        let line = rx.recv().await.unwrap();
        let req: serde_json::Value = serde_json::from_str(&line).unwrap();
        assert_eq!(req["method"], "session/request_permission");
        assert_eq!(req["params"]["sessionId"], "sess-1");
        assert_eq!(req["params"]["options"].as_array().unwrap().len(), 3);
        assert_eq!(req["params"]["options"][0]["optionId"], "allow-once");
        assert_eq!(req["params"]["options"][1]["kind"], "allow_always");
        assert_eq!(req["params"]["toolCall"]["title"], "Approve git?");
        assert_eq!(req["params"]["toolCall"]["status"], "pending");
        // Without raw arguments, the summary is shown as plain text content.
        assert_eq!(
            req["params"]["toolCall"]["content"][0]["content"]["text"],
            "git status --short"
        );
        let id = req["id"].as_str().unwrap().to_string();

        rpc_for_resp.dispatch_response_for_test(
            &id,
            Some(json!({"outcome": {"outcome": "selected", "optionId": "allow-once"}})),
            None,
        );

        let result = task.await.unwrap().unwrap();
        assert_eq!(result, Some(ChannelApprovalResponse::Approve));
    }

    // `allow-always` maps to `AlwaysApprove`; a `cancelled` outcome maps to
    // `Deny`.
    #[tokio::test]
    async fn request_approval_maps_always_and_cancel() {
        // First scenario: the peer selects `allow-always`.
        let (rpc, mut rx) = make_rpc();
        let rpc_for_resp = Arc::clone(&rpc);
        let ch = AcpChannel::new("acp", "sess-1", Arc::clone(&rpc), Duration::from_secs(30));
        let request = ChannelApprovalRequest {
            tool_name: "git".to_string(),
            arguments_summary: "git commit".to_string(),
            raw_arguments: None,
        };

        let task = tokio::spawn(async move { ch.request_approval("", &request).await });
        let line = rx.recv().await.unwrap();
        let req: serde_json::Value = serde_json::from_str(&line).unwrap();
        let id = req["id"].as_str().unwrap().to_string();

        rpc_for_resp.dispatch_response_for_test(
            &id,
            Some(json!({"outcome": {"outcome": "selected", "optionId": "allow-always"}})),
            None,
        );
        assert_eq!(
            task.await.unwrap().unwrap(),
            Some(ChannelApprovalResponse::AlwaysApprove)
        );

        // Second scenario, on a fresh channel: the peer cancels the prompt.
        let (rpc, mut rx) = make_rpc();
        let rpc_for_resp = Arc::clone(&rpc);
        let ch = AcpChannel::new("acp", "sess-1", Arc::clone(&rpc), Duration::from_secs(30));
        let request = ChannelApprovalRequest {
            tool_name: "git".to_string(),
            arguments_summary: "git push".to_string(),
            raw_arguments: None,
        };
        let task = tokio::spawn(async move { ch.request_approval("", &request).await });
        let line = rx.recv().await.unwrap();
        let req: serde_json::Value = serde_json::from_str(&line).unwrap();
        let id = req["id"].as_str().unwrap().to_string();
        rpc_for_resp.dispatch_response_for_test(
            &id,
            Some(json!({"outcome": {"outcome": "cancelled"}})),
            None,
        );
        assert_eq!(
            task.await.unwrap().unwrap(),
            Some(ChannelApprovalResponse::Deny)
        );
    }

    // A `file_edit` approval is sent with kind `edit` and a `diff` content
    // item built from the raw arguments.
    #[tokio::test]
    async fn file_edit_approval_emits_diff_content_item() {
        let (rpc, mut rx) = make_rpc();
        let rpc_for_resp = Arc::clone(&rpc);
        let ch = AcpChannel::new("acp", "sess-1", Arc::clone(&rpc), Duration::from_secs(30));
        let request = ChannelApprovalRequest {
            tool_name: "file_edit".to_string(),
            arguments_summary: "old_string: let x = 1;, new_string: let x = 2;".to_string(),
            raw_arguments: Some(serde_json::json!({
                "path": "src/foo.rs",
                "old_string": "let x = 1;",
                "new_string": "let x = 2;"
            })),
        };

        let task = tokio::spawn(async move { ch.request_approval("", &request).await });
        let line = rx.recv().await.unwrap();
        let req: serde_json::Value = serde_json::from_str(&line).unwrap();

        assert_eq!(req["params"]["toolCall"]["kind"], "edit");

        let content = &req["params"]["toolCall"]["content"];
        assert_eq!(
            content[0]["type"], "diff",
            "file_edit approval must emit a diff content item"
        );
        assert_eq!(content[0]["path"], "src/foo.rs");
        assert_eq!(content[0]["oldText"], "let x = 1;");
        assert_eq!(content[0]["newText"], "let x = 2;");

        // Answer the request so the spawned task completes.
        let id = req["id"].as_str().unwrap().to_string();
        rpc_for_resp.dispatch_response_for_test(
            &id,
            Some(json!({"outcome": {"outcome": "selected", "optionId": "allow-once"}})),
            None,
        );
        assert_eq!(
            task.await.unwrap().unwrap(),
            Some(ChannelApprovalResponse::Approve)
        );
    }

    // `build_approval_content` produces a single `diff` item for `file_edit`.
    #[test]
    fn build_approval_content_returns_diff_for_file_edit() {
        let args = serde_json::json!({
            "path": "README.md",
            "old_string": "# Old Title",
            "new_string": "# New Title"
        });
        let content = build_approval_content("file_edit", &Some(args), "fallback");
        let arr = content.as_array().expect("content must be an array");
        assert_eq!(arr.len(), 1);
        assert_eq!(arr[0]["type"], "diff");
        assert_eq!(arr[0]["path"], "README.md");
        assert_eq!(arr[0]["oldText"], "# Old Title");
        assert_eq!(arr[0]["newText"], "# New Title");
    }

    // Tools without a diff representation fall back to a text content item.
    #[test]
    fn build_approval_content_falls_back_to_text_for_other_tools() {
        let content = build_approval_content("shell", &None, "ls -la");
        let arr = content.as_array().expect("content must be an array");
        assert_eq!(arr[0]["type"], "content");
        assert_eq!(arr[0]["content"]["type"], "text");
        assert_eq!(arr[0]["content"]["text"], "ls -la");
    }
}