1use anyhow::Result;
2use chrono::Utc;
3use std::future::Future;
4use std::path::PathBuf;
5use tokio::task::JoinHandle;
6use tokio::time::Duration;
7use zeroclaw_config::schema::Config;
8use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
9
/// Period, in seconds, between two writes of the health snapshot file
/// performed by `spawn_state_writer`.
const STATUS_FLUSH_SECONDS: u64 = 5;
11
/// Reason the daemon stopped, as reported by `wait_for_exit_signal` and
/// returned from [`run`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DaemonExit {
    /// A termination signal arrived (SIGINT/SIGTERM on Unix, Ctrl+C elsewhere)
    /// or every sender of the reload channel was dropped.
    Shutdown,
    /// The reload watch channel changed to `true` (logged as a request made
    /// via `/admin/reload`).
    Reload,
}
22
23async fn wait_for_exit_signal(
33 mut reload_rx: tokio::sync::watch::Receiver<bool>,
34) -> Result<DaemonExit> {
35 #[cfg(unix)]
36 {
37 use tokio::signal::unix::{SignalKind, signal};
38
39 let mut sigint = signal(SignalKind::interrupt())?;
40 let mut sigterm = signal(SignalKind::terminate())?;
41 let mut sighup = signal(SignalKind::hangup())?;
42
43 loop {
44 tokio::select! {
45 _ = sigint.recv() => {
46 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGINT, shutting down...");
47 return Ok(DaemonExit::Shutdown);
48 }
49 _ = sigterm.recv() => {
50 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGTERM, shutting down...");
51 return Ok(DaemonExit::Shutdown);
52 }
53 _ = sighup.recv() => {
54 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGHUP, ignoring (daemon stays running)");
55 }
56 changed = reload_rx.changed() => {
57 if changed.is_err() {
58 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
62 return Ok(DaemonExit::Shutdown);
63 }
64 if *reload_rx.borrow_and_update() {
65 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
66 return Ok(DaemonExit::Reload);
67 }
68 }
69 }
70 }
71 }
72
73 #[cfg(not(unix))]
74 {
75 loop {
76 tokio::select! {
77 res = tokio::signal::ctrl_c() => {
78 res?;
79 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received Ctrl+C, shutting down...");
80 return Ok(DaemonExit::Shutdown);
81 }
82 changed = reload_rx.changed() => {
83 if changed.is_err() {
84 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
85 return Ok(DaemonExit::Shutdown);
86 }
87 if *reload_rx.borrow_and_update() {
88 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
89 return Ok(DaemonExit::Reload);
90 }
91 }
92 }
93 }
94 }
95}
96
/// Optional start hooks for the subsystems that [`run`] supervises.
///
/// Each hook is a boxed, thread-safe closure that returns a boxed future.
/// `run` wraps every provided hook in an `Arc` and calls it again each time
/// the component's supervisor restarts it. A `None` hook means the subsystem
/// is not wired in; `run` then starts no supervisor for it.
#[allow(clippy::type_complexity)]
pub struct DaemonSubsystems {
    /// Starts the gateway. `run` passes the host, the port, a clone of the
    /// config, the daemon's broadcast event sender and the reload watch
    /// sender (both wrapped in `Some`).
    pub gateway_start: Option<
        Box<
            dyn Fn(
                    String,
                    u16,
                    Config,
                    Option<tokio::sync::broadcast::Sender<serde_json::Value>>,
                    Option<tokio::sync::watch::Sender<bool>>,
                ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
                + Send
                + Sync,
        >,
    >,
    /// Starts the channels subsystem. `run` passes a clone of the config and
    /// a clone of the cancellation token it cancels during teardown.
    pub channels_start: Option<
        Box<
            dyn Fn(
                    Config,
                    tokio_util::sync::CancellationToken,
                ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
                + Send
                + Sync,
        >,
    >,
    /// Starts an MQTT component. `run` passes a clone of the selected
    /// `MqttConfig` entry from `config.channels.mqtt`.
    pub mqtt_start: Option<
        Box<
            dyn Fn(
                    zeroclaw_config::schema::MqttConfig,
                ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
                + Send
                + Sync,
        >,
    >,
}
141
/// Runs the daemon until an exit signal arrives.
///
/// Spawns the state writer plus one restart supervisor per enabled subsystem
/// (gateway, channels, MQTT, heartbeat, scheduler), waits in
/// `wait_for_exit_signal`, then cancels the channels token, aborts every
/// spawned task and awaits them.
///
/// # Arguments
/// * `config` - daemon configuration; cloned into every subsystem.
/// * `host` / `port` - passed to the gateway hook and printed in the banner.
/// * `subsystems` - optional start hooks; a missing hook disables that
///   subsystem.
///
/// # Returns
/// The [`DaemonExit`] reason reported by `wait_for_exit_signal`.
///
/// # Errors
/// Propagates any error from `wait_for_exit_signal`.
pub async fn run(
    config: Config,
    host: String,
    port: u16,
    subsystems: DaemonSubsystems,
) -> Result<DaemonExit> {
    // Clamp the restart backoff window: at least one second, and the maximum
    // is never below the initial value.
    let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
    let max_backoff = config
        .reliability
        .channel_max_backoff_secs
        .max(initial_backoff);

    crate::health::mark_component_ok("daemon");

    // Event bus handed to the gateway and the scheduler. `_rx` stays bound
    // until this function returns.
    let (event_tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);

    // Best effort: a failure to create the heartbeat file is ignored here.
    if config.heartbeat.enabled {
        let _ = crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.data_dir)
            .await;
    }

    // Every spawned task is collected so teardown can abort and await it.
    let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];

    // Watch channel used to request a reload; starts at `false`.
    let (reload_tx, reload_rx) = tokio::sync::watch::channel::<bool>(false);

    if let Some(gateway_start) = subsystems.gateway_start {
        let gateway_cfg = config.clone();
        let gateway_host = host.clone();
        let gateway_event_tx = event_tx.clone();
        let gateway_reload_tx = reload_tx.clone();
        // Shared so the supervisor closure can call the hook on every restart.
        let gateway_start = std::sync::Arc::new(gateway_start);
        handles.push(spawn_component_supervisor(
            "gateway",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = gateway_cfg.clone();
                let host = gateway_host.clone();
                let tx = gateway_event_tx.clone();
                let reload = gateway_reload_tx.clone();
                let start = gateway_start.clone();
                async move { start(host, port, cfg, Some(tx), Some(reload)).await }
            },
        ));
    }

