1use crate::ask_user::ChannelMapHandle;
9use async_trait::async_trait;
10use serde_json::json;
11use std::sync::Arc;
12use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
13use zeroclaw_api::tool::{Tool, ToolResult};
14use zeroclaw_config::policy::SecurityPolicy;
15use zeroclaw_config::policy::ToolOperation;
16
17const DEFAULT_TIMEOUT_SECS: u64 = 600;
18
19const VALID_URGENCY_LEVELS: &[&str] = &["low", "medium", "high", "critical"];
20
21pub struct EscalateToHumanTool {
23 security: Arc<SecurityPolicy>,
24 channel_map: ChannelMapHandle,
25 alert_channels: Vec<String>,
26}
27
28impl EscalateToHumanTool {
29 pub fn new(
30 security: Arc<SecurityPolicy>,
31 alert_channels: Vec<String>,
32 channel_map: ChannelMapHandle,
33 ) -> Self {
34 Self {
35 security,
36 channel_map,
37 alert_channels,
38 }
39 }
40
41 fn format_message(urgency: &str, summary: &str, context: Option<&str>) -> String {
43 let prefix = match urgency {
44 "low" => "\u{2139}\u{fe0f} [LOW]",
45 "high" => "\u{1f534} [HIGH]",
46 "critical" => "\u{1f6a8} [CRITICAL]",
47 _ => "\u{26a0}\u{fe0f} [MEDIUM]",
49 };
50
51 let mut lines = vec![
52 format!("{prefix} Agent Escalation"),
53 format!("Summary: {summary}"),
54 ];
55
56 if let Some(ctx) = context {
57 lines.push(format!("Context: {ctx}"));
58 }
59
60 lines.push("---".to_string());
61 lines.push("Reply to this message to respond.".to_string());
62
63 lines.join("\n")
64 }
65
66 async fn send_alerts(&self, text: &str) {
68 let targets: Vec<(String, Arc<dyn Channel>)> = {
70 let channels = self.channel_map.read();
71 self.alert_channels
72 .iter()
73 .filter_map(|name| {
74 if let Some(ch) = channels.get(name) {
75 Some((name.clone(), Arc::clone(ch)))
76 } else {
77 ::zeroclaw_log::record!(
78 WARN,
79 ::zeroclaw_log::Event::new(
80 module_path!(),
81 ::zeroclaw_log::Action::Note
82 )
83 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
84 .with_attrs(::serde_json::json!({"name": name})),
85 "escalate_to_human: alert channel '' not found in channel map"
86 );
87 None
88 }
89 })
90 .collect()
91 };
92 for (name, ch) in targets {
93 let msg = SendMessage::new(text, "");
94 if let Err(e) = ch.send(&msg).await {
95 ::zeroclaw_log::record!(
96 WARN,
97 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
98 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
99 .with_attrs(::serde_json::json!({"error": format!("{}", e), "name": name})),
100 "escalate_to_human: alert to channel '' failed"
101 );
102 }
103 }
104 }
105}
106
107#[async_trait]
108impl Tool for EscalateToHumanTool {
109 fn name(&self) -> &str {
110 "escalate_to_human"
111 }
112
113 fn description(&self) -> &str {
114 "Escalate a situation to a human operator with urgency routing. \
115 Sends a structured message to the active channel. High/critical urgency \
116 also notifies any channels listed in `[escalation] alert_channels`. \
117 Optionally blocks to wait for a human response."
118 }
119
120 fn parameters_schema(&self) -> serde_json::Value {
121 json!({
122 "type": "object",
123 "properties": {
124 "summary": {
125 "type": "string",
126 "description": "One-line escalation summary"
127 },
128 "context": {
129 "type": "string",
130 "description": "Detailed context for the human"
131 },
132 "urgency": {
133 "type": "string",
134 "enum": ["low", "medium", "high", "critical"],
135 "description": "Urgency level (default: medium). high/critical also notifies alert_channels."
136 },
137 "wait_for_response": {
138 "type": "boolean",
139 "description": "Block and return the human's reply (default: false)"
140 },
141 "timeout_secs": {
142 "type": "integer",
143 "description": "Seconds to wait for a response when wait_for_response is true (default: 600)"
144 }
145 },
146 "required": ["summary"]
147 })
148 }
149
150 async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
151 if let Err(e) = self
153 .security
154 .enforce_tool_operation(ToolOperation::Act, "escalate_to_human")
155 {
156 return Ok(ToolResult {
157 success: false,
158 output: String::new(),
159 error: Some(format!("Action blocked: {e}")),
160 });
161 }
162
163 let summary = args
165 .get("summary")
166 .and_then(|v| v.as_str())
167 .map(|s| s.trim())
168 .filter(|s| !s.is_empty())
169 .ok_or_else(|| {
170 ::zeroclaw_log::record!(
171 WARN,
172 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
173 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
174 .with_attrs(::serde_json::json!({"param": "summary"})),
175 "escalate: missing summary parameter"
176 );
177 anyhow::Error::msg("Missing 'summary' parameter")
178 })?
179 .to_string();
180
181 let context = args
182 .get("context")
183 .and_then(|v| v.as_str())
184 .map(|s| s.trim().to_string())
185 .filter(|s| !s.is_empty());
186
187 let urgency = args
188 .get("urgency")
189 .and_then(|v| v.as_str())
190 .unwrap_or("medium");
191
192 if !VALID_URGENCY_LEVELS.contains(&urgency) {
193 return Ok(ToolResult {
194 success: false,
195 output: String::new(),
196 error: Some(format!(
197 "Invalid urgency '{}'. Must be one of: {}",
198 urgency,
199 VALID_URGENCY_LEVELS.join(", ")
200 )),
201 });
202 }
203
204 let wait_for_response = args
205 .get("wait_for_response")
206 .and_then(|v| v.as_bool())
207 .unwrap_or(false);
208
209 let timeout_secs = args
210 .get("timeout_secs")
211 .and_then(|v| v.as_u64())
212 .unwrap_or(DEFAULT_TIMEOUT_SECS);
213
214 let text = Self::format_message(urgency, &summary, context.as_deref());
216
217 let (channel_name, channel): (String, Arc<dyn Channel>) = {
219 let channels = self.channel_map.read();
220 if channels.is_empty() {
221 return Ok(ToolResult {
222 success: false,
223 output: String::new(),
224 error: Some("No channels available yet (channels not initialized)".to_string()),
225 });
226 }
227 let (name, ch) = channels.iter().next().ok_or_else(|| {
228 ::zeroclaw_log::record!(
229 ERROR,
230 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
231 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
232 .with_attrs(::serde_json::json!({"missing": "channels"})),
233 "escalate: no channels configured"
234 );
235 anyhow::Error::msg("No channels available. Configure at least one channel.")
236 })?;
237 (name.clone(), ch.clone())
238 };
239
240 if wait_for_response && !channel.supports_free_form_ask() {
247 return Ok(ToolResult {
248 success: false,
249 output: String::new(),
250 error: Some(format!(
251 "Channel '{channel_name}' cannot receive a free-form reply, \
252 so `wait_for_response` is unsupported (awaits ACP elicitation RFD). \
253 Retry with `wait_for_response: false`."
254 )),
255 });
256 }
257
258 let msg = SendMessage::new(&text, "");
260 if let Err(e) = channel.send(&msg).await {
261 return Ok(ToolResult {
262 success: false,
263 output: String::new(),
264 error: Some(format!(
265 "Failed to send escalation to channel '{channel_name}': {e}"
266 )),
267 });
268 }
269
270 if (urgency == "high" || urgency == "critical") && !self.alert_channels.is_empty() {
272 self.send_alerts(&text).await;
273 }
274
275 if wait_for_response {
276 let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(1);
278 let timeout = std::time::Duration::from_secs(timeout_secs);
279
280 let listen_channel = Arc::clone(&channel);
281 let listen_handle = tokio::spawn(async move { listen_channel.listen(tx).await });
282
283 let response = tokio::time::timeout(timeout, rx.recv()).await;
284 listen_handle.abort();
285
286 match response {
287 Ok(Some(msg)) => Ok(ToolResult {
288 success: true,
289 output: msg.content,
290 error: None,
291 }),
292 Ok(None) => Ok(ToolResult {
293 success: false,
294 output: "TIMEOUT".to_string(),
295 error: Some("Channel closed before receiving a response".to_string()),
296 }),
297 Err(_) => Ok(ToolResult {
298 success: false,
299 output: "TIMEOUT".to_string(),
300 error: Some(format!(
301 "No response received within {timeout_secs} seconds"
302 )),
303 }),
304 }
305 } else {
306 Ok(ToolResult {
308 success: true,
309 output: json!({
310 "status": "escalated",
311 "urgency": urgency,
312 "channel": channel_name,
313 })
314 .to_string(),
315 error: None,
316 })
317 }
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324 use parking_lot::RwLock;
325 use std::collections::HashMap;
326
327 struct SilentChannel {
329 channel_name: String,
330 sent: Arc<RwLock<Vec<String>>>,
331 }
332
333 impl SilentChannel {
334 fn new(name: &str) -> Self {
335 Self {
336 channel_name: name.to_string(),
337 sent: Arc::new(RwLock::new(Vec::new())),
338 }
339 }
340 }
341
342 impl ::zeroclaw_api::attribution::Attributable for SilentChannel {
343 fn role(&self) -> ::zeroclaw_api::attribution::Role {
344 ::zeroclaw_api::attribution::Role::Channel(
345 ::zeroclaw_api::attribution::ChannelKind::Webhook,
346 )
347 }
348 fn alias(&self) -> &str {
349 "test"
350 }
351 }
352
353 #[async_trait]
354 impl Channel for SilentChannel {
355 fn name(&self) -> &str {
356 &self.channel_name
357 }
358
359 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
360 self.sent.write().push(message.content.clone());
361 Ok(())
362 }
363
364 async fn listen(
365 &self,
366 _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
367 ) -> anyhow::Result<()> {
368 tokio::time::sleep(std::time::Duration::from_secs(600)).await;
370 Ok(())
371 }
372 }
373
374 struct RespondingChannel {
376 channel_name: String,
377 response: String,
378 sent: Arc<RwLock<Vec<String>>>,
379 }
380
381 impl RespondingChannel {
382 fn new(name: &str, response: &str) -> Self {
383 Self {
384 channel_name: name.to_string(),
385 response: response.to_string(),
386 sent: Arc::new(RwLock::new(Vec::new())),
387 }
388 }
389 }
390
391 impl ::zeroclaw_api::attribution::Attributable for RespondingChannel {
392 fn role(&self) -> ::zeroclaw_api::attribution::Role {
393 ::zeroclaw_api::attribution::Role::Channel(
394 ::zeroclaw_api::attribution::ChannelKind::Webhook,
395 )
396 }
397 fn alias(&self) -> &str {
398 "test"
399 }
400 }
401
402 #[async_trait]
403 impl Channel for RespondingChannel {
404 fn name(&self) -> &str {
405 &self.channel_name
406 }
407
408 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
409 self.sent.write().push(message.content.clone());
410 Ok(())
411 }
412
413 async fn listen(
414 &self,
415 tx: tokio::sync::mpsc::Sender<ChannelMessage>,
416 ) -> anyhow::Result<()> {
417 let msg = ChannelMessage {
418 id: "resp_1".to_string(),
419 sender: "human".to_string(),
420 reply_target: "human".to_string(),
421 content: self.response.clone(),
422 channel: self.channel_name.clone(),
423 channel_alias: None,
424 timestamp: 1000,
425 thread_ts: None,
426 interruption_scope_id: None,
427 attachments: vec![],
428 subject: None,
429 };
430 let _ = tx.send(msg).await;
431 Ok(())
432 }
433 }
434
435 fn make_tool_with_channels(channels: Vec<(&str, Arc<dyn Channel>)>) -> EscalateToHumanTool {
436 let tool = EscalateToHumanTool::new(
437 Arc::new(SecurityPolicy::default()),
438 vec![],
439 Arc::new(RwLock::new(HashMap::new())),
440 );
441 let map: HashMap<String, Arc<dyn Channel>> = channels
442 .into_iter()
443 .map(|(name, ch)| (name.to_string(), ch))
444 .collect();
445 *tool.channel_map.write() = map;
446 tool
447 }
448
449 #[test]
452 fn test_tool_metadata() {
453 let tool = EscalateToHumanTool::new(
454 Arc::new(SecurityPolicy::default()),
455 vec![],
456 Arc::new(RwLock::new(HashMap::new())),
457 );
458 assert_eq!(tool.name(), "escalate_to_human");
459 assert!(!tool.description().is_empty());
460 assert!(tool.description().to_lowercase().contains("escalat"));
461 }
462
463 #[test]
466 fn test_parameters_schema() {
467 let tool = EscalateToHumanTool::new(
468 Arc::new(SecurityPolicy::default()),
469 vec![],
470 Arc::new(RwLock::new(HashMap::new())),
471 );
472 let schema = tool.parameters_schema();
473 assert_eq!(schema["type"], "object");
474 assert!(schema["properties"]["summary"].is_object());
475 assert!(schema["properties"]["urgency"].is_object());
476 assert!(schema["properties"]["context"].is_object());
477 assert!(schema["properties"]["wait_for_response"].is_object());
478 assert!(schema["properties"]["timeout_secs"].is_object());
479 let required = schema["required"].as_array().unwrap();
480 assert!(required.iter().any(|v| v == "summary"));
481 assert!(!required.iter().any(|v| v == "urgency"));
483 assert!(!required.iter().any(|v| v == "context"));
484 assert!(!required.iter().any(|v| v == "wait_for_response"));
485 assert!(!required.iter().any(|v| v == "timeout_secs"));
486 }
487
488 #[tokio::test]
491 async fn test_default_urgency_is_medium() {
492 let channel = Arc::new(SilentChannel::new("test"));
493 let sent = Arc::clone(&channel.sent);
494 let tool = make_tool_with_channels(vec![("test", channel as Arc<dyn Channel>)]);
495
496 let result = tool
497 .execute(json!({ "summary": "Need help" }))
498 .await
499 .unwrap();
500
501 assert!(result.success, "error: {:?}", result.error);
502 assert!(result.output.contains("\"medium\""));
504 let messages = sent.read();
506 assert!(!messages.is_empty());
507 assert!(messages[0].contains("[MEDIUM]"));
508 }
509
510 #[test]
513 fn test_message_format_low() {
514 let msg = EscalateToHumanTool::format_message("low", "Disk space low", None);
515 assert!(msg.starts_with("\u{2139}\u{fe0f} [LOW]"));
516 assert!(msg.contains("Summary: Disk space low"));
517 assert!(msg.contains("Reply to this message to respond."));
518 }
519
520 #[test]
523 fn test_message_format_critical() {
524 let msg = EscalateToHumanTool::format_message(
525 "critical",
526 "Production down",
527 Some("Database unreachable for 5 minutes"),
528 );
529 assert!(msg.starts_with("\u{1f6a8} [CRITICAL]"));
530 assert!(msg.contains("Summary: Production down"));
531 assert!(msg.contains("Context: Database unreachable for 5 minutes"));
532 }
533
534 #[tokio::test]
537 async fn test_invalid_urgency_rejected() {
538 let tool = make_tool_with_channels(vec![(
539 "test",
540 Arc::new(SilentChannel::new("test")) as Arc<dyn Channel>,
541 )]);
542
543 let result = tool
544 .execute(json!({ "summary": "Help", "urgency": "extreme" }))
545 .await
546 .unwrap();
547
548 assert!(!result.success);
549 assert!(result.error.as_deref().unwrap().contains("Invalid urgency"));
550 assert!(result.error.as_deref().unwrap().contains("extreme"));
551 }
552
553 #[tokio::test]
556 async fn test_non_blocking_returns_status() {
557 let tool = make_tool_with_channels(vec![(
558 "slack",
559 Arc::new(SilentChannel::new("slack")) as Arc<dyn Channel>,
560 )]);
561
562 let result = tool
563 .execute(json!({
564 "summary": "Need approval",
565 "urgency": "low"
566 }))
567 .await
568 .unwrap();
569
570 assert!(result.success, "error: {:?}", result.error);
571 let parsed: serde_json::Value = serde_json::from_str(&result.output).unwrap();
572 assert_eq!(parsed["status"], "escalated");
573 assert_eq!(parsed["urgency"], "low");
574 assert_eq!(parsed["channel"], "slack");
575 }
576
577 #[tokio::test]
580 async fn test_blocking_mode_returns_response() {
581 let tool = make_tool_with_channels(vec![(
582 "test",
583 Arc::new(RespondingChannel::new("test", "Approved, go ahead")) as Arc<dyn Channel>,
584 )]);
585
586 let result = tool
587 .execute(json!({
588 "summary": "Need deployment approval",
589 "wait_for_response": true,
590 "timeout_secs": 5
591 }))
592 .await
593 .unwrap();
594
595 assert!(result.success, "error: {:?}", result.error);
596 assert_eq!(result.output, "Approved, go ahead");
597 }
598
599 #[tokio::test]
602 async fn test_blocking_mode_timeout() {
603 let tool = make_tool_with_channels(vec![(
604 "test",
605 Arc::new(SilentChannel::new("test")) as Arc<dyn Channel>,
606 )]);
607
608 let result = tool
609 .execute(json!({
610 "summary": "Waiting for response",
611 "wait_for_response": true,
612 "timeout_secs": 1
613 }))
614 .await
615 .unwrap();
616
617 assert!(!result.success);
618 assert_eq!(result.output, "TIMEOUT");
619 assert!(result.error.as_deref().unwrap().contains("1 seconds"));
620 }
621
622 struct StructuredOnlyChannel {
625 channel_name: String,
626 sent: Arc<RwLock<Vec<String>>>,
627 }
628
629 impl StructuredOnlyChannel {
630 fn new(name: &str) -> Self {
631 Self {
632 channel_name: name.to_string(),
633 sent: Arc::new(RwLock::new(Vec::new())),
634 }
635 }
636 }
637
638 impl ::zeroclaw_api::attribution::Attributable for StructuredOnlyChannel {
639 fn role(&self) -> ::zeroclaw_api::attribution::Role {
640 ::zeroclaw_api::attribution::Role::Channel(
641 ::zeroclaw_api::attribution::ChannelKind::Webhook,
642 )
643 }
644 fn alias(&self) -> &str {
645 "test"
646 }
647 }
648
649 #[async_trait]
650 impl Channel for StructuredOnlyChannel {
651 fn name(&self) -> &str {
652 &self.channel_name
653 }
654
655 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
656 self.sent.write().push(message.content.clone());
657 Ok(())
658 }
659
660 async fn listen(
661 &self,
662 _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
663 ) -> anyhow::Result<()> {
664 anyhow::bail!("listen not supported")
665 }
666
667 fn supports_free_form_ask(&self) -> bool {
668 false
669 }
670 }
671
672 #[tokio::test]
673 async fn wait_for_response_fails_fast_on_structured_only_channel() {
674 let stub = Arc::new(StructuredOnlyChannel::new("acp"));
677 let stub_clone: Arc<dyn Channel> = stub.clone();
678 let tool = make_tool_with_channels(vec![("acp", stub_clone)]);
679
680 let started = std::time::Instant::now();
681 let result = tool
682 .execute(json!({
683 "summary": "Need confirmation",
684 "wait_for_response": true,
685 "timeout_secs": 30,
686 }))
687 .await
688 .unwrap();
689 let elapsed = started.elapsed();
690
691 assert!(!result.success, "expected failure, got: {:?}", result);
692 let err = result.error.unwrap_or_default();
693 assert!(
694 err.contains("wait_for_response"),
695 "error should mention wait_for_response: {err}"
696 );
697 assert!(
699 elapsed < std::time::Duration::from_secs(2),
700 "expected fast-fail; took {elapsed:?}"
701 );
702 assert!(stub.sent.read().is_empty());
704 }
705
706 #[tokio::test]
707 async fn non_blocking_works_on_structured_only_channel() {
708 let stub = Arc::new(StructuredOnlyChannel::new("acp"));
711 let stub_clone: Arc<dyn Channel> = stub.clone();
712 let tool = make_tool_with_channels(vec![("acp", stub_clone)]);
713
714 let result = tool
715 .execute(json!({
716 "summary": "FYI: deploy started",
717 "urgency": "low",
718 }))
719 .await
720 .unwrap();
721
722 assert!(result.success, "error: {:?}", result.error);
723 assert_eq!(stub.sent.read().len(), 1);
724 }
725
726 #[tokio::test]
729 async fn test_high_urgency_succeeds_without_alert_channels() {
730 let tool = make_tool_with_channels(vec![(
732 "test",
733 Arc::new(SilentChannel::new("test")) as Arc<dyn Channel>,
734 )]);
735
736 let result = tool
737 .execute(json!({
738 "summary": "Critical alert",
739 "urgency": "high"
740 }))
741 .await
742 .unwrap();
743
744 assert!(result.success, "error: {:?}", result.error);
745 let parsed: serde_json::Value = serde_json::from_str(&result.output).unwrap();
746 assert_eq!(parsed["status"], "escalated");
747 assert_eq!(parsed["urgency"], "high");
748 }
749}