1use crate::ask_user::ChannelMapHandle;
9use async_trait::async_trait;
10use serde_json::json;
11use std::sync::Arc;
12use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
13use zeroclaw_api::tool::{Tool, ToolResult};
14use zeroclaw_config::policy::SecurityPolicy;
15use zeroclaw_config::policy::ToolOperation;
16
/// Wait time, in seconds, used by `execute` when the `timeout_secs` argument is
/// absent or is not an unsigned integer.
const DEFAULT_TIMEOUT_SECS: u64 = 600;

/// Urgency strings accepted by `execute`; any other `urgency` value is rejected
/// with an "Invalid urgency" error result.
const VALID_URGENCY_LEVELS: &[&str] = &["low", "medium", "high", "critical"];
20
/// Tool that sends an escalation message to a human through a configured channel.
///
/// The message goes to one channel taken from `channel_map`; for `high` and
/// `critical` urgency it is additionally sent to every channel named in
/// `alert_channels`.
pub struct EscalateToHumanTool {
    /// Policy consulted at the start of `execute`, before anything is sent.
    security: Arc<SecurityPolicy>,
    /// Shared map from channel name to channel, read on every call.
    channel_map: ChannelMapHandle,
    /// Names of the extra channels notified for `high`/`critical` escalations.
    alert_channels: Vec<String>,
}
27
28impl EscalateToHumanTool {
29 pub fn new(
30 security: Arc<SecurityPolicy>,
31 alert_channels: Vec<String>,
32 channel_map: ChannelMapHandle,
33 ) -> Self {
34 Self {
35 security,
36 channel_map,
37 alert_channels,
38 }
39 }
40
/// Renders the plain-text escalation message.
///
/// The layout is a header line (`<icon> [LEVEL] Agent Escalation`), a
/// `Summary:` line, an optional `Context:` line, a `---` separator and a
/// closing line inviting a reply, joined with `\n`.
///
/// Any `urgency` other than `"low"`, `"high"` or `"critical"` is labelled
/// `[MEDIUM]`.
fn format_message(urgency: &str, summary: &str, context: Option<&str>) -> String {
    let badge = if urgency == "low" {
        "\u{2139}\u{fe0f} [LOW]"
    } else if urgency == "high" {
        "\u{1f534} [HIGH]"
    } else if urgency == "critical" {
        "\u{1f6a8} [CRITICAL]"
    } else {
        "\u{26a0}\u{fe0f} [MEDIUM]"
    };

    let mut text = format!("{badge} Agent Escalation\nSummary: {summary}");

    // The context line is emitted only when a context was supplied.
    if let Some(details) = context {
        text.push_str("\nContext: ");
        text.push_str(details);
    }

    text.push_str("\n---\nReply to this message to respond.");
    text
}
65
66 async fn send_alerts(&self, text: &str) {
68 let targets: Vec<(String, Arc<dyn Channel>)> = {
70 let channels = self.channel_map.read();
71 self.alert_channels
72 .iter()
73 .filter_map(|name| {
74 if let Some(ch) = channels.get(name) {
75 Some((name.clone(), Arc::clone(ch)))
76 } else {
77 ::zeroclaw_log::record!(
78 WARN,
79 ::zeroclaw_log::Event::new(
80 module_path!(),
81 ::zeroclaw_log::Action::Note
82 )
83 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
84 .with_attrs(::serde_json::json!({"name": name})),
85 "escalate_to_human: alert channel '' not found in channel map"
86 );
87 None
88 }
89 })
90 .collect()
91 };
92 for (name, ch) in targets {
93 let msg = SendMessage::new(text, "");
94 if let Err(e) = ch.send(&msg).await {
95 ::zeroclaw_log::record!(
96 WARN,
97 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
98 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
99 .with_attrs(::serde_json::json!({"error": format!("{}", e), "name": name})),
100 "escalate_to_human: alert to channel '' failed"
101 );
102 }
103 }
104 }
105}
106
107#[async_trait]
108impl Tool for EscalateToHumanTool {
109 fn name(&self) -> &str {
110 "escalate_to_human"
111 }
112
113 fn description(&self) -> &str {
114 "Escalate a situation to a human operator with urgency routing. \
115 Sends a structured message to the active channel. High/critical urgency \
116 also notifies any channels listed in `[escalation] alert_channels`. \
117 Optionally blocks to wait for a human response."
118 }
119
120 fn parameters_schema(&self) -> serde_json::Value {
121 json!({
122 "type": "object",
123 "properties": {
124 "summary": {
125 "type": "string",
126 "description": "One-line escalation summary"
127 },
128 "context": {
129 "type": "string",
130 "description": "Detailed context for the human"
131 },
132 "urgency": {
133 "type": "string",
134 "enum": ["low", "medium", "high", "critical"],
135 "description": "Urgency level (default: medium). high/critical also notifies alert_channels."
136 },
137 "wait_for_response": {
138 "type": "boolean",
139 "description": "Block and return the human's reply (default: false)"
140 },
141 "timeout_secs": {
142 "type": "integer",
143 "description": "Seconds to wait for a response when wait_for_response is true (default: 600)"
144 }
145 },
146 "required": ["summary"]
147 })
148 }
149
150 async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
151 if let Err(e) = self
153 .security
154 .enforce_tool_operation(ToolOperation::Act, "escalate_to_human")
155 {
156 return Ok(ToolResult {
157 success: false,
158 output: String::new(),
159 error: Some(format!("Action blocked: {e}")),
160 });
161 }
162
163 let summary = args
165 .get("summary")
166 .and_then(|v| v.as_str())
167 .map(|s| s.trim())
168 .filter(|s| !s.is_empty())
169 .ok_or_else(|| {
170 ::zeroclaw_log::record!(
171 WARN,
172 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
173 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
174 .with_attrs(::serde_json::json!({"param": "summary"})),
175 "escalate: missing summary parameter"
176 );
177 anyhow::Error::msg("Missing 'summary' parameter")
178 })?
179 .to_string();
180
181 let context = args
182 .get("context")
183 .and_then(|v| v.as_str())
184 .map(|s| s.trim().to_string())
185 .filter(|s| !s.is_empty());
186
187 let urgency = args
188 .get("urgency")
189 .and_then(|v| v.as_str())
190 .unwrap_or("medium");
191
192 if !VALID_URGENCY_LEVELS.contains(&urgency) {
193 return Ok(ToolResult {
194 success: false,
195 output: String::new(),
196 error: Some(format!(
197 "Invalid urgency '{}'. Must be one of: {}",
198 urgency,
199 VALID_URGENCY_LEVELS.join(", ")
200 )),
201 });
202 }
203
204 let wait_for_response = args
205 .get("wait_for_response")
206 .and_then(|v| v.as_bool())
207 .unwrap_or(false);
208
209 let timeout_secs = args
210 .get("timeout_secs")
211 .and_then(|v| v.as_u64())
212 .unwrap_or(DEFAULT_TIMEOUT_SECS);
213
214 let text = Self::format_message(urgency, &summary, context.as_deref());
216
217 let (channel_name, channel): (String, Arc<dyn Channel>) = {
219 let channels = self.channel_map.read();
220 if channels.is_empty() {
221 return Ok(ToolResult {
222 success: false,
223 output: String::new(),
224 error: Some("No channels available yet (channels not initialized)".to_string()),
225 });
226 }
227 let (name, ch) = channels.iter().next().ok_or_else(|| {
228 ::zeroclaw_log::record!(
229 ERROR,
230 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
231 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
232 .with_attrs(::serde_json::json!({"missing": "channels"})),
233 "escalate: no channels configured"
234 );
235 anyhow::Error::msg("No channels available. Configure at least one channel.")
236 })?;
237 (name.clone(), ch.clone())
238 };
239
240 if wait_for_response && !channel.supports_free_form_ask() {
247 return Ok(ToolResult {
248 success: false,
249 output: String::new(),
250 error: Some(format!(
251 "Channel '{channel_name}' cannot receive a free-form reply, \
252 so `wait_for_response` is unsupported (awaits ACP elicitation RFD). \
253 Retry with `wait_for_response: false`."
254 )),
255 });
256 }
257
258 let msg = SendMessage::new(&text, "");
260 if let Err(e) = channel.send(&msg).await {
261 return Ok(ToolResult {
262 success: false,
263 output: String::new(),
264 error: Some(format!(
265 "Failed to send escalation to channel '{channel_name}': {e}"
266 )),
267 });
268 }
269
270 if (urgency == "high" || urgency == "critical") && !self.alert_channels.is_empty() {
272 self.send_alerts(&text).await;
273 }
274
275 if wait_for_response {
276 let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(1);
278 let timeout = std::time::Duration::from_secs(timeout_secs);
279
280 let listen_channel = Arc::clone(&channel);
281 let listen_handle =
282 zeroclaw_spawn::spawn!(async move { listen_channel.listen(tx).await });
283
284 let response = tokio::time::timeout(timeout, rx.recv()).await;
285 listen_handle.abort();
286
287 match response {
288 Ok(Some(msg)) => Ok(ToolResult {
289 success: true,
290 output: msg.content,
291 error: None,
292 }),
293 Ok(None) => Ok(ToolResult {
294 success: false,
295 output: "TIMEOUT".to_string(),
296 error: Some("Channel closed before receiving a response".to_string()),
297 }),
298 Err(_) => Ok(ToolResult {
299 success: false,
300 output: "TIMEOUT".to_string(),
301 error: Some(format!(
302 "No response received within {timeout_secs} seconds"
303 )),
304 }),
305 }
306 } else {
307 Ok(ToolResult {
309 success: true,
310 output: json!({
311 "status": "escalated",
312 "urgency": urgency,
313 "channel": channel_name,
314 })
315 .to_string(),
316 error: None,
317 })
318 }
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use parking_lot::RwLock;
    use std::collections::HashMap;

    /// Channel stub that records every sent message and never delivers a reply:
    /// `listen` sleeps for 600 seconds without sending anything.
    struct SilentChannel {
        channel_name: String,
        sent: Arc<RwLock<Vec<String>>>,
    }

    impl SilentChannel {
        fn new(name: &str) -> Self {
            Self {
                channel_name: name.to_owned(),
                sent: Arc::new(RwLock::new(vec![])),
            }
        }
    }

    impl ::zeroclaw_api::attribution::Attributable for SilentChannel {
        fn role(&self) -> ::zeroclaw_api::attribution::Role {
            ::zeroclaw_api::attribution::Role::Channel(
                ::zeroclaw_api::attribution::ChannelKind::Webhook,
            )
        }
        fn alias(&self) -> &str {
            "test"
        }
    }

    #[async_trait]
    impl Channel for SilentChannel {
        fn name(&self) -> &str {
            &self.channel_name
        }

        async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
            self.sent.write().push(message.content.clone());
            Ok(())
        }

        async fn listen(
            &self,
            _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            let idle = std::time::Duration::from_secs(600);
            tokio::time::sleep(idle).await;
            Ok(())
        }
    }

    /// Channel stub that records sent messages and whose `listen` immediately
    /// delivers one canned reply.
    struct RespondingChannel {
        channel_name: String,
        response: String,
        sent: Arc<RwLock<Vec<String>>>,
    }

    impl RespondingChannel {
        fn new(name: &str, response: &str) -> Self {
            Self {
                channel_name: name.to_owned(),
                response: response.to_owned(),
                sent: Arc::new(RwLock::new(vec![])),
            }
        }
    }

    impl ::zeroclaw_api::attribution::Attributable for RespondingChannel {
        fn role(&self) -> ::zeroclaw_api::attribution::Role {
            ::zeroclaw_api::attribution::Role::Channel(
                ::zeroclaw_api::attribution::ChannelKind::Webhook,
            )
        }
        fn alias(&self) -> &str {
            "test"
        }
    }

    #[async_trait]
    impl Channel for RespondingChannel {
        fn name(&self) -> &str {
            &self.channel_name
        }

        async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
            self.sent.write().push(message.content.clone());
            Ok(())
        }

        async fn listen(
            &self,
            tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            let reply = ChannelMessage {
                id: "resp_1".to_string(),
                sender: "human".to_string(),
                reply_target: "human".to_string(),
                content: self.response.clone(),
                channel: self.channel_name.clone(),
                channel_alias: None,
                timestamp: 1000,
                thread_ts: None,
                interruption_scope_id: None,
                attachments: vec![],
                subject: None,
            };
            // A send failure (receiver already gone) is deliberately ignored.
            let _ = tx.send(reply).await;
            Ok(())
        }
    }

    /// Builds a tool with the default security policy, no alert channels, and a
    /// channel map holding exactly the given `(name, channel)` pairs.
    fn make_tool_with_channels(channels: Vec<(&str, Arc<dyn Channel>)>) -> EscalateToHumanTool {
        let registry: HashMap<String, Arc<dyn Channel>> = channels
            .into_iter()
            .map(|(name, ch)| (name.to_owned(), ch))
            .collect();
        EscalateToHumanTool::new(
            Arc::new(SecurityPolicy::default()),
            Vec::new(),
            Arc::new(RwLock::new(registry)),
        )
    }

    /// Builds a tool with the default security policy and an empty channel map.
    fn tool_without_channels() -> EscalateToHumanTool {
        EscalateToHumanTool::new(
            Arc::new(SecurityPolicy::default()),
            Vec::new(),
            Arc::new(RwLock::new(HashMap::new())),
        )
    }

    /// The tool reports its name and a non-empty description mentioning escalation.
    #[test]
    fn test_tool_metadata() {
        let tool = tool_without_channels();
        assert_eq!(tool.name(), "escalate_to_human");
        assert!(!tool.description().is_empty());
        assert!(tool.description().to_lowercase().contains("escalat"));
    }

    /// The schema lists all five properties and requires only `summary`.
    #[test]
    fn test_parameters_schema() {
        let schema = tool_without_channels().parameters_schema();
        assert_eq!(schema["type"], "object");

        for key in ["summary", "urgency", "context", "wait_for_response", "timeout_secs"] {
            assert!(schema["properties"][key].is_object());
        }

        let required = schema["required"].as_array().unwrap();
        assert!(required.iter().any(|v| v == "summary"));
        for optional in ["urgency", "context", "wait_for_response", "timeout_secs"] {
            assert!(!required.iter().any(|v| v == optional));
        }
    }

    /// Omitting `urgency` yields a medium escalation in both the result and the sent text.
    #[tokio::test]
    async fn test_default_urgency_is_medium() {
        let stub = Arc::new(SilentChannel::new("test"));
        let outbox = Arc::clone(&stub.sent);
        let tool = make_tool_with_channels(vec![("test", stub as Arc<dyn Channel>)]);

        let args = json!({ "summary": "Need help" });
        let result = tool.execute(args).await.unwrap();

        assert!(result.success, "error: {:?}", result.error);
        assert!(result.output.contains("\"medium\""));
        let messages = outbox.read();
        assert!(!messages.is_empty());
        assert!(messages[0].contains("[MEDIUM]"));
    }

    /// A low-urgency message carries the LOW prefix, the summary and the reply hint.
    #[test]
    fn test_message_format_low() {
        let rendered = EscalateToHumanTool::format_message("low", "Disk space low", None);
        assert!(rendered.starts_with("\u{2139}\u{fe0f} [LOW]"));
        assert!(rendered.contains("Summary: Disk space low"));
        assert!(rendered.contains("Reply to this message to respond."));
    }

    /// A critical message carries the CRITICAL prefix plus summary and context lines.
    #[test]
    fn test_message_format_critical() {
        let rendered = EscalateToHumanTool::format_message(
            "critical",
            "Production down",
            Some("Database unreachable for 5 minutes"),
        );
        assert!(rendered.starts_with("\u{1f6a8} [CRITICAL]"));
        assert!(rendered.contains("Summary: Production down"));
        assert!(rendered.contains("Context: Database unreachable for 5 minutes"));
    }

    /// An urgency outside the accepted set is reported as a failed result.
    #[tokio::test]
    async fn test_invalid_urgency_rejected() {
        let stub: Arc<dyn Channel> = Arc::new(SilentChannel::new("test"));
        let tool = make_tool_with_channels(vec![("test", stub)]);

        let args = json!({ "summary": "Help", "urgency": "extreme" });
        let result = tool.execute(args).await.unwrap();

        assert!(!result.success);
        assert!(result.error.as_deref().unwrap().contains("Invalid urgency"));
        assert!(result.error.as_deref().unwrap().contains("extreme"));
    }

    /// Without `wait_for_response` the output is a JSON status object.
    #[tokio::test]
    async fn test_non_blocking_returns_status() {
        let stub: Arc<dyn Channel> = Arc::new(SilentChannel::new("slack"));
        let tool = make_tool_with_channels(vec![("slack", stub)]);

        let args = json!({
            "summary": "Need approval",
            "urgency": "low"
        });
        let result = tool.execute(args).await.unwrap();

        assert!(result.success, "error: {:?}", result.error);
        let parsed: serde_json::Value = serde_json::from_str(&result.output).unwrap();
        assert_eq!(parsed["status"], "escalated");
        assert_eq!(parsed["urgency"], "low");
        assert_eq!(parsed["channel"], "slack");
    }

    /// With `wait_for_response` the output is the content of the channel's reply.
    #[tokio::test]
    async fn test_blocking_mode_returns_response() {
        let stub: Arc<dyn Channel> = Arc::new(RespondingChannel::new("test", "Approved, go ahead"));
        let tool = make_tool_with_channels(vec![("test", stub)]);

        let args = json!({
            "summary": "Need deployment approval",
            "wait_for_response": true,
            "timeout_secs": 5
        });
        let result = tool.execute(args).await.unwrap();

        assert!(result.success, "error: {:?}", result.error);
        assert_eq!(result.output, "Approved, go ahead");
    }

    /// A channel that never replies produces a TIMEOUT result after `timeout_secs`.
    #[tokio::test]
    async fn test_blocking_mode_timeout() {
        let stub: Arc<dyn Channel> = Arc::new(SilentChannel::new("test"));
        let tool = make_tool_with_channels(vec![("test", stub)]);

        let args = json!({
            "summary": "Waiting for response",
            "wait_for_response": true,
            "timeout_secs": 1
        });
        let result = tool.execute(args).await.unwrap();

        assert!(!result.success);
        assert_eq!(result.output, "TIMEOUT");
        assert!(result.error.as_deref().unwrap().contains("1 seconds"));
    }

    /// Channel stub that can send but reports `supports_free_form_ask() == false`
    /// and whose `listen` always fails.
    struct StructuredOnlyChannel {
        channel_name: String,
        sent: Arc<RwLock<Vec<String>>>,
    }

    impl StructuredOnlyChannel {
        fn new(name: &str) -> Self {
            Self {
                channel_name: name.to_owned(),
                sent: Arc::new(RwLock::new(vec![])),
            }
        }
    }

    impl ::zeroclaw_api::attribution::Attributable for StructuredOnlyChannel {
        fn role(&self) -> ::zeroclaw_api::attribution::Role {
            ::zeroclaw_api::attribution::Role::Channel(
                ::zeroclaw_api::attribution::ChannelKind::Webhook,
            )
        }
        fn alias(&self) -> &str {
            "test"
        }
    }

    #[async_trait]
    impl Channel for StructuredOnlyChannel {
        fn name(&self) -> &str {
            &self.channel_name
        }

        async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
            self.sent.write().push(message.content.clone());
            Ok(())
        }

        async fn listen(
            &self,
            _tx: tokio::sync::mpsc::Sender<ChannelMessage>,
        ) -> anyhow::Result<()> {
            anyhow::bail!("listen not supported")
        }

        fn supports_free_form_ask(&self) -> bool {
            false
        }
    }

    /// Asking to wait on a channel without free-form replies fails immediately
    /// and sends nothing.
    #[tokio::test]
    async fn wait_for_response_fails_fast_on_structured_only_channel() {
        let stub = Arc::new(StructuredOnlyChannel::new("acp"));
        let as_channel: Arc<dyn Channel> = stub.clone();
        let tool = make_tool_with_channels(vec![("acp", as_channel)]);

        let args = json!({
            "summary": "Need confirmation",
            "wait_for_response": true,
            "timeout_secs": 30,
        });
        let clock = std::time::Instant::now();
        let result = tool.execute(args).await.unwrap();
        let elapsed = clock.elapsed();

        assert!(!result.success, "expected failure, got: {:?}", result);
        let err = result.error.unwrap_or_default();
        assert!(
            err.contains("wait_for_response"),
            "error should mention wait_for_response: {err}"
        );
        assert!(
            elapsed < std::time::Duration::from_secs(2),
            "expected fast-fail; took {elapsed:?}"
        );
        assert!(stub.sent.read().is_empty());
    }

    /// The same channel still accepts a fire-and-forget escalation.
    #[tokio::test]
    async fn non_blocking_works_on_structured_only_channel() {
        let stub = Arc::new(StructuredOnlyChannel::new("acp"));
        let as_channel: Arc<dyn Channel> = stub.clone();
        let tool = make_tool_with_channels(vec![("acp", as_channel)]);

        let args = json!({
            "summary": "FYI: deploy started",
            "urgency": "low",
        });
        let result = tool.execute(args).await.unwrap();

        assert!(result.success, "error: {:?}", result.error);
        assert_eq!(stub.sent.read().len(), 1);
    }

    /// High urgency with no alert channels configured still succeeds.
    #[tokio::test]
    async fn test_high_urgency_succeeds_without_alert_channels() {
        let stub: Arc<dyn Channel> = Arc::new(SilentChannel::new("test"));
        let tool = make_tool_with_channels(vec![("test", stub)]);

        let args = json!({
            "summary": "Critical alert",
            "urgency": "high"
        });
        let result = tool.execute(args).await.unwrap();

        assert!(result.success, "error: {:?}", result.error);
        let parsed: serde_json::Value = serde_json::from_str(&result.output).unwrap();
        assert_eq!(parsed["status"], "escalated");
        assert_eq!(parsed["urgency"], "high");
    }
}