    // Cancelled during teardown, before the supervisor tasks are aborted.
    let channels_cancel = tokio_util::sync::CancellationToken::new();

    if let Some(channels_start) = subsystems.channels_start {
        if has_supervised_channels(&config) {
            let channels_cfg = config.clone();
            let channels_start = std::sync::Arc::new(channels_start);
            let cancel_for_supervisor = channels_cancel.clone();
            handles.push(spawn_component_supervisor(
                "channels",
                initial_backoff,
                max_backoff,
                move || {
                    let cfg = channels_cfg.clone();
                    let start = channels_start.clone();
                    let cancel = cancel_for_supervisor.clone();
                    async move { start(cfg, cancel).await }
                },
            ));
        } else {
            // Nothing to supervise: report the component as healthy.
            crate::health::mark_component_ok("channels");
            ::zeroclaw_log::record!(
                INFO,
                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
                "No channels configured; channel supervisor disabled"
            );
        }
    } else {
        crate::health::mark_component_ok("channels");
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            "Channels subsystem not wired; channel supervisor disabled"
        );
    }

    if let Some(mqtt_start) = subsystems.mqtt_start {
        // Channel references listed by enabled agents; an MQTT alias counts
        // as active when `mqtt.<alias>` appears in this set.
        let active_mqtt: std::collections::HashSet<String> = config
            .agents
            .values()
            .filter(|a| a.enabled)
            .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
            .collect();
        let mut mqtt_started = false;
        for (alias, mqtt_config) in &config.channels.mqtt {
            if !active_mqtt.contains(&format!("mqtt.{alias}")) {
                continue;
            }
            let mqtt_cfg = mqtt_config.clone();
            let mqtt_start = std::sync::Arc::new(mqtt_start);
            handles.push(spawn_component_supervisor(
                "mqtt",
                initial_backoff,
                max_backoff,
                move || {
                    let cfg = mqtt_cfg.clone();
                    let start = mqtt_start.clone();
                    async move { start(cfg).await }
                },
            ));
            mqtt_started = true;
            // Only the first active alias is supervised: the hook was moved
            // into the supervisor above, so the loop stops here.
            break;
        }
        if !mqtt_started {
            crate::health::mark_component_ok("mqtt");
        }
    } else {
        crate::health::mark_component_ok("mqtt");
    }

    if config.heartbeat.enabled {
        let heartbeat_cfg = config.clone();
        handles.push(spawn_component_supervisor(
            "heartbeat",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = heartbeat_cfg.clone();
                async move { Box::pin(run_heartbeat_worker(cfg)).await }
            },
        ));
    }

    if config.scheduler.enabled {
        let scheduler_cfg = config.clone();
        let scheduler_event_tx = event_tx.clone();
        handles.push(spawn_component_supervisor(
            "scheduler",
            initial_backoff,
            max_backoff,
            move || {
                let cfg = scheduler_cfg.clone();
                let tx = scheduler_event_tx.clone();
                async move { Box::pin(crate::cron::scheduler::run(cfg, Some(tx))).await }
            },
        ));
    } else {
        crate::health::mark_component_ok("scheduler");
        ::zeroclaw_log::record!(
            INFO,
            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
            "Cron disabled; scheduler supervisor not started"
        );
    }

    // Startup banner. NOTE(review): the component list is a fixed string and
    // does not reflect which subsystems were actually started above.
    println!("🧠 ZeroClaw daemon started");
    println!(" Gateway: http://{host}:{port}");
    println!(" Components: gateway, channels, heartbeat, scheduler");
    if config.gateway.require_pairing {
        println!(" Pairing: enabled (code appears in gateway output above)");
    }
    println!(" Ctrl+C or SIGTERM to stop");

    let exit = wait_for_exit_signal(reload_rx).await?;
    // Record why the daemon is stopping in the health registry.
    crate::health::mark_component_error(
        "daemon",
        match exit {
            DaemonExit::Shutdown => "shutdown requested",
            DaemonExit::Reload => "reload requested",
        },
    );

    // Teardown: signal the channels token first, then abort every task and
    // await each one so it has finished before returning.
    channels_cancel.cancel();
    for handle in &handles {
        handle.abort();
    }
    for handle in handles {
        let _ = handle.await;
    }

    // SAFETY: `malloc_trim` is called with a plain integer and no pointers;
    // the call is compiled only for Linux with the GNU libc target.
    #[cfg(all(target_os = "linux", target_env = "gnu"))]
    unsafe {
        libc::malloc_trim(0);
    }

    Ok(exit)
}
333
334pub fn state_file_path(config: &Config) -> PathBuf {
335 config
336 .config_path
337 .parent()
338 .map_or_else(|| PathBuf::from("."), PathBuf::from)
339 .join("daemon_state.json")
340}
341
342fn spawn_state_writer(config: Config) -> JoinHandle<()> {
343 tokio::spawn(async move {
344 let path = state_file_path(&config);
345 if let Some(parent) = path.parent() {
346 let _ = tokio::fs::create_dir_all(parent).await;
347 }
348
349 let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
350 loop {
351 interval.tick().await;
352 let mut json = crate::health::snapshot_json();
353 if let Some(obj) = json.as_object_mut() {
354 obj.insert(
355 "written_at".into(),
356 serde_json::json!(Utc::now().to_rfc3339()),
357 );
358 }
359 let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
360 let _ = tokio::fs::write(&path, data).await;
361 }
362 })
363}
364
365fn spawn_component_supervisor<F, Fut>(
366 name: &'static str,
367 initial_backoff_secs: u64,
368 max_backoff_secs: u64,
369 mut run_component: F,
370) -> JoinHandle<()>
371where
372 F: FnMut() -> Fut + Send + 'static,
373 Fut: Future<Output = Result<()>> + Send + 'static,
374{
375 tokio::spawn(async move {
376 let mut backoff = initial_backoff_secs.max(1);
377 let max_backoff = max_backoff_secs.max(backoff);
378
379 loop {
380 crate::health::mark_component_ok(name);
381 match run_component().await {
382 Ok(()) => {
383 crate::health::mark_component_error(name, "component exited unexpectedly");
384 ::zeroclaw_log::record!(
385 WARN,
386 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
387 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
388 .with_attrs(::serde_json::json!({"name": name})),
389 &format!("Daemon component '{name}' exited unexpectedly")
390 );
391 backoff = initial_backoff_secs.max(1);
393 }
394 Err(e) => {
395 crate::health::mark_component_error(name, e.to_string());
396 ::zeroclaw_log::record!(
397 ERROR,
398 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
399 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
400 .with_attrs(
401 ::serde_json::json!({"error": format!("{}", e), "name": name})
402 ),
403 &format!("Daemon component '{name}' failed: {e}")
404 );
405 }
406 }
407
408 crate::health::bump_component_restart(name);
409 tokio::time::sleep(Duration::from_secs(backoff)).await;
410 backoff = backoff.saturating_mul(2).min(max_backoff);
412 }
413 })
414}
415
416async fn run_heartbeat_worker(config: Config) -> Result<()> {
417 use crate::heartbeat::engine::{
418 HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus, compute_adaptive_interval,
419 };
420 use std::sync::Arc;
421
422 let agent_alias = config.heartbeat.agent.trim().to_string();
423 if agent_alias.is_empty() {
424 anyhow::bail!(
425 "heartbeat worker requires `[heartbeat] agent = \"<alias>\"` naming a configured agent"
426 );
427 }
428 if config.agent(&agent_alias).is_none() {
429 anyhow::bail!(
430 "[heartbeat] agent = {agent_alias:?} is not configured ([agents.{agent_alias}] missing)"
431 );
432 }
433
434 let observer: std::sync::Arc<dyn crate::observability::Observer> =
435 std::sync::Arc::from(crate::observability::create_observer(&config.observability));
436 let engine = HeartbeatEngine::new(config.heartbeat.clone(), config.data_dir.clone(), observer);
437 let metrics = engine.metrics();
438 let delivery = resolve_heartbeat_delivery(&config)?;
439 let two_phase = config.heartbeat.two_phase;
440 let adaptive = config.heartbeat.adaptive;
441 let start_time = std::time::Instant::now();
442
443 let deadman_timeout = config.heartbeat.deadman_timeout_minutes;
445 if deadman_timeout > 0 {
446 let dm_metrics = Arc::clone(&metrics);
447 let dm_config = config.clone();
448 let dm_delivery = delivery.clone();
449 tokio::spawn(async move {
450 let check_interval = Duration::from_secs(60);
451 let timeout = chrono::Duration::minutes(i64::from(deadman_timeout));
452 loop {
453 tokio::time::sleep(check_interval).await;
454 let last_tick = dm_metrics.lock().last_tick_at;
455 if let Some(last) = last_tick
456 && chrono::Utc::now() - last > timeout
457 {
458 let alert = format!(
459 "⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes"
460 );
461 let (channel, target) = if let Some(ch) = &dm_config.heartbeat.deadman_channel {
462 let to = dm_config
463 .heartbeat
464 .deadman_to
465 .as_deref()
466 .or(dm_config.heartbeat.to.as_deref())
467 .unwrap_or_default();
468 (ch.clone(), to.to_string())
469 } else if let Some((ch, to)) = &dm_delivery {
470 (ch.clone(), to.clone())
471 } else {
472 continue;
473 };
474 let delivery_fut = crate::cron::scheduler::deliver_announcement(
475 &dm_config, &channel, &target, None, &alert,
476 );
477 match tokio::time::timeout(Duration::from_secs(30), delivery_fut).await {
478 Ok(Err(e)) => {
479 ::zeroclaw_log::record!(
480 WARN,
481 ::zeroclaw_log::Event::new(
482 module_path!(),
483 ::zeroclaw_log::Action::Note
484 )
485 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
486 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
487 "Deadman alert delivery failed"
488 );
489 }
490 Err(_) => {
491 ::zeroclaw_log::record!(
492 WARN,
493 ::zeroclaw_log::Event::new(
494 module_path!(),
495 ::zeroclaw_log::Action::Note
496 )
497 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
498 "Deadman alert delivery timed out (30s)"
499 );
500 }
501 Ok(Ok(())) => {}
502 }
503 }
504 }
505 });
506 }
507
508 let base_interval = config.heartbeat.interval_minutes.max(1);
509 let mut sleep_mins = base_interval;
510
511 loop {
512 tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await;
513
514 {
516 let mut m = metrics.lock();
517 m.uptime_secs = start_time.elapsed().as_secs();
518 }
519
520 let tick_start = std::time::Instant::now();
521
522 let mut tasks = engine.collect_runnable_tasks().await?;
524 let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High);
525
526 if tasks.is_empty() {
527 if let Some(fallback) = config
528 .heartbeat
529 .message
530 .as_deref()
531 .map(str::trim)
532 .filter(|m| !m.is_empty())
533 {
534 tasks.push(HeartbeatTask {
535 text: fallback.to_string(),
536 priority: TaskPriority::Medium,
537 status: TaskStatus::Active,
538 });
539 } else {
540 #[allow(clippy::cast_precision_loss)]
541 let elapsed = tick_start.elapsed().as_millis() as f64;
542 metrics.lock().record_success(elapsed);
543 continue;
544 }
545 }
546
547 let tasks_to_run = if two_phase {
549 let decision_prompt = format!(
550 "[Heartbeat Task | decision] {}",
551 HeartbeatEngine::build_decision_prompt(&tasks),
552 );
553 let phase1_fut = Box::pin(crate::agent::run(
554 config.clone(),
555 &agent_alias,
556 Some(decision_prompt),
557 None,
558 None,
559 Some(0.0),
560 vec![],
561 false,
562 None,
563 None,
564 crate::agent::loop_::AgentRunOverrides::default(),
565 ));
566 let phase1_result = if config.heartbeat.task_timeout_secs > 0 {
567 match tokio::time::timeout(
568 Duration::from_secs(config.heartbeat.task_timeout_secs),
569 phase1_fut,
570 )
571 .await
572 {
573 Ok(r) => r,
574 Err(_) => {
575 ::zeroclaw_log::record!(
576 WARN,
577 ::zeroclaw_log::Event::new(
578 module_path!(),
579 ::zeroclaw_log::Action::Timeout
580 )
581 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
582 .with_attrs(::serde_json::json!({
583 "phase": "phase1_decision",
584 "timeout_secs": config.heartbeat.task_timeout_secs,
585 })),
586 "heartbeat: phase1 decision timed out"
587 );
588 Err(anyhow::Error::msg(format!(
589 "Phase 1 decision timed out ({}s)",
590 config.heartbeat.task_timeout_secs
591 )))
592 }
593 }
594 } else {
595 phase1_fut.await
596 };
597 match phase1_result {
598 Ok(response) => {
599 let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
600 if indices.is_empty() {
601 ::zeroclaw_log::record!(
602 INFO,
603 ::zeroclaw_log::Event::new(
604 module_path!(),
605 ::zeroclaw_log::Action::Note
606 ),
607 "heartbeat phase 1: skip (nothing to do)"
608 );
609 crate::health::mark_component_ok("heartbeat");
610 #[allow(clippy::cast_precision_loss)]
611 let elapsed = tick_start.elapsed().as_millis() as f64;
612 metrics.lock().record_success(elapsed);
613 continue;
614 }
615 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"selected": indices.len(), "total": tasks.len()})), "heartbeat phase 1: running task subset");
616 indices
617 .into_iter()
618 .filter_map(|i| tasks.get(i).cloned())
619 .collect()
620 }
621 Err(e) => {
622 ::zeroclaw_log::record!(
623 WARN,
624 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
625 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
626 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
627 "heartbeat phase 1 failed; running all tasks"
628 );
629 tasks
630 }
631 }
632 } else {
633 tasks
634 };
635
636 let session_context = if config.heartbeat.load_session_context {
640 load_heartbeat_session_context(&config)
641 } else {
642 None
643 };
644
645 let heartbeat_memory: Option<Box<dyn zeroclaw_memory::Memory>> =
647 zeroclaw_memory::create_memory(
648 &config.memory,
649 &config.data_dir,
650 config
651 .model_provider_for_agent(&agent_alias)
652 .and_then(|e| e.api_key.as_deref()),
653 )
654 .ok();
655
656 let mut tick_had_error = false;
657 for task in &tasks_to_run {
658 let task_start = std::time::Instant::now();
659 let task_prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
660
661 let memory_context = if let Some(ref mem) = heartbeat_memory {
665 match mem.recall(&task.text, 5, None, None, None).await {
666 Ok(entries) if !entries.is_empty() => {
667 let ctx: String = entries
668 .iter()
669 .filter(|e| {
670 !matches!(
671 e.category,
672 zeroclaw_memory::traits::MemoryCategory::Conversation
673 )
674 })
675 .map(|e| format!("- {}: {}", e.key, e.content))
676 .collect::<Vec<_>>()
677 .join("\n");
678 if ctx.is_empty() {
679 None
680 } else {
681 Some(format!(
682 "{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n"
683 ))
684 }
685 }
686 _ => None,
687 }
688 } else {
689 None
690 };
691
692 let prompt = match (&session_context, &memory_context) {
693 (Some(sc), Some(mc)) => format!("{mc}\n{sc}\n\n{task_prompt}"),
694 (Some(sc), None) => format!("{sc}\n\n{task_prompt}"),
695 (None, Some(mc)) => format!("{mc}\n\n{task_prompt}"),
696 (None, None) => task_prompt,
697 };
698 let temp: Option<f64> = config
699 .model_provider_for_agent(&agent_alias)
700 .and_then(|e| e.temperature);
701 let phase2_fut = Box::pin(crate::agent::run(
702 config.clone(),
703 &agent_alias,
704 Some(prompt),
705 None,
706 None,
707 temp,
708 vec![],
709 false,
710 None,
711 None,
712 crate::agent::loop_::AgentRunOverrides::default(),
713 ));
714 let phase2_result = if config.heartbeat.task_timeout_secs > 0 {
715 match tokio::time::timeout(
716 Duration::from_secs(config.heartbeat.task_timeout_secs),
717 phase2_fut,
718 )
719 .await
720 {
721 Ok(r) => r,
722 Err(_) => {
723 ::zeroclaw_log::record!(
724 WARN,
725 ::zeroclaw_log::Event::new(
726 module_path!(),
727 ::zeroclaw_log::Action::Timeout
728 )
729 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
730 .with_attrs(::serde_json::json!({
731 "phase": "phase2_heartbeat",
732 "timeout_secs": config.heartbeat.task_timeout_secs,
733 })),
734 "heartbeat task timed out"
735 );
736 Err(anyhow::Error::msg(format!(
737 "Heartbeat task timed out ({}s)",
738 config.heartbeat.task_timeout_secs
739 )))
740 }
741 }
742 } else {
743 phase2_fut.await
744 };
745 match phase2_result {
746 Ok(output) => {
747 crate::health::mark_component_ok("heartbeat");
748 #[allow(clippy::cast_possible_truncation)]
749 let duration_ms = task_start.elapsed().as_millis() as i64;
750 let now = chrono::Utc::now();
751 let _ = crate::heartbeat::store::record_run(
752 &config.data_dir,
753 &task.text,
754 &task.priority.to_string(),
755 now - chrono::Duration::milliseconds(duration_ms),
756 now,
757 "ok",
758 Some(output.as_str()),
759 duration_ms,
760 config.heartbeat.max_run_history,
761 );
762 if config.memory.auto_save
764 && output.chars().count() >= 50
765 && let Some(ref mem) = heartbeat_memory
766 {
767 let key = format!("heartbeat_{}", uuid::Uuid::new_v4());
768 let summary = if output.len() > 500 {
769 let mut end = 500;
771 while end > 0 && !output.is_char_boundary(end) {
772 end -= 1;
773 }
774 &output[..end]
775 } else {
776 &output
777 };
778 let _ = mem
779 .store(
780 &key,
781 &format!("Heartbeat task '{}': {}", task.text, summary),
782 zeroclaw_memory::MemoryCategory::Daily,
783 None,
784 )
785 .await;
786 }
787
788 let announcement = if output.trim().is_empty() {
789 format!("💓 heartbeat task completed: {}", task.text)
790 } else {
791 output
792 };
793 if let Some((channel, target)) = &delivery {
794 let delivery_result = tokio::time::timeout(
795 Duration::from_secs(30),
796 crate::cron::scheduler::deliver_announcement(
797 &config,
798 channel,
799 target,
800 None,
801 &announcement,
802 ),
803 )
804 .await;
805 match delivery_result {
806 Ok(Err(e)) => {
807 crate::health::mark_component_error(
808 "heartbeat",
809 format!("delivery failed: {e}"),
810 );
811 ::zeroclaw_log::record!(
812 WARN,
813 ::zeroclaw_log::Event::new(
814 module_path!(),
815 ::zeroclaw_log::Action::Note
816 )
817 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
818 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
819 "Heartbeat delivery failed"
820 );
821 }
822 Err(_) => {
823 crate::health::mark_component_error(
824 "heartbeat",
825 "delivery timed out (30s)".to_string(),
826 );
827 ::zeroclaw_log::record!(
828 WARN,
829 ::zeroclaw_log::Event::new(
830 module_path!(),
831 ::zeroclaw_log::Action::Note
832 )
833 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
834 "Heartbeat delivery timed out (30s)"
835 );
836 }
837 Ok(Ok(())) => {}
838 }
839 }
840 }
841 Err(e) => {
842 tick_had_error = true;
843 #[allow(clippy::cast_possible_truncation)]
844 let duration_ms = task_start.elapsed().as_millis() as i64;
845 let now = chrono::Utc::now();
846 let _ = crate::heartbeat::store::record_run(
847 &config.data_dir,
848 &task.text,
849 &task.priority.to_string(),
850 now - chrono::Duration::milliseconds(duration_ms),
851 now,
852 "error",
853 Some(&e.to_string()),
854 duration_ms,
855 config.heartbeat.max_run_history,
856 );
857 crate::health::mark_component_error("heartbeat", e.to_string());
858 ::zeroclaw_log::record!(
859 WARN,
860 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
861 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
862 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
863 "Heartbeat task failed"
864 );
865 }
866 }
867 }
868
869 #[allow(clippy::cast_precision_loss)]
871 let tick_elapsed = tick_start.elapsed().as_millis() as f64;
872 {
873 let mut m = metrics.lock();
874 if tick_had_error {
875 m.record_failure(tick_elapsed);
876 } else {
877 m.record_success(tick_elapsed);
878 }
879 }
880
881 if adaptive {
883 let failures = metrics.lock().consecutive_failures;
884 sleep_mins = compute_adaptive_interval(
885 base_interval,
886 config.heartbeat.min_interval_minutes,
887 config.heartbeat.max_interval_minutes,
888 failures,
889 has_high_priority,
890 );
891 } else {
892 sleep_mins = base_interval;
893 }
894 }
895}
896
897fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
899 let channel = config
900 .heartbeat
901 .target
902 .as_deref()
903 .map(str::trim)
904 .filter(|value| !value.is_empty());
905 let target = config
906 .heartbeat
907 .to
908 .as_deref()
909 .map(str::trim)
910 .filter(|value| !value.is_empty());
911
912 match (channel, target) {
913 (Some(channel), Some(target)) => {
915 validate_heartbeat_channel_config(config, channel)?;
916 Ok(Some((channel.to_string(), target.to_string())))
917 }
918 (Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
920 (None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
921 (None, None) => Ok(auto_detect_heartbeat_channel(config)),
923 }
924}
925
/// Maximum number of session messages kept when building heartbeat context:
/// `load_jsonl_messages` retains this many trailing messages and
/// `load_heartbeat_session_context` takes at most this many of them.
const HEARTBEAT_SESSION_CONTEXT_MESSAGES: usize = 20;
936
937fn load_heartbeat_session_context(config: &Config) -> Option<String> {
938 use zeroclaw_providers::traits::ChatMessage;
939
940 let channel = config
941 .heartbeat
942 .target
943 .as_deref()
944 .map(str::trim)
945 .filter(|v| !v.is_empty())?;
946 let to = config
947 .heartbeat
948 .to
949 .as_deref()
950 .map(str::trim)
951 .filter(|v| !v.is_empty())?;
952
953 if channel.contains('/') || channel.contains('\\') || to.contains('/') || to.contains('\\') {
954 ::zeroclaw_log::record!(
955 WARN,
956 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
957 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
958 "heartbeat session context: channel/to contains path separators, skipping"
959 );
960 return None;
961 }
962
963 let sessions_dir = config.data_dir.join("sessions");
964
965 let prefix = format!("{channel}_");
968 let suffix = format!("_{to}.jsonl");
969 let exact = format!("{channel}_{to}.jsonl");
970 let mid_prefix = format!("{channel}_{to}_");
971
972 let path = std::fs::read_dir(&sessions_dir)
973 .ok()?
974 .filter_map(|e| e.ok())
975 .filter(|e| {
976 let name = e.file_name();
977 let name = name.to_string_lossy();
978 name.ends_with(".jsonl")
979 && (name == exact
980 || (name.starts_with(&prefix) && name.ends_with(&suffix))
981 || name.starts_with(&mid_prefix))
982 })
983 .max_by_key(|e| {
984 e.metadata()
985 .and_then(|m| m.modified())
986 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
987 })
988 .map(|e| e.path())?;
989
990 if !path.exists() {
991 ::zeroclaw_log::record!(
992 DEBUG,
993 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
994 .with_attrs(::serde_json::json!({"channel": channel, "to": to})),
995 "heartbeat session context: no session file found"
996 );
997 return None;
998 }
999
1000 let messages = load_jsonl_messages(&path);
1001 if messages.is_empty() {
1002 return None;
1003 }
1004
1005 let recent: Vec<&ChatMessage> = messages
1006 .iter()
1007 .filter(|m| m.role == "user" || m.role == "assistant")
1008 .rev()
1009 .take(HEARTBEAT_SESSION_CONTEXT_MESSAGES)
1010 .collect::<Vec<_>>()
1011 .into_iter()
1012 .rev()
1013 .collect();
1014
1015 let has_user_message = recent.iter().any(|m| m.role == "user");
1020 if !has_user_message {
1021 ::zeroclaw_log::record!(
1022 DEBUG,
1023 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1024 "💓 Heartbeat session context: no user messages in recent history — skipping"
1025 );
1026 return None;
1027 }
1028
1029 let last_message_age = std::fs::metadata(&path)
1031 .ok()
1032 .and_then(|m| m.modified().ok())
1033 .and_then(|mtime| mtime.elapsed().ok());
1034
1035 let silence_note = match last_message_age {
1036 Some(age) => {
1037 let mins = age.as_secs() / 60;
1038 if mins < 60 {
1039 format!("(last message ~{mins} minutes ago)\n")
1040 } else {
1041 let hours = mins / 60;
1042 let rem = mins % 60;
1043 if rem == 0 {
1044 format!("(last message ~{hours}h ago)\n")
1045 } else {
1046 format!("(last message ~{hours}h {rem}m ago)\n")
1047 }
1048 }
1049 }
1050 None => String::new(),
1051 };
1052
1053 ::zeroclaw_log::record!(
1054 DEBUG,
1055 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1056 &format!(
1057 "💓 Heartbeat session context: {} messages from {}, silence: {}",
1058 recent.len(),
1059 path.display().to_string(),
1060 silence_note.trim()
1061 )
1062 );
1063
1064 let mut ctx = format!(
1065 "[Recent conversation history — use this for context when composing your message] {silence_note}",
1066 );
1067 for msg in &recent {
1068 let label = if msg.role == "user" { "User" } else { "You" };
1069 let content = if msg.content.len() > 500 {
1072 let truncate_at = msg
1073 .content
1074 .char_indices()
1075 .map(|(i, _)| i)
1076 .take_while(|&i| i <= 500)
1077 .last()
1078 .unwrap_or(0);
1079 format!("{}…", &msg.content[..truncate_at])
1080 } else {
1081 msg.content.clone()
1082 };
1083 ctx.push_str(label);
1084 ctx.push_str(": ");
1085 ctx.push_str(&content);
1086 ctx.push('\n');
1087 }
1088
1089 Some(ctx)
1090}
1091
1092fn load_jsonl_messages(path: &std::path::Path) -> Vec<zeroclaw_providers::traits::ChatMessage> {
1096 use std::collections::VecDeque;
1097 use std::io::BufRead;
1098
1099 let file = match std::fs::File::open(path) {
1100 Ok(f) => f,
1101 Err(_) => return Vec::new(),
1102 };
1103 let reader = std::io::BufReader::new(file);
1104 let mut window: VecDeque<zeroclaw_providers::traits::ChatMessage> =
1105 VecDeque::with_capacity(HEARTBEAT_SESSION_CONTEXT_MESSAGES + 1);
1106 for line in reader.lines() {
1107 let Ok(line) = line else { continue };
1108 let trimmed = line.trim();
1109 if trimmed.is_empty() {
1110 continue;
1111 }
1112 if let Ok(msg) = serde_json::from_str::<zeroclaw_providers::traits::ChatMessage>(trimmed) {
1113 window.push_back(msg);
1114 if window.len() > HEARTBEAT_SESSION_CONTEXT_MESSAGES {
1115 window.pop_front();
1116 }
1117 }
1118 }
1119 window.into_iter().collect()
1120}
1121
1122fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
1125 if !config.channels.telegram.is_empty() {
1130 for alias in config.channels.telegram.keys() {
1131 let peers = config.channel_external_peers("telegram", alias);
1132 if let Some(target) = peers.into_iter().next() {
1133 return Some(("telegram".to_string(), target));
1134 }
1135 }
1136 }
1137 if !config.channels.discord.is_empty() {
1138 return None;
1140 }
1141 if !config.channels.slack.is_empty() {
1142 return None;
1144 }
1145 if !config.channels.mattermost.is_empty() {
1146 return None;
1148 }
1149 None
1150}
1151
1152fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
1153 match channel.to_ascii_lowercase().as_str() {
1154 "telegram" => {
1155 if config.channels.telegram.is_empty() {
1156 anyhow::bail!(
1157 "heartbeat.target is set to telegram but channels.telegram is not configured"
1158 );
1159 }
1160 }
1161 "discord" => {
1162 if config.channels.discord.is_empty() {
1163 anyhow::bail!(
1164 "heartbeat.target is set to discord but channels.discord is not configured"
1165 );
1166 }
1167 }
1168 "slack" => {
1169 if config.channels.slack.is_empty() {
1170 anyhow::bail!(
1171 "heartbeat.target is set to slack but channels.slack is not configured"
1172 );
1173 }
1174 }
1175 "mattermost" => {
1176 if config.channels.mattermost.is_empty() {
1177 anyhow::bail!(
1178 "heartbeat.target is set to mattermost but channels.mattermost is not configured"
1179 );
1180 }
1181 }
1182 other => anyhow::bail!("unsupported heartbeat.target channel: {other}"),
1183 }
1184
1185 Ok(())
1186}
1187
1188fn has_supervised_channels(config: &Config) -> bool {
1189 config
1190 .channels
1191 .channels()
1192 .iter()
1193 .any(|info| info.configured)
1194}
1195
#[cfg(test)]
mod tests {
    //! Unit tests for the daemon: state-file location, component supervision,
    //! channel detection, heartbeat delivery resolution and exit-signal handling.

