zeroclaw_api/channel.rs
1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use tokio_util::sync::CancellationToken;
4
5use crate::media::MediaAttachment;
6
7// ── Channel approval types ──────────────────────────────────────
8
9/// Compact description of a tool call presented to the user for approval.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ChannelApprovalRequest {
12 pub tool_name: String,
13 pub arguments_summary: String,
14 /// Raw tool arguments for channels (e.g. ACP) that can render structured
15 /// diffs instead of a plain summary string.
16 #[serde(skip_serializing_if = "Option::is_none")]
17 pub raw_arguments: Option<serde_json::Value>,
18}
19
20/// The operator's response to a channel-presented approval prompt.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum ChannelApprovalResponse {
24 /// Execute this one call.
25 Approve,
26 /// Deny this call.
27 Deny,
28 /// Execute and add tool to session-scoped allowlist.
29 #[serde(rename = "always")]
30 AlwaysApprove,
31}
32
33/// A message received from or sent to a channel
34#[derive(Debug, Clone, Default)]
35pub struct ChannelMessage {
36 pub id: String,
37 pub sender: String,
38 pub reply_target: String,
39 pub content: String,
40 pub channel: String,
41 /// ZeroClaw channel alias (the `<alias>` half of `[channels.<type>.<alias>]`)
42 /// when the platform supports multiple bot instances. Used by
43 /// session_key construction so two bots on the same platform compute
44 /// distinct session IDs and don't share conversation history. `None`
45 /// for channels that don't have an alias concept yet (webhook, cli).
46 pub channel_alias: Option<String>,
47 pub timestamp: u64,
48 /// Platform thread identifier (e.g. Slack `ts`, Discord thread ID).
49 /// When set, replies should be posted as threaded responses.
50 pub thread_ts: Option<String>,
51 /// Thread scope identifier for interruption/cancellation grouping.
52 /// Distinct from `thread_ts` (reply anchor): this is `Some` only when the message
53 /// is genuinely inside a reply thread and should be isolated from other threads.
54 /// `None` means top-level — scope is sender+channel only.
55 pub interruption_scope_id: Option<String>,
56 /// Media attachments (audio, images, video) for the media pipeline.
57 /// Channels populate this when they receive media alongside a text message.
58 /// Defaults to empty — existing channels are unaffected.
59 pub attachments: Vec<MediaAttachment>,
60 /// Email subject for reply threading.
61 pub subject: Option<String>,
62}
63
64/// Message to send through a channel
65#[derive(Debug, Clone)]
66pub struct SendMessage {
67 pub content: String,
68 pub recipient: String,
69 pub subject: Option<String>,
70 /// Platform thread identifier for threaded replies (e.g. Slack `thread_ts`).
71 pub thread_ts: Option<String>,
72 /// Optional cancellation token for interruptible delivery (e.g. multi-message mode).
73 pub cancellation_token: Option<CancellationToken>,
74 /// File attachments to send with the message.
75 /// Channels that don't support attachments ignore this field.
76 pub attachments: Vec<MediaAttachment>,
77 /// Message-ID to set as In-Reply-To header (email threading).
78 pub in_reply_to: Option<String>,
79}
80
81impl SendMessage {
82 /// Create a new message with content and recipient
83 pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
84 Self {
85 content: content.into(),
86 recipient: recipient.into(),
87 subject: None,
88 thread_ts: None,
89 cancellation_token: None,
90 attachments: vec![],
91 in_reply_to: None,
92 }
93 }
94
95 /// Create a new message with content, recipient, and subject
96 pub fn with_subject(
97 content: impl Into<String>,
98 recipient: impl Into<String>,
99 subject: impl Into<String>,
100 ) -> Self {
101 Self {
102 content: content.into(),
103 recipient: recipient.into(),
104 subject: Some(subject.into()),
105 thread_ts: None,
106 cancellation_token: None,
107 attachments: vec![],
108 in_reply_to: None,
109 }
110 }
111
112 /// Set the In-Reply-To header for email threading.
113 pub fn in_reply_to(mut self, msg_id: Option<String>) -> Self {
114 self.in_reply_to = msg_id;
115 self
116 }
117
118 /// Set the subject on an existing SendMessage (builder style).
119 pub fn subject(mut self, subject: impl Into<String>) -> Self {
120 self.subject = Some(subject.into());
121 self
122 }
123
124 /// Set the thread identifier for threaded replies.
125 pub fn in_thread(mut self, thread_ts: Option<String>) -> Self {
126 self.thread_ts = thread_ts;
127 self
128 }
129
130 /// Attach a cancellation token for interruptible delivery.
131 pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
132 self.cancellation_token = Some(token);
133 self
134 }
135
136 /// Attach files to this message.
137 pub fn with_attachments(mut self, attachments: Vec<MediaAttachment>) -> Self {
138 self.attachments = attachments;
139 self
140 }
141}
142
143impl ChannelMessage {
144 /// Construct a `ChannelMessage` with all required fields set and all optional
145 /// fields zeroed. Prefer this over raw struct literals so that new optional
146 /// fields added to `ChannelMessage` in the future don't require mechanical
147 /// updates at every call site.
148 pub fn new(
149 id: impl Into<String>,
150 sender: impl Into<String>,
151 reply_target: impl Into<String>,
152 content: impl Into<String>,
153 channel: impl Into<String>,
154 timestamp: u64,
155 ) -> Self {
156 Self {
157 id: id.into(),
158 sender: sender.into(),
159 reply_target: reply_target.into(),
160 content: content.into(),
161 channel: channel.into(),
162 timestamp,
163 ..Self::default()
164 }
165 }
166}
167
168impl SendMessage {
169 /// Build a reply `SendMessage` from an inbound `ChannelMessage`.
170 ///
171 /// Sets `recipient` from `msg.reply_target`, threads via `in_reply_to` and
172 /// `thread_ts`, and prepends `Re:` to the subject when the inbound message
173 /// carried one. Safe to call from any channel handler; the `in_reply_to`
174 /// field is silently ignored by channels that don't support it.
175 pub fn reply_to(msg: &ChannelMessage, content: impl Into<String>) -> Self {
176 let mut sm = Self::new(content, &msg.reply_target)
177 .in_thread(msg.thread_ts.clone())
178 .in_reply_to(Some(msg.id.clone()));
179 if let Some(ref subj) = msg.subject {
180 let reply_subject = if subj.to_ascii_lowercase().starts_with("re:") {
181 subj.clone()
182 } else {
183 format!("Re: {}", subj)
184 };
185 sm = sm.subject(reply_subject);
186 }
187 sm
188 }
189}
190
191/// Core channel trait — implement for any messaging platform.
192///
193/// Every `Channel` is `Attributable`: the orchestrator's spawn site opens
194/// `attribution_span!(&*ch)` so log emissions from within `listen()` / `send()`
195/// inherit `channel = <type>.<alias>` from the trait object's role + alias.
196#[async_trait]
197pub trait Channel: Send + Sync + crate::attribution::Attributable {
198 /// Human-readable channel name
199 fn name(&self) -> &str;
200
201 /// Send a message through this channel
202 async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
203
204 /// Start listening for incoming messages (long-running)
205 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
206
207 /// Check if channel is healthy
208 async fn health_check(&self) -> bool {
209 true
210 }
211
212 /// Signal that the bot is processing a response (e.g. "typing" indicator).
213 async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
214 Ok(())
215 }
216
217 /// Stop any active typing indicator.
218 async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
219 Ok(())
220 }
221
222 /// Whether this channel supports progressive message updates via draft edits.
223 fn supports_draft_updates(&self) -> bool {
224 false
225 }
226
227 /// Self-loop guard for multi-agent runs.
228 ///
229 /// Returns the bot's own handle/identity on this channel
230 /// (e.g. `@my_bot` for Telegram, the bot's user ID for Discord)
231 /// when known, so the orchestrator can drop inbound events whose
232 /// `sender` matches: a bot must never respond to its own
233 /// messages, even if a misconfigured peer group lists the bot's
234 /// handle as an external peer.
235 ///
236 /// **Channels that handle inbound traffic must override this.**
237 /// The default `None` makes both layers of the orchestrator's
238 /// self-loop guard (the SDK-side `drop_self_messages` here, and
239 /// the agent-loop fallback `peers::should_drop_self_loop`) into
240 /// no-ops — both layers consult the same `self_handle`, so a
241 /// channel that returns `None` has no protection from looping on
242 /// its own outbound. Outbound-only channels (webhook, gmail-push,
243 /// voice-call) never see inbound and can keep the default. The
244 /// in-tree overrides currently cover Telegram (`bot_username`
245 /// cache), IRC (configured nickname), Discord (decoded from token),
246 /// Slack (cached `auth.test` user_id); other inbound channels
247 /// remain on the default and rely on per-impl filtering instead
248 /// of the shared guard.
249 fn self_handle(&self) -> Option<String> {
250 None
251 }
252
253 /// The exact form the bot expects to see when addressed by users on
254 /// this channel. Discord wraps the snowflake as `<@1088...>`,
255 /// Telegram presents `@bot_username`, Matrix presents
256 /// `@bot:server`, Slack wraps the user ID as `<@U02...>`. Returned
257 /// verbatim into the per-channel system prompt so the agent
258 /// recognizes its own mention without guessing, and uses the same
259 /// form to tag itself or peers in outbound replies.
260 ///
261 /// Default `None` for channels that have no inbound mention
262 /// concept (CLI, webhook, hardware, ACP elicitation). Channels
263 /// that override `self_handle` should usually override this too,
264 /// applying their platform-native mention wrapper to the handle.
265 fn self_addressed_mention(&self) -> Option<String> {
266 None
267 }
268
269 /// Whether the orchestrator should drop an inbound message as
270 /// self-authored (multi-agent self-loop guard).
271 ///
272 /// Default implementation compares `msg.sender` against
273 /// [`Self::self_handle`] case-insensitively, after stripping a
274 /// leading `@` from each side so Telegram-style handles match
275 /// regardless of which form the SDK delivers. Override only for
276 /// platforms whose identity comparison is non-string (e.g. a
277 /// numeric Discord user ID is `as_str` already; this default
278 /// works there too).
279 fn drop_self_messages(&self, msg: &ChannelMessage) -> bool {
280 let Some(handle) = self.self_handle() else {
281 return false;
282 };
283 let handle_norm = handle.trim_start_matches('@').to_ascii_lowercase();
284 let sender_norm = msg.sender.trim_start_matches('@').to_ascii_lowercase();
285 !handle_norm.is_empty() && handle_norm == sender_norm
286 }
287
288 /// Whether this channel supports multi-message streaming delivery.
289 fn supports_multi_message_streaming(&self) -> bool {
290 false
291 }
292
293 /// Minimum delay (ms) between sending each paragraph in multi-message mode.
294 fn multi_message_delay_ms(&self) -> u64 {
295 800
296 }
297
298 /// Send an initial draft message. Returns a platform-specific message ID for later edits.
299 async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
300 Ok(None)
301 }
302
303 /// Update a previously sent draft message with new accumulated content.
304 async fn update_draft(
305 &self,
306 _recipient: &str,
307 _message_id: &str,
308 _text: &str,
309 ) -> anyhow::Result<()> {
310 Ok(())
311 }
312
313 /// Show a progress/status update (e.g. tool execution status).
314 async fn update_draft_progress(
315 &self,
316 _recipient: &str,
317 _message_id: &str,
318 _text: &str,
319 ) -> anyhow::Result<()> {
320 Ok(())
321 }
322
323 /// Finalize a draft with the complete response (e.g. apply Markdown formatting).
324 async fn finalize_draft(
325 &self,
326 _recipient: &str,
327 _message_id: &str,
328 _text: &str,
329 ) -> anyhow::Result<()> {
330 Ok(())
331 }
332
333 /// Cancel and remove a previously sent draft message if the channel supports it.
334 async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> anyhow::Result<()> {
335 Ok(())
336 }
337
338 /// Add a reaction (emoji) to a message.
339 async fn add_reaction(
340 &self,
341 _channel_id: &str,
342 _message_id: &str,
343 _emoji: &str,
344 ) -> anyhow::Result<()> {
345 Ok(())
346 }
347
348 /// Remove a reaction (emoji) from a message previously added by this bot.
349 async fn remove_reaction(
350 &self,
351 _channel_id: &str,
352 _message_id: &str,
353 _emoji: &str,
354 ) -> anyhow::Result<()> {
355 Ok(())
356 }
357
358 /// Pin a message in the channel.
359 async fn pin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
360 Ok(())
361 }
362
363 /// Unpin a previously pinned message.
364 async fn unpin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
365 Ok(())
366 }
367
368 /// Redact (delete) a message from the channel.
369 async fn redact_message(
370 &self,
371 _channel_id: &str,
372 _message_id: &str,
373 _reason: Option<String>,
374 ) -> anyhow::Result<()> {
375 Ok(())
376 }
377
378 /// Request interactive tool-call approval from the channel operator.
379 ///
380 /// Returns `Ok(Some(response))` when the operator answers within the
381 /// channel's configured `approval_timeout_secs`; timeouts are surfaced
382 /// as `Deny`. Returns `Ok(None)` only for channels that do not implement
383 /// the prompt at all — the caller should fall back to its default policy
384 /// (typically auto-deny).
385 ///
386 /// Surface varies by channel:
387 /// - **Telegram** uses inline keyboard buttons.
388 /// - **Slack** Socket Mode uses Block Kit buttons; webhook fallback and
389 /// non–Socket Mode deployments use a token text reply.
390 /// - **Discord, Signal, Matrix, WhatsApp** embed a 6-character
391 /// alphanumeric token in the prompt and wait for a
392 /// `<token> approve|deny|always` reply on the same conversation.
393 async fn request_approval(
394 &self,
395 _recipient: &str,
396 _request: &ChannelApprovalRequest,
397 ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
398 Ok(None)
399 }
400
401 /// Ask the user a multiple-choice question and return the chosen option's text.
402 ///
403 /// Returns `Ok(Some(answer))` if the channel handled the question natively
404 /// (e.g. ACP `session/request_permission`, Telegram inline keyboard).
405 /// Returns `Ok(None)` to signal the caller should fall back to the
406 /// generic `send` + `listen` flow. Default impl returns `None`.
407 ///
408 /// Free-form questions (no choices) are not modeled here yet — they
409 /// require the ACP elicitation RFD to land for a clean cross-channel API.
410 async fn request_choice(
411 &self,
412 _question: &str,
413 _choices: &[String],
414 _timeout: std::time::Duration,
415 ) -> anyhow::Result<Option<String>> {
416 Ok(None)
417 }
418
419 /// Whether this channel can answer free-form (no-choices) `ask_user`
420 /// questions via the standard `send` + `listen` flow.
421 ///
422 /// Channels that can only handle structured choices (e.g. ACP today, until
423 /// the elicitation RFD lands) should return `false` so callers can fail
424 /// fast with a useful error instead of timing out on `listen`.
425 fn supports_free_form_ask(&self) -> bool {
426 true
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433
434 /// Stub channel that overrides `self_handle` so the default
435 /// `drop_self_messages` implementation can be exercised.
436 struct StubChannel {
437 handle: Option<String>,
438 }
439
440 impl crate::attribution::Attributable for StubChannel {
441 fn role(&self) -> crate::attribution::Role {
442 crate::attribution::Role::Channel(crate::attribution::ChannelKind::Webhook)
443 }
444 fn alias(&self) -> &str {
445 "stub"
446 }
447 }
448
449 #[async_trait]
450 impl Channel for StubChannel {
451 fn name(&self) -> &str {
452 "stub"
453 }
454 async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
455 Ok(())
456 }
457 async fn listen(
458 &self,
459 _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
460 ) -> anyhow::Result<()> {
461 Ok(())
462 }
463 fn self_handle(&self) -> Option<String> {
464 self.handle.clone()
465 }
466 }
467
468 fn msg_from(sender: &str) -> ChannelMessage {
469 ChannelMessage::new("1", sender, "", "hi", "stub", 0)
470 }
471
472 #[test]
473 fn channel_message_new_zeros_optional_fields() {
474 let msg = ChannelMessage::new("id1", "alice", "room-1", "hello", "slack", 42);
475 assert_eq!(msg.id, "id1");
476 assert_eq!(msg.sender, "alice");
477 assert_eq!(msg.reply_target, "room-1");
478 assert_eq!(msg.content, "hello");
479 assert_eq!(msg.channel, "slack");
480 assert_eq!(msg.timestamp, 42);
481 assert!(msg.channel_alias.is_none());
482 assert!(msg.thread_ts.is_none());
483 assert!(msg.interruption_scope_id.is_none());
484 assert!(msg.attachments.is_empty());
485 assert!(msg.subject.is_none());
486 }
487
488 #[test]
489 fn send_message_reply_to_sets_threading_fields() {
490 let inbound = ChannelMessage {
491 id: "msg-001".into(),
492 reply_target: "user@example.com".into(),
493 thread_ts: Some("thread-1".into()),
494 subject: Some("Hello there".into()),
495 ..ChannelMessage::new("msg-001", "alice", "user@example.com", "", "email", 0)
496 };
497 let reply = SendMessage::reply_to(&inbound, "Got it");
498 assert_eq!(reply.recipient, "user@example.com");
499 assert_eq!(reply.in_reply_to.as_deref(), Some("msg-001"));
500 assert_eq!(reply.thread_ts.as_deref(), Some("thread-1"));
501 assert_eq!(reply.subject.as_deref(), Some("Re: Hello there"));
502 assert_eq!(reply.content, "Got it");
503 }
504
505 #[test]
506 fn send_message_reply_to_does_not_double_re_prefix() {
507 let inbound = ChannelMessage {
508 subject: Some("Re: Already prefixed".into()),
509 ..ChannelMessage::new("msg-002", "alice", "user@example.com", "", "email", 0)
510 };
511 let reply = SendMessage::reply_to(&inbound, "");
512 assert_eq!(reply.subject.as_deref(), Some("Re: Already prefixed"));
513 }
514
515 #[test]
516 fn send_message_reply_to_no_subject_omits_subject() {
517 let inbound = ChannelMessage::new("msg-003", "alice", "room-1", "ping", "slack", 0);
518 let reply = SendMessage::reply_to(&inbound, "pong");
519 assert!(reply.subject.is_none());
520 assert_eq!(reply.in_reply_to.as_deref(), Some("msg-003"));
521 }
522
523 #[test]
524 fn drop_self_messages_default_returns_false_when_handle_unknown() {
525 let channel = StubChannel { handle: None };
526 assert!(!channel.drop_self_messages(&msg_from("@anyone")));
527 }
528
529 #[test]
530 fn drop_self_messages_matches_exact_handle() {
531 let channel = StubChannel {
532 handle: Some("@my_bot".into()),
533 };
534 assert!(channel.drop_self_messages(&msg_from("@my_bot")));
535 assert!(!channel.drop_self_messages(&msg_from("@other_bot")));
536 }
537
538 #[test]
539 fn drop_self_messages_normalizes_at_prefix_and_case() {
540 let channel = StubChannel {
541 handle: Some("My_Bot".into()),
542 };
543 // SDK delivered with @ prefix, handle stored without. Match.
544 assert!(channel.drop_self_messages(&msg_from("@my_bot")));
545 // Both with @, mixed case. Match.
546 let channel = StubChannel {
547 handle: Some("@My_Bot".into()),
548 };
549 assert!(channel.drop_self_messages(&msg_from("@MY_BOT")));
550 }
551
552 #[test]
553 fn drop_self_messages_does_not_match_empty_handle() {
554 // A handle of "@" (effectively empty after normalization) must
555 // not match every inbound message; the guard only fires when
556 // the bot has a real handle to compare against.
557 let channel = StubChannel {
558 handle: Some("@".into()),
559 };
560 assert!(!channel.drop_self_messages(&msg_from("@anyone")));
561 }
562}