Skip to main content

zeroclaw_runtime/tools/
send_message_to_peer.rs

1//! Agent-loop tool that sends a message to a configured peer on a
2//! shared channel.
3//!
4//! Validates the target against [`crate::peers::ResolvedPeers`] for
5//! the calling agent on the requested channel: peers must mutually
6//! opt in via a `[peer_groups.<name>]` block whose `agents` lists
7//! both, OR appear on the group's `external_peers` list, before this
8//! tool will deliver. Cross-channel sends from outside the resolver's
9//! authorization surface are rejected.
10//!
11//! Delivery splits by target type:
12//!
13//! - **Agent-alias targets** route in-process via
14//!   [`crate::agent::loop_::process_message`]: alpha calls
15//!   `send_message_to_peer(target = "beta", ...)` and beta's agent
16//!   loop runs the message. The two agents share the channel's bot
17//!   identity, so an outbound to the channel would loop the bot's
18//!   own handle back through inbound; the in-process path avoids
19//!   that and lets the orchestrator deliver beta's reply (if any)
20//!   through the same channel beta is configured on.
21//!
22//!   This path is fire-and-forget: the recipient runs on a detached
23//!   `tokio::spawn`, so the sender's `ToolResult.success = true`
24//!   means "accepted for processing", not "completed". Recipient
//!   errors do NOT surface to the sender; they are emitted as a
//!   WARN-level `zeroclaw_log::record!` event inside the spawned
//!   task and via the recipient
27//!   agent's own observability (audit log, runtime trace, channel
28//!   reply). Observers diagnosing a missing peer message should look
29//!   at the recipient's spans, not the sender's tool output.
30//! - **External peers** (humans, external bots) route through
31//!   [`crate::cron::scheduler::deliver_announcement`] with the
32//!   external username as the platform target. The channel registry
33//!   the binary registers at startup forwards the send to the live
34//!   channel instance. This path is synchronous: the
35//!   `deliver_announcement` future resolves before the tool returns,
36//!   so a `success = false` here genuinely reflects a delivery
37//!   failure.
38
39use crate::cron::scheduler::deliver_announcement;
40use crate::peers::resolve_peer_set;
41use anyhow::Result;
42use async_trait::async_trait;
43use serde_json::json;
44use std::sync::Arc;
45use zeroclaw_api::tool::{Tool, ToolResult};
46use zeroclaw_config::schema::Config;
47
48/// Send a message to a peer on a shared channel. Bound to a single
49/// calling agent's alias; the tool validates every send against that
50/// agent's resolved peer set.
51pub struct SendMessageToPeerTool {
52    config: Arc<Config>,
53    sender_alias: String,
54}
55
56impl SendMessageToPeerTool {
57    pub fn new(config: Arc<Config>, sender_alias: impl Into<String>) -> Self {
58        Self {
59            config,
60            sender_alias: sender_alias.into(),
61        }
62    }
63}
64
65#[async_trait]
66impl Tool for SendMessageToPeerTool {
67    fn name(&self) -> &str {
68        "send_message_to_peer"
69    }
70
71    fn description(&self) -> &str {
72        "Send a message to a peer agent or external peer (human, external bot) \
73         on a shared channel. The target must be a member of a peer group both \
74         this agent and the target agree on (or an external peer listed on the \
75         shared group's `external_peers`). Cross-agent sends to non-peers are \
76         rejected at the tool boundary; the channel send only happens after \
77         the peer-set check passes."
78    }
79
80    fn parameters_schema(&self) -> serde_json::Value {
81        json!({
82            "type": "object",
83            "properties": {
84                "channel": {
85                    "type": "string",
86                    "description": "Channel ref to deliver on (e.g. 'telegram.prod'). Must be one of the agent's configured channels and a channel the target peer also listens on."
87                },
88                "target": {
89                    "type": "string",
90                    "description": "Recipient identifier — a peer agent's alias or an external peer's username (e.g. '@operator')."
91                },
92                "message": {
93                    "type": "string",
94                    "description": "The message body to deliver."
95                }
96            },
97            "required": ["channel", "target", "message"]
98        })
99    }
100
101    async fn execute(&self, args: serde_json::Value) -> Result<ToolResult> {
102        let channel = args
103            .get("channel")
104            .and_then(|v| v.as_str())
105            .map(str::trim)
106            .filter(|v| !v.is_empty())
107            .ok_or_else(|| {
108                ::zeroclaw_log::record!(
109                    WARN,
110                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
111                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
112                        .with_attrs(::serde_json::json!({"param": "channel"})),
113                    "tool argument validation failed"
114                );
115
116                anyhow::Error::msg("Missing or empty 'channel' parameter")
117            })?
118            .to_string();
119        let target = args
120            .get("target")
121            .and_then(|v| v.as_str())
122            .map(str::trim)
123            .filter(|v| !v.is_empty())
124            .ok_or_else(|| {
125                ::zeroclaw_log::record!(
126                    WARN,
127                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
128                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
129                        .with_attrs(::serde_json::json!({"param": "target"})),
130                    "tool argument validation failed"
131                );
132
133                anyhow::Error::msg("Missing or empty 'target' parameter")
134            })?
135            .to_string();
136        let message = args
137            .get("message")
138            .and_then(|v| v.as_str())
139            .map(str::trim)
140            .filter(|v| !v.is_empty())
141            .ok_or_else(|| {
142                ::zeroclaw_log::record!(
143                    WARN,
144                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
145                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
146                        .with_attrs(::serde_json::json!({"param": "message"})),
147                    "tool argument validation failed"
148                );
149
150                anyhow::Error::msg("Missing or empty 'message' parameter")
151            })?
152            .to_string();
153
154        let fallback_channel_type = channel.split_once('.').map(|(t, _)| t);
155        let resolved = resolve_peer_set(&self.config, &self.sender_alias);
156
157        if !resolved.is_known_peer(&channel, &target)
158            && !fallback_channel_type
159                .is_some_and(|channel_type| resolved.is_known_peer(channel_type, &target))
160        {
161            return Ok(ToolResult {
162                success: false,
163                output: String::new(),
164                error: Some(format!(
165                    "target {target:?} is not on agent {alias:?}'s resolved peer set for channel {channel:?}; \
166                     add a [peer_groups.<name>] entry that lists both this agent and the target before sending",
167                    alias = self.sender_alias,
168                )),
169            });
170        }
171
172        // The agent must itself listen on the channel — the target may
173        // be reachable on it via a peer group, but a sender can't
174        // dispatch on a channel it isn't configured for.
175        let agent_listens_on_channel = self
176            .config
177            .agents
178            .get(&self.sender_alias)
179            .map(|a| a.channels.iter().any(|c| c.as_str() == channel.as_str()))
180            .unwrap_or(false);
181        if !agent_listens_on_channel {
182            return Ok(ToolResult {
183                success: false,
184                output: String::new(),
185                error: Some(format!(
186                    "agent {alias:?} does not list channel {channel:?} on its `channels`; \
187                     add the channel ref to [agents.{alias}.channels] before sending",
188                    alias = self.sender_alias,
189                )),
190            });
191        }
192
193        // Agent-alias targets route in-process. The channel's bot
194        // identity is shared between alpha and beta, so an outbound
195        // to the channel would loop right back into inbound and the
196        // self-loop guard would drop it. Agent-to-agent messaging is
197        // process-internal by design; the channel registry only sees
198        // sends with external recipients.
199        let target_norm = target.trim_start_matches('@').to_ascii_lowercase();
200        let target_is_agent = self
201            .config
202            .agents
203            .keys()
204            .any(|alias| alias.to_ascii_lowercase() == target_norm);
205
206        if target_is_agent {
207            // The target's resolved alias may differ in case from the
208            // raw input ("@Beta" -> "beta"). Look up the canonical
209            // alias once so the agent loop's `agent_alias` field
210            // matches the [agents.<alias>] config key.
211            let canonical = self
212                .config
213                .agents
214                .keys()
215                .find(|alias| alias.to_ascii_lowercase() == target_norm)
216                .cloned()
217                .unwrap_or_else(|| target.clone());
218
219            // Fire-and-forget: agent-to-agent peer messages do not
220            // synchronously block the sender on the recipient's full
221            // turn (that's what the SubAgent surface is for). The
222            // recipient processes on its own event loop and surfaces
223            // its result via its own observability.
224            let cfg = (*self.config).clone();
225            let sender = self.sender_alias.clone();
226            let recipient_alias = canonical.clone();
227            let body = message.clone();
228            tokio::spawn(async move {
229                if let Err(e) =
230                    crate::agent::loop_::process_message(cfg, &recipient_alias, &body, None).await
231                {
232                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"sender": sender, "recipient": recipient_alias, "error": format!("{}", e)})), "peer-message in-process delivery failed");
233                }
234            });
235
236            return Ok(ToolResult {
237                success: true,
238                output: format!(
239                    "accepted for in-process delivery to peer agent {canonical:?} (recipient runs detached; observe its agent loop for the actual outcome)"
240                ),
241                error: None,
242            });
243        }
244
245        match deliver_announcement(&self.config, &channel, &target, None, &message).await {
246            Ok(()) => Ok(ToolResult {
247                success: true,
248                output: format!("delivered to external peer {target:?} on {channel}"),
249                error: None,
250            }),
251            Err(e) => Ok(ToolResult {
252                success: false,
253                output: String::new(),
254                error: Some(format!("delivery failed: {e:#}")),
255            }),
256        }
257    }
258}