    use super::*;
    use tempfile::TempDir;

    /// Builds a default `Config` rooted in `tmp`: `data_dir` is `<tmp>/data`
    /// (created on disk) and `config_path` is `<tmp>/config.toml`.
    fn test_config(tmp: &TempDir) -> Config {
        let config = Config {
            data_dir: tmp.path().join("data"),
            config_path: tmp.path().join("config.toml"),
            ..Config::default()
        };
        std::fs::create_dir_all(&config.data_dir).unwrap();
        config
    }

    /// Builds the enabled Telegram channel entry shared by the Telegram tests.
    ///
    /// Only the bot token varies between tests; every other field uses the
    /// same fixed value.
    fn telegram_config(bot_token: &'static str) -> zeroclaw_config::schema::TelegramConfig {
        zeroclaw_config::schema::TelegramConfig {
            enabled: true,
            bot_token: bot_token.into(),
            stream_mode: zeroclaw_config::schema::StreamMode::default(),
            draft_update_interval_ms: 1000,
            interrupt_on_new_message: false,
            mention_only: false,
            ack_reactions: None,
            proxy_url: None,
            approval_timeout_secs: 120,
            excluded_tools: vec![],
            default_target: None,
        }
    }

    /// The state file is expected directly under the temp dir, i.e. beside
    /// `config.toml` rather than inside `data_dir`.
    #[test]
    fn state_file_path_uses_config_directory() {
        let tmp = TempDir::new().unwrap();
        let config = test_config(&tmp);

        let path = state_file_path(&config);
        assert_eq!(path, tmp.path().join("daemon_state.json"));
    }

