Skip to main content

zeroclaw_api/
channel.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use tokio_util::sync::CancellationToken;
4
5use crate::media::MediaAttachment;
6
7// ── Channel approval types ──────────────────────────────────────
8
9/// Compact description of a tool call presented to the user for approval.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ChannelApprovalRequest {
12    pub tool_name: String,
13    pub arguments_summary: String,
14    /// Raw tool arguments for channels (e.g. ACP) that can render structured
15    /// diffs instead of a plain summary string.
16    #[serde(skip_serializing_if = "Option::is_none")]
17    pub raw_arguments: Option<serde_json::Value>,
18}
19
20/// The operator's response to a channel-presented approval prompt.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum ChannelApprovalResponse {
24    /// Execute this one call.
25    Approve,
26    /// Deny this call.
27    Deny,
28    /// Execute and add tool to session-scoped allowlist.
29    #[serde(rename = "always")]
30    AlwaysApprove,
31}
32
33/// A message received from or sent to a channel
34#[derive(Debug, Clone, Default)]
35pub struct ChannelMessage {
36    pub id: String,
37    pub sender: String,
38    pub reply_target: String,
39    pub content: String,
40    pub channel: String,
41    /// ZeroClaw channel alias (the `<alias>` half of `[channels.<type>.<alias>]`)
42    /// when the platform supports multiple bot instances. Used by
43    /// session_key construction so two bots on the same platform compute
44    /// distinct session IDs and don't share conversation history. `None`
45    /// for channels that don't have an alias concept yet (webhook, cli).
46    pub channel_alias: Option<String>,
47    pub timestamp: u64,
48    /// Platform thread identifier (e.g. Slack `ts`, Discord thread ID).
49    /// When set, replies should be posted as threaded responses.
50    pub thread_ts: Option<String>,
51    /// Thread scope identifier for interruption/cancellation grouping.
52    /// Distinct from `thread_ts` (reply anchor): this is `Some` only when the message
53    /// is genuinely inside a reply thread and should be isolated from other threads.
54    /// `None` means top-level — scope is sender+channel only.
55    pub interruption_scope_id: Option<String>,
56    /// Media attachments (audio, images, video) for the media pipeline.
57    /// Channels populate this when they receive media alongside a text message.
58    /// Defaults to empty — existing channels are unaffected.
59    pub attachments: Vec<MediaAttachment>,
60    /// Email subject for reply threading.
61    pub subject: Option<String>,
62}
63
64/// Message to send through a channel
65#[derive(Debug, Clone)]
66pub struct SendMessage {
67    pub content: String,
68    pub recipient: String,
69    pub subject: Option<String>,
70    /// Platform thread identifier for threaded replies (e.g. Slack `thread_ts`).
71    pub thread_ts: Option<String>,
72    /// Optional cancellation token for interruptible delivery (e.g. multi-message mode).
73    pub cancellation_token: Option<CancellationToken>,
74    /// File attachments to send with the message.
75    /// Channels that don't support attachments ignore this field.
76    pub attachments: Vec<MediaAttachment>,
77    /// Message-ID to set as In-Reply-To header (email threading).
78    pub in_reply_to: Option<String>,
79}
80
81impl SendMessage {
82    /// Create a new message with content and recipient
83    pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
84        Self {
85            content: content.into(),
86            recipient: recipient.into(),
87            subject: None,
88            thread_ts: None,
89            cancellation_token: None,
90            attachments: vec![],
91            in_reply_to: None,
92        }
93    }
94
95    /// Create a new message with content, recipient, and subject
96    pub fn with_subject(
97        content: impl Into<String>,
98        recipient: impl Into<String>,
99        subject: impl Into<String>,
100    ) -> Self {
101        Self {
102            content: content.into(),
103            recipient: recipient.into(),
104            subject: Some(subject.into()),
105            thread_ts: None,
106            cancellation_token: None,
107            attachments: vec![],
108            in_reply_to: None,
109        }
110    }
111
112    /// Set the In-Reply-To header for email threading.
113    pub fn in_reply_to(mut self, msg_id: Option<String>) -> Self {
114        self.in_reply_to = msg_id;
115        self
116    }
117
118    /// Set the subject on an existing SendMessage (builder style).
119    pub fn subject(mut self, subject: impl Into<String>) -> Self {
120        self.subject = Some(subject.into());
121        self
122    }
123
124    /// Set the thread identifier for threaded replies.
125    pub fn in_thread(mut self, thread_ts: Option<String>) -> Self {
126        self.thread_ts = thread_ts;
127        self
128    }
129
130    /// Attach a cancellation token for interruptible delivery.
131    pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
132        self.cancellation_token = Some(token);
133        self
134    }
135
136    /// Attach files to this message.
137    pub fn with_attachments(mut self, attachments: Vec<MediaAttachment>) -> Self {
138        self.attachments = attachments;
139        self
140    }
141}
142
143impl ChannelMessage {
144    /// Construct a `ChannelMessage` with all required fields set and all optional
145    /// fields zeroed. Prefer this over raw struct literals so that new optional
146    /// fields added to `ChannelMessage` in the future don't require mechanical
147    /// updates at every call site.
148    pub fn new(
149        id: impl Into<String>,
150        sender: impl Into<String>,
151        reply_target: impl Into<String>,
152        content: impl Into<String>,
153        channel: impl Into<String>,
154        timestamp: u64,
155    ) -> Self {
156        Self {
157            id: id.into(),
158            sender: sender.into(),
159            reply_target: reply_target.into(),
160            content: content.into(),
161            channel: channel.into(),
162            timestamp,
163            ..Self::default()
164        }
165    }
166}
167
168impl SendMessage {
169    /// Build a reply `SendMessage` from an inbound `ChannelMessage`.
170    ///
171    /// Sets `recipient` from `msg.reply_target`, threads via `in_reply_to` and
172    /// `thread_ts`, and prepends `Re:` to the subject when the inbound message
173    /// carried one. Safe to call from any channel handler; the `in_reply_to`
174    /// field is silently ignored by channels that don't support it.
175    pub fn reply_to(msg: &ChannelMessage, content: impl Into<String>) -> Self {
176        let mut sm = Self::new(content, &msg.reply_target)
177            .in_thread(msg.thread_ts.clone())
178            .in_reply_to(Some(msg.id.clone()));
179        if let Some(ref subj) = msg.subject {
180            let reply_subject = if subj.to_ascii_lowercase().starts_with("re:") {
181                subj.clone()
182            } else {
183                format!("Re: {}", subj)
184            };
185            sm = sm.subject(reply_subject);
186        }
187        sm
188    }
189}
190
191/// Core channel trait — implement for any messaging platform.
192///
193/// Every `Channel` is `Attributable`: the orchestrator's spawn site opens
194/// `attribution_span!(&*ch)` so log emissions from within `listen()` / `send()`
195/// inherit `channel = <type>.<alias>` from the trait object's role + alias.
196#[async_trait]
197pub trait Channel: Send + Sync + crate::attribution::Attributable {
198    /// Human-readable channel name
199    fn name(&self) -> &str;
200
201    /// Send a message through this channel
202    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
203
204    /// Start listening for incoming messages (long-running)
205    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
206
207    /// Check if channel is healthy
208    async fn health_check(&self) -> bool {
209        true
210    }
211
212    /// Signal that the bot is processing a response (e.g. "typing" indicator).
213    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
214        Ok(())
215    }
216
217    /// Stop any active typing indicator.
218    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
219        Ok(())
220    }
221
222    /// Whether this channel supports progressive message updates via draft edits.
223    fn supports_draft_updates(&self) -> bool {
224        false
225    }
226
227    /// Self-loop guard for multi-agent runs.
228    ///
229    /// Returns the bot's own handle/identity on this channel
230    /// (e.g. `@my_bot` for Telegram, the bot's user ID for Discord)
231    /// when known, so the orchestrator can drop inbound events whose
232    /// `sender` matches: a bot must never respond to its own
233    /// messages, even if a misconfigured peer group lists the bot's
234    /// handle as an external peer.
235    ///
236    /// **Channels that handle inbound traffic must override this.**
237    /// The default `None` makes both layers of the orchestrator's
238    /// self-loop guard (the SDK-side `drop_self_messages` here, and
239    /// the agent-loop fallback `peers::should_drop_self_loop`) into
240    /// no-ops — both layers consult the same `self_handle`, so a
241    /// channel that returns `None` has no protection from looping on
242    /// its own outbound. Outbound-only channels (webhook, gmail-push,
243    /// voice-call) never see inbound and can keep the default. The
244    /// in-tree overrides currently cover Telegram (`bot_username`
245    /// cache), IRC (configured nickname), Discord (decoded from token),
246    /// Slack (cached `auth.test` user_id); other inbound channels
247    /// remain on the default and rely on per-impl filtering instead
248    /// of the shared guard.
249    fn self_handle(&self) -> Option<String> {
250        None
251    }
252
253    /// The exact form the bot expects to see when addressed by users on
254    /// this channel. Discord wraps the snowflake as `<@1088...>`,
255    /// Telegram presents `@bot_username`, Matrix presents
256    /// `@bot:server`, Slack wraps the user ID as `<@U02...>`. Returned
257    /// verbatim into the per-channel system prompt so the agent
258    /// recognizes its own mention without guessing, and uses the same
259    /// form to tag itself or peers in outbound replies.
260    ///
261    /// Default `None` for channels that have no inbound mention
262    /// concept (CLI, webhook, hardware, ACP elicitation). Channels
263    /// that override `self_handle` should usually override this too,
264    /// applying their platform-native mention wrapper to the handle.
265    fn self_addressed_mention(&self) -> Option<String> {
266        None
267    }
268
269    /// Whether the orchestrator should drop an inbound message as
270    /// self-authored (multi-agent self-loop guard).
271    ///
272    /// Default implementation compares `msg.sender` against
273    /// [`Self::self_handle`] case-insensitively, after stripping a
274    /// leading `@` from each side so Telegram-style handles match
275    /// regardless of which form the SDK delivers. Override only for
276    /// platforms whose identity comparison is non-string (e.g. a
277    /// numeric Discord user ID is `as_str` already; this default
278    /// works there too).
279    fn drop_self_messages(&self, msg: &ChannelMessage) -> bool {
280        let Some(handle) = self.self_handle() else {
281            return false;
282        };
283        let handle_norm = handle.trim_start_matches('@').to_ascii_lowercase();
284        let sender_norm = msg.sender.trim_start_matches('@').to_ascii_lowercase();
285        !handle_norm.is_empty() && handle_norm == sender_norm
286    }
287
288    /// Whether this channel supports multi-message streaming delivery.
289    fn supports_multi_message_streaming(&self) -> bool {
290        false
291    }
292
293    /// Minimum delay (ms) between sending each paragraph in multi-message mode.
294    fn multi_message_delay_ms(&self) -> u64 {
295        800
296    }
297
298    /// Send an initial draft message. Returns a platform-specific message ID for later edits.
299    async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
300        Ok(None)
301    }
302
303    /// Update a previously sent draft message with new accumulated content.
304    async fn update_draft(
305        &self,
306        _recipient: &str,
307        _message_id: &str,
308        _text: &str,
309    ) -> anyhow::Result<()> {
310        Ok(())
311    }
312
313    /// Show a progress/status update (e.g. tool execution status).
314    async fn update_draft_progress(
315        &self,
316        _recipient: &str,
317        _message_id: &str,
318        _text: &str,
319    ) -> anyhow::Result<()> {
320        Ok(())
321    }
322
323    /// Finalize a draft with the complete response (e.g. apply Markdown formatting).
324    async fn finalize_draft(
325        &self,
326        _recipient: &str,
327        _message_id: &str,
328        _text: &str,
329    ) -> anyhow::Result<()> {
330        Ok(())
331    }
332
333    /// Cancel and remove a previously sent draft message if the channel supports it.
334    async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> anyhow::Result<()> {
335        Ok(())
336    }
337
338    /// Add a reaction (emoji) to a message.
339    async fn add_reaction(
340        &self,
341        _channel_id: &str,
342        _message_id: &str,
343        _emoji: &str,
344    ) -> anyhow::Result<()> {
345        Ok(())
346    }
347
348    /// Remove a reaction (emoji) from a message previously added by this bot.
349    async fn remove_reaction(
350        &self,
351        _channel_id: &str,
352        _message_id: &str,
353        _emoji: &str,
354    ) -> anyhow::Result<()> {
355        Ok(())
356    }
357
358    /// Pin a message in the channel.
359    async fn pin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
360        Ok(())
361    }
362
363    /// Unpin a previously pinned message.
364    async fn unpin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
365        Ok(())
366    }
367
368    /// Redact (delete) a message from the channel.
369    async fn redact_message(
370        &self,
371        _channel_id: &str,
372        _message_id: &str,
373        _reason: Option<String>,
374    ) -> anyhow::Result<()> {
375        Ok(())
376    }
377
378    /// Request interactive tool-call approval from the channel operator.
379    ///
380    /// Returns `Ok(Some(response))` when the operator answers within the
381    /// channel's configured `approval_timeout_secs`; timeouts are surfaced
382    /// as `Deny`. Returns `Ok(None)` only for channels that do not implement
383    /// the prompt at all — the caller should fall back to its default policy
384    /// (typically auto-deny).
385    ///
386    /// Surface varies by channel:
387    /// - **Telegram** uses inline keyboard buttons.
388    /// - **Slack** Socket Mode uses Block Kit buttons; webhook fallback and
389    ///   non–Socket Mode deployments use a token text reply.
390    /// - **Discord, Signal, Matrix, WhatsApp** embed a 6-character
391    ///   alphanumeric token in the prompt and wait for a
392    ///   `<token> approve|deny|always` reply on the same conversation.
393    async fn request_approval(
394        &self,
395        _recipient: &str,
396        _request: &ChannelApprovalRequest,
397    ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
398        Ok(None)
399    }
400
401    /// Ask the user a multiple-choice question and return the chosen option's text.
402    ///
403    /// Returns `Ok(Some(answer))` if the channel handled the question natively
404    /// (e.g. ACP `session/request_permission`, Telegram inline keyboard).
405    /// Returns `Ok(None)` to signal the caller should fall back to the
406    /// generic `send` + `listen` flow. Default impl returns `None`.
407    ///
408    /// Free-form questions (no choices) are not modeled here yet — they
409    /// require the ACP elicitation RFD to land for a clean cross-channel API.
410    async fn request_choice(
411        &self,
412        _question: &str,
413        _choices: &[String],
414        _timeout: std::time::Duration,
415    ) -> anyhow::Result<Option<String>> {
416        Ok(None)
417    }
418
419    /// Whether this channel can answer free-form (no-choices) `ask_user`
420    /// questions via the standard `send` + `listen` flow.
421    ///
422    /// Channels that can only handle structured choices (e.g. ACP today, until
423    /// the elicitation RFD lands) should return `false` so callers can fail
424    /// fast with a useful error instead of timing out on `listen`.
425    fn supports_free_form_ask(&self) -> bool {
426        true
427    }
428}
429
#[cfg(test)]
mod tests {
    use super::*;

