Skip to main content

zeroclaw_api/
channel.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use tokio_util::sync::CancellationToken;
4
5use crate::media::MediaAttachment;
6
7// ── Channel approval types ──────────────────────────────────────
8
9/// Compact description of a tool call presented to the user for approval.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ChannelApprovalRequest {
12    pub tool_name: String,
13    pub arguments_summary: String,
14    /// Raw tool arguments for channels (e.g. ACP) that can render structured
15    /// diffs instead of a plain summary string.
16    #[serde(skip_serializing_if = "Option::is_none")]
17    pub raw_arguments: Option<serde_json::Value>,
18}
19
20/// The operator's response to a channel-presented approval prompt.
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum ChannelApprovalResponse {
24    /// Execute this one call.
25    Approve,
26    /// Deny this call.
27    Deny,
28    /// Execute and add tool to session-scoped allowlist.
29    #[serde(rename = "always")]
30    AlwaysApprove,
31    /// Deny this call and supply an edited replacement for the arguments.
32    #[serde(rename = "deny_with_edit")]
33    DenyWithEdit { replacement: String },
34}
35
36/// A message received from or sent to a channel
37#[derive(Debug, Clone, Default)]
38pub struct ChannelMessage {
39    pub id: String,
40    pub sender: String,
41    pub reply_target: String,
42    pub content: String,
43    pub channel: String,
44    /// ZeroClaw channel alias (the `<alias>` half of `[channels.<type>.<alias>]`)
45    /// when the platform supports multiple bot instances. Used by
46    /// session_key construction so two bots on the same platform compute
47    /// distinct session IDs and don't share conversation history. `None`
48    /// for channels that don't have an alias concept yet (webhook, cli).
49    pub channel_alias: Option<String>,
50    pub timestamp: u64,
51    /// Platform thread identifier (e.g. Slack `ts`, Discord thread ID).
52    /// When set, replies should be posted as threaded responses.
53    pub thread_ts: Option<String>,
54    /// Thread scope identifier for interruption/cancellation grouping.
55    /// Distinct from `thread_ts` (reply anchor): this is `Some` only when the message
56    /// is genuinely inside a reply thread and should be isolated from other threads.
57    /// `None` means top-level — scope is sender+channel only.
58    pub interruption_scope_id: Option<String>,
59    /// Media attachments (audio, images, video) for the media pipeline.
60    /// Channels populate this when they receive media alongside a text message.
61    /// Defaults to empty — existing channels are unaffected.
62    pub attachments: Vec<MediaAttachment>,
63    /// Email subject for reply threading.
64    pub subject: Option<String>,
65}
66
67/// Message to send through a channel
68#[derive(Debug, Clone)]
69pub struct SendMessage {
70    pub content: String,
71    pub recipient: String,
72    pub subject: Option<String>,
73    /// Platform thread identifier for threaded replies (e.g. Slack `thread_ts`).
74    pub thread_ts: Option<String>,
75    /// Optional cancellation token for interruptible delivery (e.g. multi-message mode).
76    pub cancellation_token: Option<CancellationToken>,
77    /// File attachments to send with the message.
78    /// Channels that don't support attachments ignore this field.
79    pub attachments: Vec<MediaAttachment>,
80    /// Message-ID to set as In-Reply-To header (email threading).
81    pub in_reply_to: Option<String>,
82}
83
84impl SendMessage {
85    /// Create a new message with content and recipient
86    pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
87        Self {
88            content: content.into(),
89            recipient: recipient.into(),
90            subject: None,
91            thread_ts: None,
92            cancellation_token: None,
93            attachments: vec![],
94            in_reply_to: None,
95        }
96    }
97
98    /// Create a new message with content, recipient, and subject
99    pub fn with_subject(
100        content: impl Into<String>,
101        recipient: impl Into<String>,
102        subject: impl Into<String>,
103    ) -> Self {
104        Self {
105            content: content.into(),
106            recipient: recipient.into(),
107            subject: Some(subject.into()),
108            thread_ts: None,
109            cancellation_token: None,
110            attachments: vec![],
111            in_reply_to: None,
112        }
113    }
114
115    /// Set the In-Reply-To header for email threading.
116    pub fn in_reply_to(mut self, msg_id: Option<String>) -> Self {
117        self.in_reply_to = msg_id;
118        self
119    }
120
121    /// Set the subject on an existing SendMessage (builder style).
122    pub fn subject(mut self, subject: impl Into<String>) -> Self {
123        self.subject = Some(subject.into());
124        self
125    }
126
127    /// Set the thread identifier for threaded replies.
128    pub fn in_thread(mut self, thread_ts: Option<String>) -> Self {
129        self.thread_ts = thread_ts;
130        self
131    }
132
133    /// Attach a cancellation token for interruptible delivery.
134    pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
135        self.cancellation_token = Some(token);
136        self
137    }
138
139    /// Attach files to this message.
140    pub fn with_attachments(mut self, attachments: Vec<MediaAttachment>) -> Self {
141        self.attachments = attachments;
142        self
143    }
144}
145
146impl ChannelMessage {
147    /// Construct a `ChannelMessage` with all required fields set and all optional
148    /// fields zeroed. Prefer this over raw struct literals so that new optional
149    /// fields added to `ChannelMessage` in the future don't require mechanical
150    /// updates at every call site.
151    pub fn new(
152        id: impl Into<String>,
153        sender: impl Into<String>,
154        reply_target: impl Into<String>,
155        content: impl Into<String>,
156        channel: impl Into<String>,
157        timestamp: u64,
158    ) -> Self {
159        Self {
160            id: id.into(),
161            sender: sender.into(),
162            reply_target: reply_target.into(),
163            content: content.into(),
164            channel: channel.into(),
165            timestamp,
166            ..Self::default()
167        }
168    }
169}
170
171impl SendMessage {
172    /// Build a reply `SendMessage` from an inbound `ChannelMessage`.
173    ///
174    /// Sets `recipient` from `msg.reply_target`, threads via `in_reply_to` and
175    /// `thread_ts`, and prepends `Re:` to the subject when the inbound message
176    /// carried one. Safe to call from any channel handler; the `in_reply_to`
177    /// field is silently ignored by channels that don't support it.
178    pub fn reply_to(msg: &ChannelMessage, content: impl Into<String>) -> Self {
179        let mut sm = Self::new(content, &msg.reply_target)
180            .in_thread(msg.thread_ts.clone())
181            .in_reply_to(Some(msg.id.clone()));
182        if let Some(ref subj) = msg.subject {
183            let reply_subject = if subj.to_ascii_lowercase().starts_with("re:") {
184                subj.clone()
185            } else {
186                format!("Re: {}", subj)
187            };
188            sm = sm.subject(reply_subject);
189        }
190        sm
191    }
192}
193
194/// Core channel trait — implement for any messaging platform.
195///
196/// Every `Channel` is `Attributable`: the orchestrator's spawn site opens
197/// `attribution_span!(&*ch)` so log emissions from within `listen()` / `send()`
198/// inherit `channel = <type>.<alias>` from the trait object's role + alias.
199#[async_trait]
200pub trait Channel: Send + Sync + crate::attribution::Attributable {
201    /// Human-readable channel name
202    fn name(&self) -> &str;
203
204    /// Send a message through this channel
205    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
206
207    /// Start listening for incoming messages (long-running)
208    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
209
210    /// Check if channel is healthy
211    async fn health_check(&self) -> bool {
212        true
213    }
214
215    /// Signal that the bot is processing a response (e.g. "typing" indicator).
216    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
217        Ok(())
218    }
219
220    /// Stop any active typing indicator.
221    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
222        Ok(())
223    }
224
225    /// Whether this channel supports progressive message updates via draft edits.
226    fn supports_draft_updates(&self) -> bool {
227        false
228    }
229
230    /// Self-loop guard for multi-agent runs.
231    ///
232    /// Returns the bot's own handle/identity on this channel
233    /// (e.g. `@my_bot` for Telegram, the bot's user ID for Discord)
234    /// when known, so the orchestrator can drop inbound events whose
235    /// `sender` matches: a bot must never respond to its own
236    /// messages, even if a misconfigured peer group lists the bot's
237    /// handle as an external peer.
238    ///
239    /// **Channels that handle inbound traffic must override this.**
240    /// The default `None` makes both layers of the orchestrator's
241    /// self-loop guard (the SDK-side `drop_self_messages` here, and
242    /// the agent-loop fallback `peers::should_drop_self_loop`) into
243    /// no-ops — both layers consult the same `self_handle`, so a
244    /// channel that returns `None` has no protection from looping on
245    /// its own outbound. Outbound-only channels (webhook, gmail-push,
246    /// voice-call) never see inbound and can keep the default. The
247    /// in-tree overrides currently cover Telegram (`bot_username`
248    /// cache), IRC (configured nickname), Discord (decoded from token),
249    /// Slack (cached `auth.test` user_id); other inbound channels
250    /// remain on the default and rely on per-impl filtering instead
251    /// of the shared guard.
252    fn self_handle(&self) -> Option<String> {
253        None
254    }
255
256    /// The exact form the bot expects to see when addressed by users on
257    /// this channel. Discord wraps the snowflake as `<@1088...>`,
258    /// Telegram presents `@bot_username`, Matrix presents
259    /// `@bot:server`, Slack wraps the user ID as `<@U02...>`. Returned
260    /// verbatim into the per-channel system prompt so the agent
261    /// recognizes its own mention without guessing, and uses the same
262    /// form to tag itself or peers in outbound replies.
263    ///
264    /// Default `None` for channels that have no inbound mention
265    /// concept (CLI, webhook, hardware, ACP elicitation). Channels
266    /// that override `self_handle` should usually override this too,
267    /// applying their platform-native mention wrapper to the handle.
268    fn self_addressed_mention(&self) -> Option<String> {
269        None
270    }
271
272    /// Whether the orchestrator should drop an inbound message as
273    /// self-authored (multi-agent self-loop guard).
274    ///
275    /// Default implementation compares `msg.sender` against
276    /// [`Self::self_handle`] case-insensitively, after stripping a
277    /// leading `@` from each side so Telegram-style handles match
278    /// regardless of which form the SDK delivers. Override only for
279    /// platforms whose identity comparison is non-string (e.g. a
280    /// numeric Discord user ID is `as_str` already; this default
281    /// works there too).
282    fn drop_self_messages(&self, msg: &ChannelMessage) -> bool {
283        let Some(handle) = self.self_handle() else {
284            return false;
285        };
286        let handle_norm = handle.trim_start_matches('@').to_ascii_lowercase();
287        let sender_norm = msg.sender.trim_start_matches('@').to_ascii_lowercase();
288        !handle_norm.is_empty() && handle_norm == sender_norm
289    }
290
291    /// Whether this channel supports multi-message streaming delivery.
292    fn supports_multi_message_streaming(&self) -> bool {
293        false
294    }
295
296    /// Minimum delay (ms) between sending each paragraph in multi-message mode.
297    fn multi_message_delay_ms(&self) -> u64 {
298        800
299    }
300
301    /// Send an initial draft message. Returns a platform-specific message ID for later edits.
302    async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
303        Ok(None)
304    }
305
306    /// Update a previously sent draft message with new accumulated content.
307    async fn update_draft(
308        &self,
309        _recipient: &str,
310        _message_id: &str,
311        _text: &str,
312    ) -> anyhow::Result<()> {
313        Ok(())
314    }
315
316    /// Show a progress/status update (e.g. tool execution status).
317    async fn update_draft_progress(
318        &self,
319        _recipient: &str,
320        _message_id: &str,
321        _text: &str,
322    ) -> anyhow::Result<()> {
323        Ok(())
324    }
325
326    /// Finalize a draft with the complete response (e.g. apply Markdown formatting).
327    async fn finalize_draft(
328        &self,
329        _recipient: &str,
330        _message_id: &str,
331        _text: &str,
332    ) -> anyhow::Result<()> {
333        Ok(())
334    }
335
336    /// Cancel and remove a previously sent draft message if the channel supports it.
337    async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> anyhow::Result<()> {
338        Ok(())
339    }
340
341    /// Add a reaction (emoji) to a message.
342    async fn add_reaction(
343        &self,
344        _channel_id: &str,
345        _message_id: &str,
346        _emoji: &str,
347    ) -> anyhow::Result<()> {
348        Ok(())
349    }
350
351    /// Remove a reaction (emoji) from a message previously added by this bot.
352    async fn remove_reaction(
353        &self,
354        _channel_id: &str,
355        _message_id: &str,
356        _emoji: &str,
357    ) -> anyhow::Result<()> {
358        Ok(())
359    }
360
361    /// Pin a message in the channel.
362    async fn pin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
363        Ok(())
364    }
365
366    /// Unpin a previously pinned message.
367    async fn unpin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
368        Ok(())
369    }
370
371    /// Redact (delete) a message from the channel.
372    async fn redact_message(
373        &self,
374        _channel_id: &str,
375        _message_id: &str,
376        _reason: Option<String>,
377    ) -> anyhow::Result<()> {
378        Ok(())
379    }
380
381    /// Request interactive tool-call approval from the channel operator.
382    ///
383    /// Returns `Ok(Some(response))` when the operator answers within the
384    /// channel's configured `approval_timeout_secs`; timeouts are surfaced
385    /// as `Deny`. Returns `Ok(None)` only for channels that do not implement
386    /// the prompt at all — the caller should fall back to its default policy
387    /// (typically auto-deny).
388    ///
389    /// Surface varies by channel:
390    /// - **Telegram** uses inline keyboard buttons.
391    /// - **Slack** Socket Mode uses Block Kit buttons; webhook fallback and
392    ///   non–Socket Mode deployments use a token text reply.
393    /// - **Discord, Signal, Matrix, WhatsApp** embed a 6-character
394    ///   alphanumeric token in the prompt and wait for a
395    ///   `<token> approve|deny|always` reply on the same conversation.
396    async fn request_approval(
397        &self,
398        _recipient: &str,
399        _request: &ChannelApprovalRequest,
400    ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
401        Ok(None)
402    }
403
404    /// Ask the user a multiple-choice question and return the chosen option's text.
405    ///
406    /// Returns `Ok(Some(answer))` if the channel handled the question natively
407    /// (e.g. ACP `session/request_permission`, Telegram inline keyboard).
408    /// Returns `Ok(None)` to signal the caller should fall back to the
409    /// generic `send` + `listen` flow. Default impl returns `None`.
410    ///
411    /// Free-form questions (no choices) are not modeled here yet — they
412    /// require the ACP elicitation RFD to land for a clean cross-channel API.
413    async fn request_choice(
414        &self,
415        _question: &str,
416        _choices: &[String],
417        _timeout: std::time::Duration,
418    ) -> anyhow::Result<Option<String>> {
419        Ok(None)
420    }
421
422    /// Whether this channel can answer free-form (no-choices) `ask_user`
423    /// questions via the standard `send` + `listen` flow.
424    ///
425    /// Channels that can only handle structured choices (e.g. ACP today, until
426    /// the elicitation RFD lands) should return `false` so callers can fail
427    /// fast with a useful error instead of timing out on `listen`.
428    fn supports_free_form_ask(&self) -> bool {
429        true
430    }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal channel whose only customization is `self_handle`, so the
    /// trait's default `drop_self_messages` can be tested directly.
    struct StubChannel {
        handle: Option<String>,
    }

    impl StubChannel {
        /// Stub that reports `handle` as its own identity.
        fn with_handle(handle: &str) -> Self {
            Self {
                handle: Some(handle.to_string()),
            }
        }
    }

    impl crate::attribution::Attributable for StubChannel {
        fn role(&self) -> crate::attribution::Role {
            crate::attribution::Role::Channel(crate::attribution::ChannelKind::Webhook)
        }
        fn alias(&self) -> &str {
            "stub"
        }
    }

    #[async_trait]
    impl Channel for StubChannel {
        fn name(&self) -> &str {
            "stub"
        }
        async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
            Ok(())
        }
        async fn listen(
            &self,
            _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            Ok(())
        }
        fn self_handle(&self) -> Option<String> {
            self.handle.clone()
        }
    }

    /// Inbound message on the stub channel authored by `sender`.
    fn msg_from(sender: &str) -> ChannelMessage {
        ChannelMessage::new("1", sender, "", "hi", "stub", 0)
    }

    #[test]
    fn channel_message_new_zeros_optional_fields() {
        let built = ChannelMessage::new("id1", "alice", "room-1", "hello", "slack", 42);

        // Required fields come straight from the arguments.
        assert_eq!(built.id, "id1");
        assert_eq!(built.sender, "alice");
        assert_eq!(built.reply_target, "room-1");
        assert_eq!(built.content, "hello");
        assert_eq!(built.channel, "slack");
        assert_eq!(built.timestamp, 42);

        // Everything optional stays at its default.
        assert!(built.channel_alias.is_none());
        assert!(built.thread_ts.is_none());
        assert!(built.interruption_scope_id.is_none());
        assert!(built.attachments.is_empty());
        assert!(built.subject.is_none());
    }

    #[test]
    fn send_message_reply_to_sets_threading_fields() {
        let mut inbound =
            ChannelMessage::new("msg-001", "alice", "user@example.com", "", "email", 0);
        inbound.thread_ts = Some("thread-1".into());
        inbound.subject = Some("Hello there".into());

        let reply = SendMessage::reply_to(&inbound, "Got it");

        assert_eq!(reply.recipient, "user@example.com");
        assert_eq!(reply.in_reply_to.as_deref(), Some("msg-001"));
        assert_eq!(reply.thread_ts.as_deref(), Some("thread-1"));
        assert_eq!(reply.subject.as_deref(), Some("Re: Hello there"));
        assert_eq!(reply.content, "Got it");
    }

    #[test]
    fn send_message_reply_to_does_not_double_re_prefix() {
        let mut inbound =
            ChannelMessage::new("msg-002", "alice", "user@example.com", "", "email", 0);
        inbound.subject = Some("Re: Already prefixed".into());

        let reply = SendMessage::reply_to(&inbound, "");

        assert_eq!(reply.subject.as_deref(), Some("Re: Already prefixed"));
    }

    #[test]
    fn send_message_reply_to_no_subject_omits_subject() {
        let inbound = ChannelMessage::new("msg-003", "alice", "room-1", "ping", "slack", 0);

        let reply = SendMessage::reply_to(&inbound, "pong");

        assert!(reply.subject.is_none());
        assert_eq!(reply.in_reply_to.as_deref(), Some("msg-003"));
    }

    #[test]
    fn drop_self_messages_default_returns_false_when_handle_unknown() {
        let anonymous = StubChannel { handle: None };
        assert!(!anonymous.drop_self_messages(&msg_from("@anyone")));
    }

    #[test]
    fn drop_self_messages_matches_exact_handle() {
        let bot = StubChannel::with_handle("@my_bot");
        assert!(bot.drop_self_messages(&msg_from("@my_bot")));
        assert!(!bot.drop_self_messages(&msg_from("@other_bot")));
    }

    #[test]
    fn drop_self_messages_normalizes_at_prefix_and_case() {
        // Handle stored without `@`, sender delivered with it: still a match.
        let bare = StubChannel::with_handle("My_Bot");
        assert!(bare.drop_self_messages(&msg_from("@my_bot")));

        // Both sides carry `@` and differ only in case: still a match.
        let prefixed = StubChannel::with_handle("@My_Bot");
        assert!(prefixed.drop_self_messages(&msg_from("@MY_BOT")));
    }

    #[test]
    fn drop_self_messages_does_not_match_empty_handle() {
        // "@" normalizes to an empty handle. The guard must only fire when
        // the bot has a real handle to compare against, so nothing is dropped.
        let blank = StubChannel::with_handle("@");
        assert!(!blank.drop_self_messages(&msg_from("@anyone")));
    }

    #[test]
    fn deny_with_edit_round_trips_through_serde() {
        let original = ChannelApprovalResponse::DenyWithEdit {
            replacement: "new content".to_string(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ChannelApprovalResponse = serde_json::from_str(&encoded).unwrap();

        match decoded {
            ChannelApprovalResponse::DenyWithEdit { replacement } => {
                assert!(replacement == "new content");
            }
            _ => panic!("expected DenyWithEdit after round trip"),
        }
    }
}