zeroclaw_gateway/ws_approval.rs
1//! WebSocket-backed [`Channel`] implementation that surfaces tool approval
2//! prompts to the gateway client and waits for the operator's decision.
3//!
4//! The agent's tool loop calls
5//! [`Channel::request_approval`](zeroclaw_api::channel::Channel::request_approval)
6//! whenever a supervised-mode tool needs operator consent. This struct mints
7//! a `request_id`, emits a [`TurnEvent::ApprovalRequest`] that the existing
8//! forward loop serialises onto the wire, and parks on a oneshot until the
9//! matching `approval_response` frame arrives.
10//!
11//! The pending-request map is shared with the connection's receive loop; on
12//! `approval_response` the loop pops the oneshot sender keyed by `request_id`
13//! and resolves the agent's pending future. If the operator does not respond
14//! within `timeout_secs` the wait yields `Deny`, matching the policy of every
15//! other channel that implements `request_approval`.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use parking_lot::Mutex;
23use tokio::sync::{mpsc, oneshot};
24use uuid::Uuid;
25use zeroclaw_api::agent::TurnEvent;
26use zeroclaw_api::channel::{
27 Channel, ChannelApprovalRequest, ChannelApprovalResponse, ChannelMessage, SendMessage,
28};
29
30/// Shared map keyed by `request_id`. Consumed by the receive loop to resolve
31/// the oneshot when an `approval_response` frame arrives.
32pub type PendingApprovals = Arc<Mutex<HashMap<String, oneshot::Sender<ChannelApprovalResponse>>>>;
33
34/// Construct an empty pending-approvals registry for a fresh connection.
35pub fn new_pending_approvals() -> PendingApprovals {
36 Arc::new(Mutex::new(HashMap::new()))
37}
38
39/// `Channel` implementation that emits approval frames over a connection's
40/// existing `event_tx` and parks on a oneshot until the matching response
41/// arrives or `timeout` elapses.
42pub struct WsApprovalChannel {
43 event_tx: mpsc::Sender<TurnEvent>,
44 pending: PendingApprovals,
45 timeout: Duration,
46}
47
48impl WsApprovalChannel {
49 pub fn new(
50 event_tx: mpsc::Sender<TurnEvent>,
51 pending: PendingApprovals,
52 timeout: Duration,
53 ) -> Self {
54 Self {
55 event_tx,
56 pending,
57 timeout,
58 }
59 }
60}
61
62impl ::zeroclaw_api::attribution::Attributable for WsApprovalChannel {
63 fn role(&self) -> ::zeroclaw_api::attribution::Role {
64 ::zeroclaw_api::attribution::Role::Channel(
65 ::zeroclaw_api::attribution::ChannelKind::Webhook,
66 )
67 }
68 fn alias(&self) -> &str {
69 "ws_approval"
70 }
71}
72
73#[async_trait]
74impl Channel for WsApprovalChannel {
75 fn name(&self) -> &str {
76 "ws"
77 }
78
79 async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
80 // The gateway WS path streams agent output via TurnEvent::Chunk /
81 // ::Thinking / ::ToolCall / ::ToolResult; it does not deliver
82 // free-form `send()` messages. Returning Ok here keeps any caller
83 // that probes for a generic delivery target from erroring out.
84 Ok(())
85 }
86
87 async fn listen(&self, _tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
88 // The gateway WS path does not act as a message source for the
89 // channel orchestrator; turns are driven directly by the WS
90 // handler loop. Listen is a no-op for this transport.
91 Ok(())
92 }
93
94 async fn request_approval(
95 &self,
96 _recipient: &str,
97 request: &ChannelApprovalRequest,
98 ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
99 let request_id = Uuid::new_v4().to_string();
100 let (tx, rx) = oneshot::channel();
101 self.pending.lock().insert(request_id.clone(), tx);
102
103 let event = TurnEvent::ApprovalRequest {
104 request_id: request_id.clone(),
105 tool_name: request.tool_name.clone(),
106 arguments_summary: request.arguments_summary.clone(),
107 timeout_secs: self.timeout.as_secs(),
108 };
109 if self.event_tx.send(event).await.is_err() {
110 // Forward task has gone away; the WS is closing. Clean up the
111 // pending entry and let the agent's caller treat this the same
112 // as any other channel that returns None: fall through to
113 // auto-deny per ApprovalManager policy.
114 self.pending.lock().remove(&request_id);
115 return Ok(None);
116 }
117
118 match tokio::time::timeout(self.timeout, rx).await {
119 Ok(Ok(decision)) => Ok(Some(decision)),
120 Ok(Err(_)) => {
121 // Sender dropped without responding (connection closed
122 // mid-prompt). Treat as deny rather than None so the agent
123 // does not silently fall back to "no channel handled this".
124 self.pending.lock().remove(&request_id);
125 Ok(Some(ChannelApprovalResponse::Deny))
126 }
127 Err(_) => {
128 // Timeout: pop and deny. Mirrors Telegram / Slack behaviour
129 // when the operator does not tap a button in time.
130 self.pending.lock().remove(&request_id);
131 Ok(Some(ChannelApprovalResponse::Deny))
132 }
133 }
134 }
135}