    /// Stub channel that overrides `self_handle` so the default
    /// `drop_self_messages` implementation can be exercised.
    struct StubChannel {
        handle: Option<String>,
    }

    impl crate::attribution::Attributable for StubChannel {
        fn role(&self) -> crate::attribution::Role {
            crate::attribution::Role::Channel(crate::attribution::ChannelKind::Webhook)
        }
        fn alias(&self) -> &str {
            "stub"
        }
    }

    #[async_trait]
    impl Channel for StubChannel {
        fn name(&self) -> &str {
            "stub"
        }
        async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
            Ok(())
        }
        async fn listen(
            &self,
            _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            Ok(())
        }
        fn self_handle(&self) -> Option<String> {
            self.handle.clone()
        }
    }

    /// Inbound message on the stub channel whose only interesting field is
    /// `sender`.
    fn msg_from(sender: &str) -> ChannelMessage {
        ChannelMessage::new("1", sender, "", "hi", "stub", 0)
    }

    /// Inbound email-style message carrying the given subject.
    fn email_with_subject(id: &str, subject: &str) -> ChannelMessage {
        ChannelMessage {
            subject: Some(subject.into()),
            ..ChannelMessage::new(id, "alice", "user@example.com", "", "email", 0)
        }
    }

    #[test]
    fn channel_message_new_zeros_optional_fields() {
        let msg = ChannelMessage::new("id1", "alice", "room-1", "hello", "slack", 42);
        assert_eq!(msg.id, "id1");
        assert_eq!(msg.sender, "alice");
        assert_eq!(msg.reply_target, "room-1");
        assert_eq!(msg.content, "hello");
        assert_eq!(msg.channel, "slack");
        assert_eq!(msg.timestamp, 42);
        assert!(msg.channel_alias.is_none());
        assert!(msg.thread_ts.is_none());
        assert!(msg.interruption_scope_id.is_none());
        assert!(msg.attachments.is_empty());
        assert!(msg.subject.is_none());
    }

    #[test]
    fn send_message_reply_to_sets_threading_fields() {
        // `id` and `reply_target` come from `ChannelMessage::new`; only the
        // optional threading fields need to be overridden here.
        let inbound = ChannelMessage {
            thread_ts: Some("thread-1".into()),
            ..email_with_subject("msg-001", "Hello there")
        };
        let reply = SendMessage::reply_to(&inbound, "Got it");
        assert_eq!(reply.recipient, "user@example.com");
        assert_eq!(reply.in_reply_to.as_deref(), Some("msg-001"));
        assert_eq!(reply.thread_ts.as_deref(), Some("thread-1"));
        assert_eq!(reply.subject.as_deref(), Some("Re: Hello there"));
        assert_eq!(reply.content, "Got it");
    }

    #[test]
    fn send_message_reply_to_does_not_double_re_prefix() {
        let inbound = email_with_subject("msg-002", "Re: Already prefixed");
        let reply = SendMessage::reply_to(&inbound, "");
        assert_eq!(reply.subject.as_deref(), Some("Re: Already prefixed"));
    }

    #[test]
    fn send_message_reply_to_detects_re_prefix_case_insensitively() {
        for subject in ["RE: shouting", "re:tight", "rE: mixed"] {
            let inbound = email_with_subject("msg-004", subject);
            let reply = SendMessage::reply_to(&inbound, "");
            assert_eq!(reply.subject.as_deref(), Some(subject));
        }
    }

    #[test]
    fn send_message_reply_to_prefixes_short_and_non_ascii_subjects() {
        // Subjects shorter than the prefix, or starting with multi-byte
        // characters, must be prefixed without panicking.
        let cases = [
            ("", "Re: "),
            ("Re", "Re: Re"),
            ("Ré: accent", "Re: Ré: accent"),
            ("日本語", "Re: 日本語"),
        ];
        for (subject, expected) in cases {
            let inbound = email_with_subject("msg-005", subject);
            let reply = SendMessage::reply_to(&inbound, "");
            assert_eq!(reply.subject.as_deref(), Some(expected));
        }
    }

    #[test]
    fn send_message_reply_to_no_subject_omits_subject() {
        let inbound = ChannelMessage::new("msg-003", "alice", "room-1", "ping", "slack", 0);
        let reply = SendMessage::reply_to(&inbound, "pong");
        assert!(reply.subject.is_none());
        assert_eq!(reply.in_reply_to.as_deref(), Some("msg-003"));
    }

    #[test]
    fn drop_self_messages_default_returns_false_when_handle_unknown() {
        let channel = StubChannel { handle: None };
        assert!(!channel.drop_self_messages(&msg_from("@anyone")));
    }

    #[test]
    fn drop_self_messages_matches_exact_handle() {
        let channel = StubChannel {
            handle: Some("@my_bot".into()),
        };
        assert!(channel.drop_self_messages(&msg_from("@my_bot")));
        assert!(!channel.drop_self_messages(&msg_from("@other_bot")));
    }

    #[test]
    fn drop_self_messages_normalizes_at_prefix_and_case() {
        let channel = StubChannel {
            handle: Some("My_Bot".into()),
        };
        // SDK delivered with @ prefix, handle stored without. Match.
        assert!(channel.drop_self_messages(&msg_from("@my_bot")));
        // Both with @, mixed case. Match.
        let channel = StubChannel {
            handle: Some("@My_Bot".into()),
        };
        assert!(channel.drop_self_messages(&msg_from("@MY_BOT")));
        // Handle stored with @, SDK delivered without. Match.
        assert!(channel.drop_self_messages(&msg_from("my_bot")));
    }

    #[test]
    fn drop_self_messages_does_not_match_empty_handle() {
        // A handle of "@" (effectively empty after normalization) must
        // not match every inbound message; the guard only fires when
        // the bot has a real handle to compare against.
        let channel = StubChannel {
            handle: Some("@".into()),
        };
        assert!(!channel.drop_self_messages(&msg_from("@anyone")));
        // An equally empty sender must not match the empty handle either.
        assert!(!channel.drop_self_messages(&msg_from("@")));
    }
}