    /// A component that keeps failing must be reported as `error`, with at
    /// least one restart and the failure message recorded.
    #[tokio::test]
    async fn supervisor_marks_error_and_restart_on_failure() {
        let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
            anyhow::bail!("boom")
        });

        // Let the supervisor observe at least one failure, then stop it.
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.abort();
        let _ = handle.await;

        let snapshot = crate::health::snapshot_json();
        let component = &snapshot["components"]["daemon-test-fail"];
        assert_eq!(component["status"], "error");
        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
        assert!(
            component["last_error"]
                .as_str()
                .unwrap_or("")
                .contains("boom")
        );
    }

    /// A component that returns `Ok(())` is still treated as a failure: the
    /// supervisor records an "exited unexpectedly" error and restarts it.
    #[tokio::test]
    async fn supervisor_marks_unexpected_exit_as_error() {
        let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });

        // Let the supervisor observe at least one exit, then stop it.
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.abort();
        let _ = handle.await;

        let snapshot = crate::health::snapshot_json();
        let component = &snapshot["components"]["daemon-test-exit"];
        assert_eq!(component["status"], "error");
        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
        assert!(
            component["last_error"]
                .as_str()
                .unwrap_or("")
                .contains("component exited unexpectedly")
        );
    }

    /// A default config has no configured channels.
    #[test]
    fn detects_no_supervised_channels() {
        let config = Config::default();
        assert!(!has_supervised_channels(&config));
    }

    #[test]
    fn detects_supervised_channels_present() {
        let mut config = Config::default();
        config
            .channels
            .telegram
            .insert("default".to_string(), telegram_config("token"));
        assert!(has_supervised_channels(&config));
    }

    #[test]
    fn detects_dingtalk_as_supervised_channel() {
        let mut config = Config::default();
        config.channels.dingtalk.insert(
            "default".to_string(),
            zeroclaw_config::schema::DingTalkConfig {
                enabled: true,
                client_id: "client_id".into(),
                client_secret: "client_secret".into(),
                proxy_url: None,
                excluded_tools: vec![],
            },
        );
        assert!(has_supervised_channels(&config));
    }

    #[test]
    fn detects_mattermost_as_supervised_channel() {
        let mut config = Config::default();
        config.channels.mattermost.insert(
            "default".to_string(),
            zeroclaw_config::schema::MattermostConfig {
                enabled: true,
                url: "https://mattermost.example.com".into(),
                bot_token: Some("token".into()),
                login_id: None,
                password: None,
                channel_ids: vec!["channel-id".into()],
                team_ids: vec![],
                discover_dms: None,
                thread_replies: Some(true),
                mention_only: Some(false),
                interrupt_on_new_message: false,
                proxy_url: None,
                excluded_tools: vec![],
                default_target: None,
            },
        );
        assert!(has_supervised_channels(&config));
    }

    #[test]
    fn detects_qq_as_supervised_channel() {
        let mut config = Config::default();
        config.channels.qq.insert(
            "default".to_string(),
            zeroclaw_config::schema::QQConfig {
                enabled: true,
                app_id: "app-id".into(),
                app_secret: "app-secret".into(),
                proxy_url: None,
                excluded_tools: vec![],
            },
        );
        assert!(has_supervised_channels(&config));
    }

    #[test]
    fn detects_nextcloud_talk_as_supervised_channel() {
        let mut config = Config::default();
        config.channels.nextcloud_talk.insert(
            "default".to_string(),
            zeroclaw_config::schema::NextcloudTalkConfig {
                enabled: true,
                base_url: "https://cloud.example.com".into(),
                app_token: "app-token".into(),
                webhook_secret: None,
                proxy_url: None,
                bot_name: None,
                excluded_tools: vec![],
                stream_mode: zeroclaw_config::schema::StreamMode::default(),
                draft_update_interval_ms: 1000,
            },
        );
        assert!(has_supervised_channels(&config));
    }

    /// A config whose only channel is a webhook still counts as supervised.
    #[test]
    fn webhook_only_config_is_supervised() {
        let mut config = Config::default();
        config.channels.webhook.insert(
            "default".to_string(),
            zeroclaw_config::schema::WebhookConfig {
                enabled: true,
                port: 8080,
                listen_path: None,
                send_url: None,
                send_method: None,
                auth_header: None,
                secret: None,
                excluded_tools: vec![],
                max_retries: None,
                retry_base_delay_ms: None,
                retry_max_delay_ms: None,
            },
        );
        assert!(has_supervised_channels(&config));
    }

    /// With neither `heartbeat.target` nor any channel configured there is no
    /// delivery target and no error.
    #[test]
    fn resolve_delivery_none_when_unset() {
        let config = Config::default();
        let target = resolve_heartbeat_delivery(&config).unwrap();
        assert!(target.is_none());
    }

    #[test]
    fn resolve_delivery_requires_to_field() {
        let mut config = Config::default();
        config.heartbeat.target = Some("telegram".into());
        let err = resolve_heartbeat_delivery(&config).unwrap_err();
        assert!(
            err.to_string()
                .contains("heartbeat.to is required when heartbeat.target is set")
        );
    }

    #[test]
    fn resolve_delivery_requires_target_field() {
        let mut config = Config::default();
        config.heartbeat.to = Some("123456".into());
        let err = resolve_heartbeat_delivery(&config).unwrap_err();
        assert!(
            err.to_string()
                .contains("heartbeat.target is required when heartbeat.to is set")
        );
    }

    #[test]
    fn resolve_delivery_rejects_unsupported_channel() {
        let mut config = Config::default();
        config.heartbeat.target = Some("email".into());
        config.heartbeat.to = Some("ops@example.com".into());
        let err = resolve_heartbeat_delivery(&config).unwrap_err();
        assert!(
            err.to_string()
                .contains("unsupported heartbeat.target channel")
        );
    }

    /// An explicit Telegram target is rejected while `channels.telegram` is empty.
    #[test]
    fn resolve_delivery_requires_channel_configuration() {
        let mut config = Config::default();
        config.heartbeat.target = Some("telegram".into());
        config.heartbeat.to = Some("123456".into());
        let err = resolve_heartbeat_delivery(&config).unwrap_err();
        assert!(
            err.to_string()
                .contains("channels.telegram is not configured")
        );
    }

    #[test]
    fn resolve_delivery_accepts_telegram_configuration() {
        let mut config = Config::default();
        config.heartbeat.target = Some("telegram".into());
        config.heartbeat.to = Some("123456".into());
        config
            .channels
            .telegram
            .insert("default".to_string(), telegram_config("bot-token"));

        let target = resolve_heartbeat_delivery(&config).unwrap();
        assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
    }

    /// Without an explicit `heartbeat.target`/`heartbeat.to`, delivery falls
    /// back to the first external peer of the configured Telegram channel.
    #[test]
    fn auto_detect_telegram_when_configured() {
        use zeroclaw_config::multi_agent::{PeerGroupConfig, PeerUsername};

        let mut config = Config::default();
        config
            .channels
            .telegram
            .insert("default".to_string(), telegram_config("bot-token"));
        // NOTE(review): the key presumably follows a `<channel>_<alias>`
        // convention that `channel_external_peers` relies on — confirm.
        config.peer_groups.insert(
            "telegram_default".to_string(),
            PeerGroupConfig {
                channel: "telegram".to_string(),
                external_peers: vec![PeerUsername::new("user123")],
                ..PeerGroupConfig::default()
            },
        );

        let target = resolve_heartbeat_delivery(&config).unwrap();
        assert_eq!(
            target,
            Some(("telegram".to_string(), "user123".to_string()))
        );
    }

    #[test]
    fn auto_detect_none_when_no_channels() {
        let config = Config::default();
        let target = auto_detect_heartbeat_channel(&config);
        assert!(target.is_none());
    }

    /// SIGHUP must be swallowed: `wait_for_exit_signal` keeps waiting instead
    /// of returning a `DaemonExit`.
    #[cfg(unix)]
    #[tokio::test]
    async fn sighup_does_not_shut_down_daemon() {
        use tokio::time::{Duration, timeout};

        // Keep the sender alive for the whole test: dropping it makes the
        // receiver side report an error, which is handled as a shutdown.
        let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
        let handle = tokio::spawn(wait_for_exit_signal(reload_rx));

        // Give the spawned task time to register its signal handlers before
        // the signal is raised.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // SAFETY: `raise` takes a plain signal number and touches no
        // Rust-managed memory. The task spawned above is expected to have
        // installed a SIGHUP handler during the sleep, so the default
        // disposition (process termination) should not apply.
        let rc = unsafe { libc::raise(libc::SIGHUP) };
        // Without this check the test would pass vacuously if the signal was
        // never delivered.
        assert_eq!(rc, 0, "failed to raise SIGHUP");

        let result = timeout(Duration::from_millis(200), handle).await;
        assert!(
            result.is_err(),
            "wait_for_exit_signal should not return after SIGHUP"
        );
    }

    /// Publishing `true` on the reload channel makes `wait_for_exit_signal`
    /// return `DaemonExit::Reload`.
    #[tokio::test]
    async fn reload_channel_returns_reload() {
        use tokio::time::{Duration, timeout};

        let (reload_tx, reload_rx) = tokio::sync::watch::channel(false);
        let handle = tokio::spawn(wait_for_exit_signal(reload_rx));
        // Let the spawned task start before requesting the reload.
        tokio::time::sleep(Duration::from_millis(50)).await;
        reload_tx.send(true).expect("send reload");

        let result = timeout(Duration::from_secs(2), handle)
            .await
            .expect("wait_for_exit_signal should return after reload signal")
            .expect("task should not panic")
            .expect("signal handler should not error");
        assert_eq!(result, DaemonExit::Reload);
    }
}
1561}