zeroclaw_api/channel.rs
1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use tokio_util::sync::CancellationToken;
4
5use crate::media::MediaAttachment;
6
7// ── Channel approval types ──────────────────────────────────────
8
9/// Compact description of a tool call presented to the user for approval.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct ChannelApprovalRequest {
12 pub tool_name: String,
13 pub arguments_summary: String,
14 /// Raw tool arguments for channels (e.g. ACP) that can render structured
15 /// diffs instead of a plain summary string.
16 #[serde(skip_serializing_if = "Option::is_none")]
17 pub raw_arguments: Option<serde_json::Value>,
18}
19
20/// The operator's response to a channel-presented approval prompt.
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum ChannelApprovalResponse {
24 /// Execute this one call.
25 Approve,
26 /// Deny this call.
27 Deny,
28 /// Execute and add tool to session-scoped allowlist.
29 #[serde(rename = "always")]
30 AlwaysApprove,
31 /// Deny this call and supply an edited replacement for the arguments.
32 #[serde(rename = "deny_with_edit")]
33 DenyWithEdit { replacement: String },
34}
35
36/// A message received from or sent to a channel
37#[derive(Debug, Clone, Default)]
38pub struct ChannelMessage {
39 pub id: String,
40 pub sender: String,
41 pub reply_target: String,
42 pub content: String,
43 pub channel: String,
44 /// ZeroClaw channel alias (the `<alias>` half of `[channels.<type>.<alias>]`)
45 /// when the platform supports multiple bot instances. Used by
46 /// session_key construction so two bots on the same platform compute
47 /// distinct session IDs and don't share conversation history. `None`
48 /// for channels that don't have an alias concept yet (webhook, cli).
49 pub channel_alias: Option<String>,
50 pub timestamp: u64,
51 /// Platform thread identifier (e.g. Slack `ts`, Discord thread ID).
52 /// When set, replies should be posted as threaded responses.
53 pub thread_ts: Option<String>,
54 /// Thread scope identifier for interruption/cancellation grouping.
55 /// Distinct from `thread_ts` (reply anchor): this is `Some` only when the message
56 /// is genuinely inside a reply thread and should be isolated from other threads.
57 /// `None` means top-level — scope is sender+channel only.
58 pub interruption_scope_id: Option<String>,
59 /// Media attachments (audio, images, video) for the media pipeline.
60 /// Channels populate this when they receive media alongside a text message.
61 /// Defaults to empty — existing channels are unaffected.
62 pub attachments: Vec<MediaAttachment>,
63 /// Email subject for reply threading.
64 pub subject: Option<String>,
65}
66
67/// Message to send through a channel
68#[derive(Debug, Clone)]
69pub struct SendMessage {
70 pub content: String,
71 pub recipient: String,
72 pub subject: Option<String>,
73 /// Platform thread identifier for threaded replies (e.g. Slack `thread_ts`).
74 pub thread_ts: Option<String>,
75 /// Optional cancellation token for interruptible delivery (e.g. multi-message mode).
76 pub cancellation_token: Option<CancellationToken>,
77 /// File attachments to send with the message.
78 /// Channels that don't support attachments ignore this field.
79 pub attachments: Vec<MediaAttachment>,
80 /// Message-ID to set as In-Reply-To header (email threading).
81 pub in_reply_to: Option<String>,
82}
83
84impl SendMessage {
85 /// Create a new message with content and recipient
86 pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
87 Self {
88 content: content.into(),
89 recipient: recipient.into(),
90 subject: None,
91 thread_ts: None,
92 cancellation_token: None,
93 attachments: vec![],
94 in_reply_to: None,
95 }
96 }
97
98 /// Create a new message with content, recipient, and subject
99 pub fn with_subject(
100 content: impl Into<String>,
101 recipient: impl Into<String>,
102 subject: impl Into<String>,
103 ) -> Self {
104 Self {
105 content: content.into(),
106 recipient: recipient.into(),
107 subject: Some(subject.into()),
108 thread_ts: None,
109 cancellation_token: None,
110 attachments: vec![],
111 in_reply_to: None,
112 }
113 }
114
115 /// Set the In-Reply-To header for email threading.
116 pub fn in_reply_to(mut self, msg_id: Option<String>) -> Self {
117 self.in_reply_to = msg_id;
118 self
119 }
120
121 /// Set the subject on an existing SendMessage (builder style).
122 pub fn subject(mut self, subject: impl Into<String>) -> Self {
123 self.subject = Some(subject.into());
124 self
125 }
126
127 /// Set the thread identifier for threaded replies.
128 pub fn in_thread(mut self, thread_ts: Option<String>) -> Self {
129 self.thread_ts = thread_ts;
130 self
131 }
132
133 /// Attach a cancellation token for interruptible delivery.
134 pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
135 self.cancellation_token = Some(token);
136 self
137 }
138
139 /// Attach files to this message.
140 pub fn with_attachments(mut self, attachments: Vec<MediaAttachment>) -> Self {
141 self.attachments = attachments;
142 self
143 }
144}
145
146impl ChannelMessage {
147 /// Construct a `ChannelMessage` with all required fields set and all optional
148 /// fields zeroed. Prefer this over raw struct literals so that new optional
149 /// fields added to `ChannelMessage` in the future don't require mechanical
150 /// updates at every call site.
151 pub fn new(
152 id: impl Into<String>,
153 sender: impl Into<String>,
154 reply_target: impl Into<String>,
155 content: impl Into<String>,
156 channel: impl Into<String>,
157 timestamp: u64,
158 ) -> Self {
159 Self {
160 id: id.into(),
161 sender: sender.into(),
162 reply_target: reply_target.into(),
163 content: content.into(),
164 channel: channel.into(),
165 timestamp,
166 ..Self::default()
167 }
168 }
169}
170
171impl SendMessage {
172 /// Build a reply `SendMessage` from an inbound `ChannelMessage`.
173 ///
174 /// Sets `recipient` from `msg.reply_target`, threads via `in_reply_to` and
175 /// `thread_ts`, and prepends `Re:` to the subject when the inbound message
176 /// carried one. Safe to call from any channel handler; the `in_reply_to`
177 /// field is silently ignored by channels that don't support it.
178 pub fn reply_to(msg: &ChannelMessage, content: impl Into<String>) -> Self {
179 let mut sm = Self::new(content, &msg.reply_target)
180 .in_thread(msg.thread_ts.clone())
181 .in_reply_to(Some(msg.id.clone()));
182 if let Some(ref subj) = msg.subject {
183 let reply_subject = if subj.to_ascii_lowercase().starts_with("re:") {
184 subj.clone()
185 } else {
186 format!("Re: {}", subj)
187 };
188 sm = sm.subject(reply_subject);
189 }
190 sm
191 }
192}
193
194/// Core channel trait — implement for any messaging platform.
195///
196/// Every `Channel` is `Attributable`: the orchestrator's spawn site opens
197/// `attribution_span!(&*ch)` so log emissions from within `listen()` / `send()`
198/// inherit `channel = <type>.<alias>` from the trait object's role + alias.
199#[async_trait]
200pub trait Channel: Send + Sync + crate::attribution::Attributable {
201 /// Human-readable channel name
202 fn name(&self) -> &str;
203
204 /// Send a message through this channel
205 async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
206
207 /// Start listening for incoming messages (long-running)
208 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
209
210 /// Check if channel is healthy
211 async fn health_check(&self) -> bool {
212 true
213 }
214
215 /// Signal that the bot is processing a response (e.g. "typing" indicator).
216 async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
217 Ok(())
218 }
219
220 /// Stop any active typing indicator.
221 async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
222 Ok(())
223 }
224
225 /// Whether this channel supports progressive message updates via draft edits.
226 fn supports_draft_updates(&self) -> bool {
227 false
228 }
229
230 /// Self-loop guard for multi-agent runs.
231 ///
232 /// Returns the bot's own handle/identity on this channel
233 /// (e.g. `@my_bot` for Telegram, the bot's user ID for Discord)
234 /// when known, so the orchestrator can drop inbound events whose
235 /// `sender` matches: a bot must never respond to its own
236 /// messages, even if a misconfigured peer group lists the bot's
237 /// handle as an external peer.
238 ///
239 /// **Channels that handle inbound traffic must override this.**
240 /// The default `None` makes both layers of the orchestrator's
241 /// self-loop guard (the SDK-side `drop_self_messages` here, and
242 /// the agent-loop fallback `peers::should_drop_self_loop`) into
243 /// no-ops — both layers consult the same `self_handle`, so a
244 /// channel that returns `None` has no protection from looping on
245 /// its own outbound. Outbound-only channels (webhook, gmail-push,
246 /// voice-call) never see inbound and can keep the default. The
247 /// in-tree overrides currently cover Telegram (`bot_username`
248 /// cache), IRC (configured nickname), Discord (decoded from token),
249 /// Slack (cached `auth.test` user_id); other inbound channels
250 /// remain on the default and rely on per-impl filtering instead
251 /// of the shared guard.
252 fn self_handle(&self) -> Option<String> {
253 None
254 }
255
256 /// The exact form the bot expects to see when addressed by users on
257 /// this channel. Discord wraps the snowflake as `<@1088...>`,
258 /// Telegram presents `@bot_username`, Matrix presents
259 /// `@bot:server`, Slack wraps the user ID as `<@U02...>`. Returned
260 /// verbatim into the per-channel system prompt so the agent
261 /// recognizes its own mention without guessing, and uses the same
262 /// form to tag itself or peers in outbound replies.
263 ///
264 /// Default `None` for channels that have no inbound mention
265 /// concept (CLI, webhook, hardware, ACP elicitation). Channels
266 /// that override `self_handle` should usually override this too,
267 /// applying their platform-native mention wrapper to the handle.
268 fn self_addressed_mention(&self) -> Option<String> {
269 None
270 }
271
272 /// Whether the orchestrator should drop an inbound message as
273 /// self-authored (multi-agent self-loop guard).
274 ///
275 /// Default implementation compares `msg.sender` against
276 /// [`Self::self_handle`] case-insensitively, after stripping a
277 /// leading `@` from each side so Telegram-style handles match
278 /// regardless of which form the SDK delivers. Override only for
279 /// platforms whose identity comparison is non-string (e.g. a
280 /// numeric Discord user ID is `as_str` already; this default
281 /// works there too).
282 fn drop_self_messages(&self, msg: &ChannelMessage) -> bool {
283 let Some(handle) = self.self_handle() else {
284 return false;
285 };
286 let handle_norm = handle.trim_start_matches('@').to_ascii_lowercase();
287 let sender_norm = msg.sender.trim_start_matches('@').to_ascii_lowercase();
288 !handle_norm.is_empty() && handle_norm == sender_norm
289 }
290
291 /// Whether this channel supports multi-message streaming delivery.
292 fn supports_multi_message_streaming(&self) -> bool {
293 false
294 }
295
296 /// Minimum delay (ms) between sending each paragraph in multi-message mode.
297 fn multi_message_delay_ms(&self) -> u64 {
298 800
299 }
300
301 /// Send an initial draft message. Returns a platform-specific message ID for later edits.
302 async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
303 Ok(None)
304 }
305
306 /// Update a previously sent draft message with new accumulated content.
307 async fn update_draft(
308 &self,
309 _recipient: &str,
310 _message_id: &str,
311 _text: &str,
312 ) -> anyhow::Result<()> {
313 Ok(())
314 }
315
316 /// Show a progress/status update (e.g. tool execution status).
317 async fn update_draft_progress(
318 &self,
319 _recipient: &str,
320 _message_id: &str,
321 _text: &str,
322 ) -> anyhow::Result<()> {
323 Ok(())
324 }
325
326 /// Finalize a draft with the complete response (e.g. apply Markdown formatting).
327 async fn finalize_draft(
328 &self,
329 _recipient: &str,
330 _message_id: &str,
331 _text: &str,
332 ) -> anyhow::Result<()> {
333 Ok(())
334 }
335
336 /// Cancel and remove a previously sent draft message if the channel supports it.
337 async fn cancel_draft(&self, _recipient: &str, _message_id: &str) -> anyhow::Result<()> {
338 Ok(())
339 }
340
341 /// Add a reaction (emoji) to a message.
342 async fn add_reaction(
343 &self,
344 _channel_id: &str,
345 _message_id: &str,
346 _emoji: &str,
347 ) -> anyhow::Result<()> {
348 Ok(())
349 }
350
351 /// Remove a reaction (emoji) from a message previously added by this bot.
352 async fn remove_reaction(
353 &self,
354 _channel_id: &str,
355 _message_id: &str,
356 _emoji: &str,
357 ) -> anyhow::Result<()> {
358 Ok(())
359 }
360
361 /// Pin a message in the channel.
362 async fn pin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
363 Ok(())
364 }
365
366 /// Unpin a previously pinned message.
367 async fn unpin_message(&self, _channel_id: &str, _message_id: &str) -> anyhow::Result<()> {
368 Ok(())
369 }
370
371 /// Redact (delete) a message from the channel.
372 async fn redact_message(
373 &self,
374 _channel_id: &str,
375 _message_id: &str,
376 _reason: Option<String>,
377 ) -> anyhow::Result<()> {
378 Ok(())
379 }
380
381 /// Request interactive tool-call approval from the channel operator.
382 ///
383 /// Returns `Ok(Some(response))` when the operator answers within the
384 /// channel's configured `approval_timeout_secs`; timeouts are surfaced
385 /// as `Deny`. Returns `Ok(None)` only for channels that do not implement
386 /// the prompt at all — the caller should fall back to its default policy
387 /// (typically auto-deny).
388 ///
389 /// Surface varies by channel:
390 /// - **Telegram** uses inline keyboard buttons.
391 /// - **Slack** Socket Mode uses Block Kit buttons; webhook fallback and
392 /// non–Socket Mode deployments use a token text reply.
393 /// - **Discord, Signal, Matrix, WhatsApp** embed a 6-character
394 /// alphanumeric token in the prompt and wait for a
395 /// `<token> approve|deny|always` reply on the same conversation.
396 async fn request_approval(
397 &self,
398 _recipient: &str,
399 _request: &ChannelApprovalRequest,
400 ) -> anyhow::Result<Option<ChannelApprovalResponse>> {
401 Ok(None)
402 }
403
404 /// Ask the user a multiple-choice question and return the chosen option's text.
405 ///
406 /// Returns `Ok(Some(answer))` if the channel handled the question natively
407 /// (e.g. ACP `session/request_permission`, Telegram inline keyboard).
408 /// Returns `Ok(None)` to signal the caller should fall back to the
409 /// generic `send` + `listen` flow. Default impl returns `None`.
410 ///
411 /// Free-form questions (no choices) are not modeled here yet — they
412 /// require the ACP elicitation RFD to land for a clean cross-channel API.
413 async fn request_choice(
414 &self,
415 _question: &str,
416 _choices: &[String],
417 _timeout: std::time::Duration,
418 ) -> anyhow::Result<Option<String>> {
419 Ok(None)
420 }
421
422 /// Whether this channel can answer free-form (no-choices) `ask_user`
423 /// questions via the standard `send` + `listen` flow.
424 ///
425 /// Channels that can only handle structured choices (e.g. ACP today, until
426 /// the elicitation RFD lands) should return `false` so callers can fail
427 /// fast with a useful error instead of timing out on `listen`.
428 fn supports_free_form_ask(&self) -> bool {
429 true
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal channel whose only customization is a configurable
    /// `self_handle`, so the trait's default `drop_self_messages` runs
    /// unmodified.
    struct StubChannel {
        handle: Option<String>,
    }

    impl StubChannel {
        /// Stub that reports `handle` as its own identity.
        fn with_handle(handle: &str) -> Self {
            Self {
                handle: Some(handle.to_owned()),
            }
        }
    }

    impl crate::attribution::Attributable for StubChannel {
        fn role(&self) -> crate::attribution::Role {
            crate::attribution::Role::Channel(crate::attribution::ChannelKind::Webhook)
        }

        fn alias(&self) -> &str {
            "stub"
        }
    }

    #[async_trait]
    impl Channel for StubChannel {
        fn name(&self) -> &str {
            "stub"
        }

        async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
            Ok(())
        }

        async fn listen(
            &self,
            _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        fn self_handle(&self) -> Option<String> {
            self.handle.clone()
        }
    }

    /// Inbound message whose only interesting field is its sender.
    fn msg_from(sender: &str) -> ChannelMessage {
        ChannelMessage::new("1", sender, "", "hi", "stub", 0)
    }

    #[test]
    fn channel_message_new_zeros_optional_fields() {
        let built = ChannelMessage::new("id1", "alice", "room-1", "hello", "slack", 42);

        // Required fields carry exactly what was passed in.
        assert_eq!(built.id, "id1");
        assert_eq!(built.sender, "alice");
        assert_eq!(built.reply_target, "room-1");
        assert_eq!(built.content, "hello");
        assert_eq!(built.channel, "slack");
        assert_eq!(built.timestamp, 42);

        // Optional fields are left at their defaults.
        assert!(built.channel_alias.is_none());
        assert!(built.thread_ts.is_none());
        assert!(built.interruption_scope_id.is_none());
        assert!(built.attachments.is_empty());
        assert!(built.subject.is_none());
    }

    #[test]
    fn send_message_reply_to_sets_threading_fields() {
        let mut inbound =
            ChannelMessage::new("msg-001", "alice", "user@example.com", "", "email", 0);
        inbound.thread_ts = Some("thread-1".to_owned());
        inbound.subject = Some("Hello there".to_owned());

        let reply = SendMessage::reply_to(&inbound, "Got it");

        assert_eq!(reply.recipient, "user@example.com");
        assert_eq!(reply.in_reply_to.as_deref(), Some("msg-001"));
        assert_eq!(reply.thread_ts.as_deref(), Some("thread-1"));
        assert_eq!(reply.subject.as_deref(), Some("Re: Hello there"));
        assert_eq!(reply.content, "Got it");
    }

    #[test]
    fn send_message_reply_to_does_not_double_re_prefix() {
        let mut inbound =
            ChannelMessage::new("msg-002", "alice", "user@example.com", "", "email", 0);
        inbound.subject = Some("Re: Already prefixed".to_owned());

        let reply = SendMessage::reply_to(&inbound, "");

        assert_eq!(reply.subject.as_deref(), Some("Re: Already prefixed"));
    }

    #[test]
    fn send_message_reply_to_no_subject_omits_subject() {
        let inbound = ChannelMessage::new("msg-003", "alice", "room-1", "ping", "slack", 0);

        let reply = SendMessage::reply_to(&inbound, "pong");

        assert!(reply.subject.is_none());
        assert_eq!(reply.in_reply_to.as_deref(), Some("msg-003"));
    }

    #[test]
    fn drop_self_messages_default_returns_false_when_handle_unknown() {
        let anonymous = StubChannel { handle: None };
        assert!(!anonymous.drop_self_messages(&msg_from("@anyone")));
    }

    #[test]
    fn drop_self_messages_matches_exact_handle() {
        let bot = StubChannel::with_handle("@my_bot");
        assert!(bot.drop_self_messages(&msg_from("@my_bot")));
        assert!(!bot.drop_self_messages(&msg_from("@other_bot")));
    }

    #[test]
    fn drop_self_messages_normalizes_at_prefix_and_case() {
        // Handle stored without `@`, sender delivered with it: still a match.
        let bare = StubChannel::with_handle("My_Bot");
        assert!(bare.drop_self_messages(&msg_from("@my_bot")));

        // Both sides carry `@` but differ in case: still a match.
        let prefixed = StubChannel::with_handle("@My_Bot");
        assert!(prefixed.drop_self_messages(&msg_from("@MY_BOT")));
    }

    #[test]
    fn drop_self_messages_does_not_match_empty_handle() {
        // A handle of "@" is empty once normalized and must not match
        // every inbound message; the guard fires only when the bot has a
        // real handle to compare against.
        let blank = StubChannel::with_handle("@");
        assert!(!blank.drop_self_messages(&msg_from("@anyone")));
    }

    #[test]
    fn deny_with_edit_round_trips_through_serde() {
        let original = ChannelApprovalResponse::DenyWithEdit {
            replacement: "new content".to_string(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ChannelApprovalResponse = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded, original);
    }
}
577}