Skip to main content

zeroclaw_gateway/
ws_approval.rs

1//! WebSocket-backed [`Channel`] implementation that surfaces tool approval
2//! prompts to the gateway client and waits for the operator's decision.
3//!
4//! The agent's tool loop calls
5//! [`Channel::request_approval`](zeroclaw_api::channel::Channel::request_approval)
6//! whenever a supervised-mode tool needs operator consent. This struct mints
7//! a `request_id`, emits a [`TurnEvent::ApprovalRequest`] that the existing
8//! forward loop serialises onto the wire, and parks on a oneshot until the
9//! matching `approval_response` frame arrives.
10//!
11//! The pending-request map is shared with the connection's receive loop; on
12//! `approval_response` the loop pops the oneshot sender keyed by `request_id`
13//! and resolves the agent's pending future. If the operator does not respond
14//! within `timeout_secs` the wait yields `Deny`, matching the policy of every
15//! other channel that implements `request_approval`.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use parking_lot::Mutex;
23use tokio::sync::{mpsc, oneshot};
24use uuid::Uuid;
25use zeroclaw_api::agent::TurnEvent;
26use zeroclaw_api::channel::{
27    Channel, ChannelApprovalRequest, ChannelApprovalResponse, ChannelMessage, SendMessage,
28};
29
30/// Shared map keyed by `request_id`. Consumed by the receive loop to resolve
31/// the oneshot when an `approval_response` frame arrives.
32pub type PendingApprovals = Arc<Mutex<HashMap<String, oneshot::Sender<ChannelApprovalResponse>>>>;
33
34/// Construct an empty pending-approvals registry for a fresh connection.
35pub fn new_pending_approvals() -> PendingApprovals {
36    Arc::new(Mutex::new(HashMap::new()))
37}
38
39/// `Channel` implementation that emits approval frames over a connection's
40/// existing `event_tx` and parks on a oneshot until the matching response
41/// arrives or `timeout` elapses.
42pub struct WsApprovalChannel {
43    event_tx: mpsc::Sender<TurnEvent>,
44    pending: PendingApprovals,
45    timeout: Duration,
46}
47
48impl WsApprovalChannel {
49    pub fn new(
50        event_tx: mpsc::Sender<TurnEvent>,
51        pending: PendingApprovals,
52        timeout: Duration,
53    ) -> Self {
54        Self {
55            event_tx,
56            pending,
57            timeout,
58        }
59    }
60}
61
62impl ::zeroclaw_api::attribution::Attributable for WsApprovalChannel {
63    fn role(&self) -> ::zeroclaw_api::attribution::Role {
64        ::zeroclaw_api::attribution::Role::Channel(
65            ::zeroclaw_api::attribution::ChannelKind::Webhook,
66        )
67    }
68    fn alias(&self) -> &str {
69        "ws_approval"
70    }
71}
72
73#[async_trait]
74impl Channel for WsApprovalChannel {
75    fn name(&self) -> &str {
76        "ws"
77    }
78
79    async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
80        // The gateway WS path streams agent output via TurnEvent::Chunk /
81        // ::Thinking / ::ToolCall / ::ToolResult; it does not deliver
82        // free-form `send()` messages. Returning Ok here keeps any caller
83        // that probes for a generic delivery target from erroring out.
84        Ok(())
85    }
86
87    async fn listen(&self, _tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
88        // The gateway WS path does not act as a message source for the
89        // channel orchestrator; turns are driven directly by the WS
90        // handler loop. Listen is a no-op for this transport.
91        Ok(())
92    }
93
94    async fn request_approval(
95        &self,
96        _recipient: &str,
97        request: &ChannelApprovalRequest,
98    ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
99        let request_id = Uuid::new_v4().to_string();
100        let (tx, rx) = oneshot::channel();
101        self.pending.lock().insert(request_id.clone(), tx);
102
103        let event = TurnEvent::ApprovalRequest {
104            request_id: request_id.clone(),
105            tool_name: request.tool_name.clone(),
106            arguments_summary: request.arguments_summary.clone(),
107            timeout_secs: self.timeout.as_secs(),
108        };
109        if self.event_tx.send(event).await.is_err() {
110            // Forward task has gone away; the WS is closing. Clean up the
111            // pending entry and let the agent's caller treat this the same
112            // as any other channel that returns None: fall through to
113            // auto-deny per ApprovalManager policy.
114            self.pending.lock().remove(&request_id);
115            return Ok(None);
116        }
117
118        match tokio::time::timeout(self.timeout, rx).await {
119            Ok(Ok(decision)) => Ok(Some(decision)),
120            Ok(Err(_)) => {
121                // Sender dropped without responding (connection closed
122                // mid-prompt). Treat as deny rather than None so the agent
123                // does not silently fall back to "no channel handled this".
124                self.pending.lock().remove(&request_id);
125                Ok(Some(ChannelApprovalResponse::Deny))
126            }
127            Err(_) => {
128                // Timeout: pop and deny. Mirrors Telegram / Slack behaviour
129                // when the operator does not tap a button in time.
130                self.pending.lock().remove(&request_id);
131                Ok(Some(ChannelApprovalResponse::Deny))
132            }
133        }
134    }
135}