1use anyhow::Result;
2use chrono::Utc;
3use std::path::PathBuf;
4use tokio::task::JoinHandle;
5use tokio::time::Duration;
6use zeroclaw_config::schema::Config;
7use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
8
9mod registry;
10pub use registry::DaemonRegistry;
11
12const STATUS_FLUSH_SECONDS: u64 = 5;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum DaemonExit {
21 Shutdown,
22 Reload,
23}
24
25const EPHEMERAL_GRACE_SECS: u64 = 1;
36
37async fn wait_for_exit_signal(
38 mut reload_rx: tokio::sync::watch::Receiver<bool>,
39 ephemeral: bool,
40 client_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
41) -> Result<DaemonExit> {
42 use std::sync::atomic::Ordering;
43
44 let ephemeral_shutdown = async {
49 if !ephemeral {
50 return std::future::pending::<()>().await;
51 }
52 loop {
54 if client_count.load(Ordering::Relaxed) > 0 {
55 break;
56 }
57 tokio::time::sleep(Duration::from_secs(1)).await;
58 }
59 loop {
61 if client_count.load(Ordering::Relaxed) == 0 {
62 break;
63 }
64 tokio::time::sleep(Duration::from_secs(1)).await;
65 }
66 ::zeroclaw_log::record!(
67 INFO,
68 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
69 .with_attrs(::serde_json::json!({"grace_secs": EPHEMERAL_GRACE_SECS})),
70 "All socket clients disconnected; starting ephemeral grace period"
71 );
72 for _ in 0..EPHEMERAL_GRACE_SECS {
74 tokio::time::sleep(Duration::from_secs(1)).await;
75 if client_count.load(Ordering::Relaxed) > 0 {
76 return Box::pin(wait_for_ephemeral(client_count.clone())).await;
78 }
79 }
80 };
81 tokio::pin!(ephemeral_shutdown);
82
83 #[cfg(unix)]
84 {
85 use tokio::signal::unix::{SignalKind, signal};
86
87 let mut sigint = signal(SignalKind::interrupt())?;
88 let mut sigterm = signal(SignalKind::terminate())?;
89 let mut sighup = signal(SignalKind::hangup())?;
90
91 loop {
92 tokio::select! {
93 _ = sigint.recv() => {
94 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGINT, shutting down...");
95 return Ok(DaemonExit::Shutdown);
96 }
97 _ = sigterm.recv() => {
98 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGTERM, shutting down...");
99 return Ok(DaemonExit::Shutdown);
100 }
101 _ = sighup.recv() => {
102 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGHUP, ignoring (daemon stays running)");
103 }
104 changed = reload_rx.changed() => {
105 if changed.is_err() {
106 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
107 return Ok(DaemonExit::Shutdown);
108 }
109 if *reload_rx.borrow_and_update() {
110 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
111 return Ok(DaemonExit::Reload);
112 }
113 }
114 _ = &mut ephemeral_shutdown => {
115 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Ephemeral daemon: no clients remaining, shutting down");
116 return Ok(DaemonExit::Shutdown);
117 }
118 }
119 }
120 }
121
122 #[cfg(not(unix))]
123 {
124 loop {
125 tokio::select! {
126 res = tokio::signal::ctrl_c() => {
127 res?;
128 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received Ctrl+C, shutting down...");
129 return Ok(DaemonExit::Shutdown);
130 }
131 changed = reload_rx.changed() => {
132 if changed.is_err() {
133 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
134 return Ok(DaemonExit::Shutdown);
135 }
136 if *reload_rx.borrow_and_update() {
137 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
138 return Ok(DaemonExit::Reload);
139 }
140 }
141 _ = &mut ephemeral_shutdown => {
142 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Ephemeral daemon: no clients remaining, shutting down");
143 return Ok(DaemonExit::Shutdown);
144 }
145 }
146 }
147 }
148}
149
150async fn wait_for_ephemeral(client_count: std::sync::Arc<std::sync::atomic::AtomicUsize>) {
152 use std::sync::atomic::Ordering;
153 loop {
155 if client_count.load(Ordering::Relaxed) == 0 {
156 break;
157 }
158 tokio::time::sleep(Duration::from_secs(1)).await;
159 }
160 ::zeroclaw_log::record!(
161 INFO,
162 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
163 .with_attrs(::serde_json::json!({"grace_secs": EPHEMERAL_GRACE_SECS})),
164 "All socket clients disconnected; starting ephemeral grace period"
165 );
166 for _ in 0..EPHEMERAL_GRACE_SECS {
167 tokio::time::sleep(Duration::from_secs(1)).await;
168 if client_count.load(Ordering::Relaxed) > 0 {
169 return Box::pin(wait_for_ephemeral(client_count)).await;
170 }
171 }
172}
173
174pub async fn run(
175 config: Config,
176 host: String,
177 port: u16,
178 mut registry: DaemonRegistry,
179 ephemeral: bool,
180) -> Result<DaemonExit> {
181 let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
182 let max_backoff = config
183 .reliability
184 .channel_max_backoff_secs
185 .max(initial_backoff);
186
187 crate::health::mark_component_ok("daemon");
188
189 let (event_tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
192
193 zeroclaw_log::set_broadcast_hook(event_tx.clone());
199
200 if config.heartbeat.enabled {
201 let _ = crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.data_dir)
202 .await;
203 }
204
205 let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
206
207 let (reload_tx, reload_rx) = tokio::sync::watch::channel::<bool>(false);
210
211 let tui_registry =
214 std::sync::Arc::new(crate::rpc::tui_identity::TuiRegistry::new(&config.data_dir));
215
216 if let Some(gateway_start) = registry.take_gateway_start() {
217 let gateway_cfg = config.clone();
218 let gateway_host = host.clone();
219 let gateway_event_tx = event_tx.clone();
220 let gateway_reload_tx = reload_tx.clone();
221 let gateway_tui_registry = tui_registry.clone();
222 let gateway_start = std::sync::Arc::new(gateway_start);
223 handles.push(spawn_component_supervisor(
224 "gateway",
225 initial_backoff,
226 max_backoff,
227 move || {
228 let cfg = gateway_cfg.clone();
229 let host = gateway_host.clone();
230 let tx = gateway_event_tx.clone();
231 let reload = gateway_reload_tx.clone();
232 let tui_reg = gateway_tui_registry.clone();
233 let start = gateway_start.clone();
234 async move { start(host, port, cfg, Some(tx), Some(reload), Some(tui_reg)).await }
235 },
236 ));
237 }
238
239 let channels_cancel = tokio_util::sync::CancellationToken::new();
240
241 if let Some(channels_start) = registry.take_channels_start() {
242 if has_supervised_channels(&config) {
243 let channels_cfg = config.clone();
244 let channels_start = std::sync::Arc::new(channels_start);
245 let cancel_for_supervisor = channels_cancel.clone();
246 handles.push(spawn_component_supervisor(
247 "channels",
248 initial_backoff,
249 max_backoff,
250 move || {
251 let cfg = channels_cfg.clone();
252 let start = channels_start.clone();
253 let cancel = cancel_for_supervisor.clone();
254 async move { start(cfg, cancel).await }
255 },
256 ));
257 } else {
258 crate::health::mark_component_ok("channels");
259 ::zeroclaw_log::record!(
260 INFO,
261 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
262 "No channels configured; channel supervisor disabled"
263 );
264 }
265 } else {
266 crate::health::mark_component_ok("channels");
267 ::zeroclaw_log::record!(
268 INFO,
269 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
270 "Channels subsystem not wired; channel supervisor disabled"
271 );
272 }
273
274 let socket_client_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
277 let need_rpc_ctx = registry.has_socket_start() || registry.has_wss_start();
278
279 let rpc_ctx = if need_rpc_ctx {
280 use crate::rpc::context::RpcContext;
281 use crate::rpc::session::SessionStore;
282 use zeroclaw_infra::session_queue::SessionActorQueue;
283
284 let session_queue = std::sync::Arc::new(SessionActorQueue::new(32, 30, 600));
285 let sessions = std::sync::Arc::new(SessionStore::new(64, session_queue.clone()));
286
287 {
288 let reaper_queue = std::sync::Arc::clone(&session_queue);
289 zeroclaw_spawn::spawn!(async move {
290 const TICK: std::time::Duration = std::time::Duration::from_secs(60);
291 let mut interval = tokio::time::interval(TICK);
292 interval.tick().await;
293 loop {
294 interval.tick().await;
295 let queue_evicted = reaper_queue.evict_idle().await;
296 if queue_evicted > 0 {
297 let span = ::zeroclaw_log::info_span!(
298 target: "zeroclaw_log_internal_scope",
299 "zeroclaw_scope",
300 channel = "rpc",
301 );
302 let _guard = span.enter();
303 ::zeroclaw_log::record!(
304 INFO,
305 ::zeroclaw_log::Event::new(
306 module_path!(),
307 ::zeroclaw_log::Action::Note,
308 )
309 .with_category(::zeroclaw_log::EventCategory::Agent)
310 .with_attrs(::serde_json::json!({
311 "evicted_queue_slots": queue_evicted,
312 })),
313 "Session queue: released idle actor-queue slots"
314 );
315 crate::util::release_freed_heap();
316 }
317 }
318 });
319 }
320 let session_backend = zeroclaw_infra::make_session_backend(
321 &config.data_dir,
322 &config.channels.session_backend,
323 )
324 .ok();
325
326 let rpc_memory: Option<std::sync::Arc<dyn zeroclaw_api::memory_traits::Memory>> = if config
329 .agents
330 .is_empty()
331 {
332 None
333 } else {
334 match zeroclaw_memory::create_memory_with_storage_and_routes(
335 &config.memory,
336 &config.embedding_routes,
337 config.resolve_active_storage(),
338 &config.data_dir,
339 None,
340 ) {
341 Ok(mem) => Some(std::sync::Arc::from(mem)),
342 Err(_e) => {
343 ::zeroclaw_log::record!(
344 WARN,
345 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
346 "RPC memory subsystem unavailable"
347 );
348 None
349 }
350 }
351 };
352
353 let acp_session_store: Option<
357 std::sync::Arc<zeroclaw_infra::acp_session_store::AcpSessionStore>,
358 > = match zeroclaw_infra::acp_session_store::AcpSessionStore::new(&config.data_dir) {
359 Ok(s) => Some(std::sync::Arc::new(s)),
360 Err(e) => {
361 ::zeroclaw_log::record!(
362 WARN,
363 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
364 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
365 .with_attrs(::serde_json::json!({"error": e.to_string()})),
366 "Failed to open ACP session store at daemon boot"
367 );
368 None
369 }
370 };
371
372 Some(std::sync::Arc::new(RpcContext {
373 config: std::sync::Arc::new(parking_lot::RwLock::new(config.clone())),
374 sessions,
375 session_backend,
376 memory: rpc_memory,
377 cost_tracker: None, event_tx: Some(event_tx.clone()),
379 reload_tx: Some(reload_tx.clone()),
380 approval_pending: std::sync::Arc::new(
381 crate::rpc::context::ApprovalPendingMap::default(),
382 ),
383 tui_registry,
384 acp_session_store,
385 }))
386 } else {
387 None
388 };
389
390 if let Some(socket_start) = registry.take_socket_start() {
392 let rpc_ctx = rpc_ctx
393 .clone()
394 .expect("rpc_ctx built when socket_start is Some");
395 let socket_start = std::sync::Arc::new(socket_start);
396 let socket_cancel = channels_cancel.clone();
397 let count = socket_client_count.clone();
398 handles.push(spawn_component_supervisor(
399 "socket",
400 initial_backoff,
401 max_backoff,
402 move || {
403 let ctx = rpc_ctx.clone();
404 let start = socket_start.clone();
405 let cancel = socket_cancel.clone();
406 let count = count.clone();
407 async move { start(ctx, cancel, count).await }
408 },
409 ));
410 }
411
412 if let Some(wss_start) = registry.take_wss_start() {
414 let rpc_ctx = rpc_ctx
415 .clone()
416 .expect("rpc_ctx built when wss_start is Some");
417 let wss_start = std::sync::Arc::new(wss_start);
418 let wss_cancel = channels_cancel.clone();
419 let count = socket_client_count.clone();
420 handles.push(spawn_component_supervisor(
421 "wss",
422 initial_backoff,
423 max_backoff,
424 move || {
425 let ctx = rpc_ctx.clone();
426 let start = wss_start.clone();
427 let cancel = wss_cancel.clone();
428 let count = count.clone();
429 async move { start(ctx, cancel, count).await }
430 },
431 ));
432 }
433
434 if let Some(mqtt_start) = registry.take_mqtt_start() {
436 let active_mqtt: std::collections::HashSet<String> = config
437 .agents
438 .values()
439 .filter(|a| a.enabled)
440 .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
441 .collect();
442 let mut mqtt_started = false;
443 for (alias, mqtt_config) in &config.channels.mqtt {
444 if !active_mqtt.contains(&format!("mqtt.{alias}")) {
445 continue;
446 }
447 let mqtt_cfg = mqtt_config.clone();
448 let mqtt_start = std::sync::Arc::new(mqtt_start);
449 handles.push(spawn_component_supervisor(
450 "mqtt",
451 initial_backoff,
452 max_backoff,
453 move || {
454 let cfg = mqtt_cfg.clone();
455 let start = mqtt_start.clone();
456 async move { start(cfg).await }
457 },
458 ));
459 mqtt_started = true;
460 break;
461 }
462 if !mqtt_started {
463 crate::health::mark_component_ok("mqtt");
464 }
465 } else {
466 crate::health::mark_component_ok("mqtt");
467 }
468
469 if config.heartbeat.enabled {
470 let heartbeat_cfg = config.clone();
471 handles.push(spawn_component_supervisor(
472 "heartbeat",
473 initial_backoff,
474 max_backoff,
475 move || {
476 let cfg = heartbeat_cfg.clone();
477 async move { Box::pin(run_heartbeat_worker(cfg)).await }
478 },
479 ));
480 }
481
482 if config.scheduler.enabled {
483 let scheduler_cfg = config.clone();
484 let scheduler_event_tx = event_tx.clone();
485 handles.push(spawn_component_supervisor(
486 "scheduler",
487 initial_backoff,
488 max_backoff,
489 move || {
490 let cfg = scheduler_cfg.clone();
491 let tx = scheduler_event_tx.clone();
492 async move { Box::pin(crate::cron::scheduler::run(cfg, Some(tx))).await }
493 },
494 ));
495 } else {
496 crate::health::mark_component_ok("scheduler");
497 ::zeroclaw_log::record!(
498 INFO,
499 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
500 "Cron disabled; scheduler supervisor not started"
501 );
502 }
503
504 println!("🧠 ZeroClaw daemon started");
505 println!(" Gateway: http://{host}:{port}");
506 println!(
507 " Socket: {}",
508 crate::rpc::local::socket_path(&config).display()
509 );
510 println!(" Components: gateway, channels, heartbeat, scheduler");
511 if config.gateway.require_pairing {
512 println!(" Pairing: enabled (code appears in gateway output above)");
513 }
514 println!(" Ctrl+C or SIGTERM to stop");
515
516 let exit = wait_for_exit_signal(reload_rx, ephemeral, socket_client_count).await?;
518 crate::health::mark_component_error(
519 "daemon",
520 match exit {
521 DaemonExit::Shutdown => "shutdown requested",
522 DaemonExit::Reload => "reload requested",
523 },
524 );
525
526 channels_cancel.cancel();
530 for handle in &handles {
531 handle.abort();
532 }
533 for handle in handles {
534 let _ = handle.await;
535 }
536
537 #[cfg(all(target_os = "linux", target_env = "gnu"))]
538 unsafe {
539 libc::malloc_trim(0);
540 }
541
542 Ok(exit)
543}
544
545pub fn state_file_path(config: &Config) -> PathBuf {
546 config
547 .config_path
548 .parent()
549 .map_or_else(|| PathBuf::from("."), PathBuf::from)
550 .join("state")
551 .join("daemon_state.json")
552}
553
554fn spawn_state_writer(config: Config) -> JoinHandle<()> {
555 zeroclaw_spawn::spawn!(async move {
556 let path = state_file_path(&config);
557 if let Some(parent) = path.parent() {
558 let _ = tokio::fs::create_dir_all(parent).await;
559 }
560
561 let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
562 loop {
563 interval.tick().await;
564 let mut json = crate::health::snapshot_json();
565 if let Some(obj) = json.as_object_mut() {
566 obj.insert(
567 "written_at".into(),
568 serde_json::json!(Utc::now().to_rfc3339()),
569 );
570 }
571 let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
572 let _ = tokio::fs::write(&path, data).await;
573 }
574 })
575}
576
577fn spawn_component_supervisor<F, Fut>(
578 name: &'static str,
579 initial_backoff_secs: u64,
580 max_backoff_secs: u64,
581 mut run_component: F,
582) -> JoinHandle<()>
583where
584 F: FnMut() -> Fut + Send + 'static,
585 Fut: Future<Output = Result<()>> + Send + 'static,
586{
587 zeroclaw_spawn::spawn!(async move {
588 let mut backoff = initial_backoff_secs.max(1);
589 let max_backoff = max_backoff_secs.max(backoff);
590
591 loop {
592 crate::health::mark_component_ok(name);
593 match run_component().await {
594 Ok(()) => {
595 crate::health::mark_component_error(name, "component exited unexpectedly");
596 ::zeroclaw_log::record!(
597 WARN,
598 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
599 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
600 .with_attrs(::serde_json::json!({"name": name})),
601 &format!("Daemon component '{name}' exited unexpectedly")
602 );
603 backoff = initial_backoff_secs.max(1);
605 }
606 Err(e) => {
607 crate::health::mark_component_error(name, e.to_string());
608 ::zeroclaw_log::record!(
609 ERROR,
610 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
611 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
612 .with_attrs(
613 ::serde_json::json!({"error": format!("{}", e), "name": name})
614 ),
615 &format!("Daemon component '{name}' failed: {e}")
616 );
617 }
618 }
619
620 crate::health::bump_component_restart(name);
621 tokio::time::sleep(Duration::from_secs(backoff)).await;
622 backoff = backoff.saturating_mul(2).min(max_backoff);
624 }
625 })
626}
627
628async fn run_heartbeat_worker(config: Config) -> Result<()> {
629 use crate::heartbeat::engine::{
630 HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus, compute_adaptive_interval,
631 };
632 use std::sync::Arc;
633
634 let agent_alias = config.heartbeat.agent.trim().to_string();
635 if agent_alias.is_empty() {
636 anyhow::bail!(
637 "heartbeat worker requires `[heartbeat] agent = \"<alias>\"` naming a configured agent"
638 );
639 }
640 if config.agent(&agent_alias).is_none() {
641 anyhow::bail!(
642 "[heartbeat] agent = {agent_alias:?} is not configured ([agents.{agent_alias}] missing)"
643 );
644 }
645
646 let observer: std::sync::Arc<dyn crate::observability::Observer> =
647 std::sync::Arc::from(crate::observability::create_observer(&config.observability));
648 let engine = HeartbeatEngine::new(config.heartbeat.clone(), config.data_dir.clone(), observer);
649 let metrics = engine.metrics();
650 let delivery = resolve_heartbeat_delivery(&config)?;
651 let two_phase = config.heartbeat.two_phase;
652 let adaptive = config.heartbeat.adaptive;
653 let start_time = std::time::Instant::now();
654
655 let deadman_timeout = config.heartbeat.deadman_timeout_minutes;
657 if deadman_timeout > 0 {
658 let dm_metrics = Arc::clone(&metrics);
659 let dm_config = config.clone();
660 let dm_delivery = delivery.clone();
661 zeroclaw_spawn::spawn!(async move {
662 let check_interval = Duration::from_secs(60);
663 let timeout = chrono::Duration::minutes(i64::from(deadman_timeout));
664 loop {
665 tokio::time::sleep(check_interval).await;
666 let last_tick = dm_metrics.lock().last_tick_at;
667 if let Some(last) = last_tick
668 && chrono::Utc::now() - last > timeout
669 {
670 let alert = format!(
671 "⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes"
672 );
673 let (channel, target) = if let Some(ch) = &dm_config.heartbeat.deadman_channel {
674 let to = dm_config
675 .heartbeat
676 .deadman_to
677 .as_deref()
678 .or(dm_config.heartbeat.to.as_deref())
679 .unwrap_or_default();
680 (ch.clone(), to.to_string())
681 } else if let Some((ch, to)) = &dm_delivery {
682 (ch.clone(), to.clone())
683 } else {
684 continue;
685 };
686 let delivery_fut = crate::cron::scheduler::deliver_announcement(
687 &dm_config, &channel, &target, None, &alert,
688 );
689 match tokio::time::timeout(Duration::from_secs(30), delivery_fut).await {
690 Ok(Err(e)) => {
691 ::zeroclaw_log::record!(
692 WARN,
693 ::zeroclaw_log::Event::new(
694 module_path!(),
695 ::zeroclaw_log::Action::Note
696 )
697 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
698 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
699 "Deadman alert delivery failed"
700 );
701 }
702 Err(_) => {
703 ::zeroclaw_log::record!(
704 WARN,
705 ::zeroclaw_log::Event::new(
706 module_path!(),
707 ::zeroclaw_log::Action::Note
708 )
709 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
710 "Deadman alert delivery timed out (30s)"
711 );
712 }
713 Ok(Ok(())) => {}
714 }
715 }
716 }
717 });
718 }
719
720 let base_interval = config.heartbeat.interval_minutes.max(1);
721 let mut sleep_mins = base_interval;
722
723 loop {
724 tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await;
725
726 {
728 let mut m = metrics.lock();
729 m.uptime_secs = start_time.elapsed().as_secs();
730 }
731
732 let tick_start = std::time::Instant::now();
733
734 let mut tasks = engine.collect_runnable_tasks().await?;
736 let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High);
737
738 if tasks.is_empty() {
739 if let Some(fallback) = config
740 .heartbeat
741 .message
742 .as_deref()
743 .map(str::trim)
744 .filter(|m| !m.is_empty())
745 {
746 tasks.push(HeartbeatTask {
747 text: fallback.to_string(),
748 priority: TaskPriority::Medium,
749 status: TaskStatus::Active,
750 });
751 } else {
752 #[allow(clippy::cast_precision_loss)]
753 let elapsed = tick_start.elapsed().as_millis() as f64;
754 metrics.lock().record_success(elapsed);
755 continue;
756 }
757 }
758
759 let tasks_to_run = if two_phase {
761 let decision_prompt = format!(
762 "[Heartbeat Task | decision] {}",
763 HeartbeatEngine::build_decision_prompt(&tasks),
764 );
765 let phase1_fut = Box::pin(crate::agent::run(
766 config.clone(),
767 &agent_alias,
768 Some(decision_prompt),
769 None,
770 None,
771 Some(0.0),
772 vec![],
773 false,
774 None,
775 None,
776 crate::agent::loop_::AgentRunOverrides::default(),
777 ));
778 let phase1_result = if config.heartbeat.task_timeout_secs > 0 {
779 match tokio::time::timeout(
780 Duration::from_secs(config.heartbeat.task_timeout_secs),
781 phase1_fut,
782 )
783 .await
784 {
785 Ok(r) => r,
786 Err(_) => {
787 ::zeroclaw_log::record!(
788 WARN,
789 ::zeroclaw_log::Event::new(
790 module_path!(),
791 ::zeroclaw_log::Action::Timeout
792 )
793 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
794 .with_attrs(::serde_json::json!({
795 "phase": "phase1_decision",
796 "timeout_secs": config.heartbeat.task_timeout_secs,
797 })),
798 "heartbeat: phase1 decision timed out"
799 );
800 Err(anyhow::Error::msg(format!(
801 "Phase 1 decision timed out ({}s)",
802 config.heartbeat.task_timeout_secs
803 )))
804 }
805 }
806 } else {
807 phase1_fut.await
808 };
809 match phase1_result {
810 Ok(response) => {
811 let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
812 if indices.is_empty() {
813 ::zeroclaw_log::record!(
814 INFO,
815 ::zeroclaw_log::Event::new(
816 module_path!(),
817 ::zeroclaw_log::Action::Note
818 ),
819 "heartbeat phase 1: skip (nothing to do)"
820 );
821 crate::health::mark_component_ok("heartbeat");
822 #[allow(clippy::cast_precision_loss)]
823 let elapsed = tick_start.elapsed().as_millis() as f64;
824 metrics.lock().record_success(elapsed);
825 continue;
826 }
827 ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"selected": indices.len(), "total": tasks.len()})), "heartbeat phase 1: running task subset");
828 indices
829 .into_iter()
830 .filter_map(|i| tasks.get(i).cloned())
831 .collect()
832 }
833 Err(e) => {
834 ::zeroclaw_log::record!(
835 WARN,
836 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
837 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
838 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
839 "heartbeat phase 1 failed; running all tasks"
840 );
841 tasks
842 }
843 }
844 } else {
845 tasks
846 };
847
848 let session_context = if config.heartbeat.load_session_context {
852 load_heartbeat_session_context(&config)
853 } else {
854 None
855 };
856
857 let heartbeat_memory: Option<Box<dyn zeroclaw_memory::Memory>> =
859 zeroclaw_memory::create_memory(
860 &config.memory,
861 &config.data_dir,
862 config
863 .model_provider_for_agent(&agent_alias)
864 .and_then(|e| e.api_key.as_deref()),
865 )
866 .ok();
867
868 let mut tick_had_error = false;
869 for task in &tasks_to_run {
870 let task_start = std::time::Instant::now();
871 let task_prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
872
873 let memory_context = if let Some(ref mem) = heartbeat_memory {
877 match mem.recall(&task.text, 5, None, None, None).await {
878 Ok(entries) if !entries.is_empty() => {
879 let ctx: String = entries
880 .iter()
881 .filter(|e| {
882 !matches!(
883 e.category,
884 zeroclaw_memory::traits::MemoryCategory::Conversation
885 )
886 })
887 .map(|e| format!("- {}: {}", e.key, e.content))
888 .collect::<Vec<_>>()
889 .join("\n");
890 if ctx.is_empty() {
891 None
892 } else {
893 Some(format!(
894 "{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n"
895 ))
896 }
897 }
898 _ => None,
899 }
900 } else {
901 None
902 };
903
904 let prompt = match (&session_context, &memory_context) {
905 (Some(sc), Some(mc)) => format!("{mc}\n{sc}\n\n{task_prompt}"),
906 (Some(sc), None) => format!("{sc}\n\n{task_prompt}"),
907 (None, Some(mc)) => format!("{mc}\n\n{task_prompt}"),
908 (None, None) => task_prompt,
909 };
910 let temp: Option<f64> = config
911 .model_provider_for_agent(&agent_alias)
912 .and_then(|e| e.temperature);
913 let phase2_fut = Box::pin(crate::agent::run(
914 config.clone(),
915 &agent_alias,
916 Some(prompt),
917 None,
918 None,
919 temp,
920 vec![],
921 false,
922 None,
923 None,
924 crate::agent::loop_::AgentRunOverrides::default(),
925 ));
926 let phase2_result = if config.heartbeat.task_timeout_secs > 0 {
927 match tokio::time::timeout(
928 Duration::from_secs(config.heartbeat.task_timeout_secs),
929 phase2_fut,
930 )
931 .await
932 {
933 Ok(r) => r,
934 Err(_) => {
935 ::zeroclaw_log::record!(
936 WARN,
937 ::zeroclaw_log::Event::new(
938 module_path!(),
939 ::zeroclaw_log::Action::Timeout
940 )
941 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
942 .with_attrs(::serde_json::json!({
943 "phase": "phase2_heartbeat",
944 "timeout_secs": config.heartbeat.task_timeout_secs,
945 })),
946 "heartbeat task timed out"
947 );
948 Err(anyhow::Error::msg(format!(
949 "Heartbeat task timed out ({}s)",
950 config.heartbeat.task_timeout_secs
951 )))
952 }
953 }
954 } else {
955 phase2_fut.await
956 };
957 match phase2_result {
958 Ok(output) => {
959 crate::health::mark_component_ok("heartbeat");
960 #[allow(clippy::cast_possible_truncation)]
961 let duration_ms = task_start.elapsed().as_millis() as i64;
962 let now = chrono::Utc::now();
963 let _ = crate::heartbeat::store::record_run(
964 &config.data_dir,
965 &task.text,
966 &task.priority.to_string(),
967 now - chrono::Duration::milliseconds(duration_ms),
968 now,
969 "ok",
970 Some(output.as_str()),
971 duration_ms,
972 config.heartbeat.max_run_history,
973 );
974 if config.memory.auto_save
976 && output.chars().count() >= 50
977 && let Some(ref mem) = heartbeat_memory
978 {
979 let key = format!("heartbeat_{}", uuid::Uuid::new_v4());
980 let summary = if output.len() > 500 {
981 let mut end = 500;
983 while end > 0 && !output.is_char_boundary(end) {
984 end -= 1;
985 }
986 &output[..end]
987 } else {
988 &output
989 };
990 let _ = mem
991 .store(
992 &key,
993 &format!("Heartbeat task '{}': {}", task.text, summary),
994 zeroclaw_memory::MemoryCategory::Daily,
995 None,
996 )
997 .await;
998 }
999
1000 let announcement = if output.trim().is_empty() {
1001 format!("💓 heartbeat task completed: {}", task.text)
1002 } else {
1003 output
1004 };
1005 if let Some((channel, target)) = &delivery {
1006 let delivery_result = tokio::time::timeout(
1007 Duration::from_secs(30),
1008 crate::cron::scheduler::deliver_announcement(
1009 &config,
1010 channel,
1011 target,
1012 None,
1013 &announcement,
1014 ),
1015 )
1016 .await;
1017 match delivery_result {
1018 Ok(Err(e)) => {
1019 crate::health::mark_component_error(
1020 "heartbeat",
1021 format!("delivery failed: {e}"),
1022 );
1023 ::zeroclaw_log::record!(
1024 WARN,
1025 ::zeroclaw_log::Event::new(
1026 module_path!(),
1027 ::zeroclaw_log::Action::Note
1028 )
1029 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1030 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1031 "Heartbeat delivery failed"
1032 );
1033 }
1034 Err(_) => {
1035 crate::health::mark_component_error(
1036 "heartbeat",
1037 "delivery timed out (30s)".to_string(),
1038 );
1039 ::zeroclaw_log::record!(
1040 WARN,
1041 ::zeroclaw_log::Event::new(
1042 module_path!(),
1043 ::zeroclaw_log::Action::Note
1044 )
1045 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1046 "Heartbeat delivery timed out (30s)"
1047 );
1048 }
1049 Ok(Ok(())) => {}
1050 }
1051 }
1052 }
1053 Err(e) => {
1054 tick_had_error = true;
1055 #[allow(clippy::cast_possible_truncation)]
1056 let duration_ms = task_start.elapsed().as_millis() as i64;
1057 let now = chrono::Utc::now();
1058 let _ = crate::heartbeat::store::record_run(
1059 &config.data_dir,
1060 &task.text,
1061 &task.priority.to_string(),
1062 now - chrono::Duration::milliseconds(duration_ms),
1063 now,
1064 "error",
1065 Some(&e.to_string()),
1066 duration_ms,
1067 config.heartbeat.max_run_history,
1068 );
1069 crate::health::mark_component_error("heartbeat", e.to_string());
1070 ::zeroclaw_log::record!(
1071 WARN,
1072 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1073 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1074 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1075 "Heartbeat task failed"
1076 );
1077 }
1078 }
1079 }
1080
1081 #[allow(clippy::cast_precision_loss)]
1083 let tick_elapsed = tick_start.elapsed().as_millis() as f64;
1084 {
1085 let mut m = metrics.lock();
1086 if tick_had_error {
1087 m.record_failure(tick_elapsed);
1088 } else {
1089 m.record_success(tick_elapsed);
1090 }
1091 }
1092
1093 if adaptive {
1095 let failures = metrics.lock().consecutive_failures;
1096 sleep_mins = compute_adaptive_interval(
1097 base_interval,
1098 config.heartbeat.min_interval_minutes,
1099 config.heartbeat.max_interval_minutes,
1100 failures,
1101 has_high_priority,
1102 );
1103 } else {
1104 sleep_mins = base_interval;
1105 }
1106 }
1107}
1108
1109fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
1111 let channel = config
1112 .heartbeat
1113 .target
1114 .as_deref()
1115 .map(str::trim)
1116 .filter(|value| !value.is_empty());
1117 let target = config
1118 .heartbeat
1119 .to
1120 .as_deref()
1121 .map(str::trim)
1122 .filter(|value| !value.is_empty());
1123
1124 match (channel, target) {
1125 (Some(channel), Some(target)) => {
1127 validate_heartbeat_channel_config(config, channel)?;
1128 Ok(Some((channel.to_string(), target.to_string())))
1129 }
1130 (Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
1132 (None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
1133 (None, None) => Ok(auto_detect_heartbeat_channel(config)),
1135 }
1136}
1137
1138const HEARTBEAT_SESSION_CONTEXT_MESSAGES: usize = 20;
1148
1149fn load_heartbeat_session_context(config: &Config) -> Option<String> {
1150 use zeroclaw_providers::traits::ChatMessage;
1151
1152 let channel = config
1153 .heartbeat
1154 .target
1155 .as_deref()
1156 .map(str::trim)
1157 .filter(|v| !v.is_empty())?;
1158 let to = config
1159 .heartbeat
1160 .to
1161 .as_deref()
1162 .map(str::trim)
1163 .filter(|v| !v.is_empty())?;
1164
1165 if channel.contains('/') || channel.contains('\\') || to.contains('/') || to.contains('\\') {
1166 ::zeroclaw_log::record!(
1167 WARN,
1168 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1169 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1170 "heartbeat session context: channel/to contains path separators, skipping"
1171 );
1172 return None;
1173 }
1174
1175 let sessions_dir = config.data_dir.join("sessions");
1176
1177 let prefix = format!("{channel}_");
1180 let suffix = format!("_{to}.jsonl");
1181 let exact = format!("{channel}_{to}.jsonl");
1182 let mid_prefix = format!("{channel}_{to}_");
1183
1184 let path = std::fs::read_dir(&sessions_dir)
1185 .ok()?
1186 .filter_map(|e| e.ok())
1187 .filter(|e| {
1188 let name = e.file_name();
1189 let name = name.to_string_lossy();
1190 name.ends_with(".jsonl")
1191 && (name == exact
1192 || (name.starts_with(&prefix) && name.ends_with(&suffix))
1193 || name.starts_with(&mid_prefix))
1194 })
1195 .max_by_key(|e| {
1196 e.metadata()
1197 .and_then(|m| m.modified())
1198 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
1199 })
1200 .map(|e| e.path())?;
1201
1202 if !path.exists() {
1203 ::zeroclaw_log::record!(
1204 DEBUG,
1205 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1206 .with_attrs(::serde_json::json!({"channel": channel, "to": to})),
1207 "heartbeat session context: no session file found"
1208 );
1209 return None;
1210 }
1211
1212 let messages = load_jsonl_messages(&path);
1213 if messages.is_empty() {
1214 return None;
1215 }
1216
1217 let recent: Vec<&ChatMessage> = messages
1218 .iter()
1219 .filter(|m| m.role == "user" || m.role == "assistant")
1220 .rev()
1221 .take(HEARTBEAT_SESSION_CONTEXT_MESSAGES)
1222 .collect::<Vec<_>>()
1223 .into_iter()
1224 .rev()
1225 .collect();
1226
1227 let has_user_message = recent.iter().any(|m| m.role == "user");
1232 if !has_user_message {
1233 ::zeroclaw_log::record!(
1234 DEBUG,
1235 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1236 "💓 Heartbeat session context: no user messages in recent history — skipping"
1237 );
1238 return None;
1239 }
1240
1241 let last_message_age = std::fs::metadata(&path)
1243 .ok()
1244 .and_then(|m| m.modified().ok())
1245 .and_then(|mtime| mtime.elapsed().ok());
1246
1247 let silence_note = match last_message_age {
1248 Some(age) => {
1249 let mins = age.as_secs() / 60;
1250 if mins < 60 {
1251 format!("(last message ~{mins} minutes ago)\n")
1252 } else {
1253 let hours = mins / 60;
1254 let rem = mins % 60;
1255 if rem == 0 {
1256 format!("(last message ~{hours}h ago)\n")
1257 } else {
1258 format!("(last message ~{hours}h {rem}m ago)\n")
1259 }
1260 }
1261 }
1262 None => String::new(),
1263 };
1264
1265 ::zeroclaw_log::record!(
1266 DEBUG,
1267 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1268 &format!(
1269 "💓 Heartbeat session context: {} messages from {}, silence: {}",
1270 recent.len(),
1271 path.display().to_string(),
1272 silence_note.trim()
1273 )
1274 );
1275
1276 let mut ctx = format!(
1277 "[Recent conversation history — use this for context when composing your message] {silence_note}",
1278 );
1279 for msg in &recent {
1280 let label = if msg.role == "user" { "User" } else { "You" };
1281 let content = if msg.content.len() > 500 {
1284 let truncate_at = msg
1285 .content
1286 .char_indices()
1287 .map(|(i, _)| i)
1288 .take_while(|&i| i <= 500)
1289 .last()
1290 .unwrap_or(0);
1291 format!("{}…", &msg.content[..truncate_at])
1292 } else {
1293 msg.content.clone()
1294 };
1295 ctx.push_str(label);
1296 ctx.push_str(": ");
1297 ctx.push_str(&content);
1298 ctx.push('\n');
1299 }
1300
1301 Some(ctx)
1302}
1303
1304fn load_jsonl_messages(path: &std::path::Path) -> Vec<zeroclaw_providers::traits::ChatMessage> {
1308 use std::collections::VecDeque;
1309 use std::io::BufRead;
1310
1311 let file = match std::fs::File::open(path) {
1312 Ok(f) => f,
1313 Err(_) => return Vec::new(),
1314 };
1315 let reader = std::io::BufReader::new(file);
1316 let mut window: VecDeque<zeroclaw_providers::traits::ChatMessage> =
1317 VecDeque::with_capacity(HEARTBEAT_SESSION_CONTEXT_MESSAGES + 1);
1318 for line in reader.lines() {
1319 let Ok(line) = line else { continue };
1320 let trimmed = line.trim();
1321 if trimmed.is_empty() {
1322 continue;
1323 }
1324 if let Ok(msg) = serde_json::from_str::<zeroclaw_providers::traits::ChatMessage>(trimmed) {
1325 window.push_back(msg);
1326 if window.len() > HEARTBEAT_SESSION_CONTEXT_MESSAGES {
1327 window.pop_front();
1328 }
1329 }
1330 }
1331 window.into_iter().collect()
1332}
1333
1334fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
1337 if !config.channels.telegram.is_empty() {
1342 for alias in config.channels.telegram.keys() {
1343 let peers = config.channel_external_peers("telegram", alias);
1344 if let Some(target) = peers.into_iter().next() {
1345 return Some(("telegram".to_string(), target));
1346 }
1347 }
1348 }
1349 if !config.channels.discord.is_empty() {
1350 return None;
1352 }
1353 if !config.channels.slack.is_empty() {
1354 return None;
1356 }
1357 if !config.channels.mattermost.is_empty() {
1358 return None;
1360 }
1361 None
1362}
1363
1364fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
1365 match channel.to_ascii_lowercase().as_str() {
1366 "telegram" => {
1367 if config.channels.telegram.is_empty() {
1368 anyhow::bail!(
1369 "heartbeat.target is set to telegram but channels.telegram is not configured"
1370 );
1371 }
1372 }
1373 "discord" => {
1374 if config.channels.discord.is_empty() {
1375 anyhow::bail!(
1376 "heartbeat.target is set to discord but channels.discord is not configured"
1377 );
1378 }
1379 }
1380 "slack" => {
1381 if config.channels.slack.is_empty() {
1382 anyhow::bail!(
1383 "heartbeat.target is set to slack but channels.slack is not configured"
1384 );
1385 }
1386 }
1387 "mattermost" => {
1388 if config.channels.mattermost.is_empty() {
1389 anyhow::bail!(
1390 "heartbeat.target is set to mattermost but channels.mattermost is not configured"
1391 );
1392 }
1393 }
1394 other => anyhow::bail!("unsupported heartbeat.target channel: {other}"),
1395 }
1396
1397 Ok(())
1398}
1399
1400fn has_supervised_channels(config: &Config) -> bool {
1401 config.channels.has_any_enabled()
1407}
1408
1409#[cfg(test)]
1413mod tests {
1414 use super::*;
1415 use tempfile::TempDir;
1416
1417 fn test_config(tmp: &TempDir) -> Config {
1418 let config = Config {
1419 data_dir: tmp.path().join("data"),
1420 config_path: tmp.path().join("config.toml"),
1421 ..Config::default()
1422 };
1423 std::fs::create_dir_all(&config.data_dir).unwrap();
1424 config
1425 }
1426
1427 #[test]
1428 fn state_file_path_uses_config_state_directory() {
1429 let tmp = TempDir::new().unwrap();
1430 let config = test_config(&tmp);
1431
1432 let path = state_file_path(&config);
1433 assert_eq!(path, tmp.path().join("state").join("daemon_state.json"));
1434 }
1435
1436 #[tokio::test]
1437 async fn supervisor_marks_error_and_restart_on_failure() {
1438 let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
1439 anyhow::bail!("boom")
1440 });
1441
1442 tokio::time::sleep(Duration::from_millis(50)).await;
1443 handle.abort();
1444 let _ = handle.await;
1445
1446 let snapshot = crate::health::snapshot_json();
1447 let component = &snapshot["components"]["daemon-test-fail"];
1448 assert_eq!(component["status"], "error");
1449 assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
1450 assert!(
1451 component["last_error"]
1452 .as_str()
1453 .unwrap_or("")
1454 .contains("boom")
1455 );
1456 }
1457
1458 #[tokio::test]
1459 async fn supervisor_marks_unexpected_exit_as_error() {
1460 let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });
1461
1462 tokio::time::sleep(Duration::from_millis(50)).await;
1463 handle.abort();
1464 let _ = handle.await;
1465
1466 let snapshot = crate::health::snapshot_json();
1467 let component = &snapshot["components"]["daemon-test-exit"];
1468 assert_eq!(component["status"], "error");
1469 assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
1470 assert!(
1471 component["last_error"]
1472 .as_str()
1473 .unwrap_or("")
1474 .contains("component exited unexpectedly")
1475 );
1476 }
1477
1478 #[test]
1479 fn detects_no_supervised_channels() {
1480 let config = Config::default();
1481 assert!(!has_supervised_channels(&config));
1482 }
1483
1484 #[test]
1485 fn all_disabled_channels_not_supervised() {
1486 let mut config = Config::default();
1492 config.channels.discord.insert(
1493 "clamps".to_string(),
1494 zeroclaw_config::schema::DiscordConfig {
1495 enabled: false,
1496 bot_token: "token".into(),
1497 guild_ids: vec![],
1498 channel_ids: vec![],
1499 listen_to_bots: false,
1500 mention_only: true,
1501 stream_mode: zeroclaw_config::schema::StreamMode::default(),
1502 draft_update_interval_ms: 0,
1503 multi_message_delay_ms: 0,
1504 stall_timeout_secs: 0,
1505 interrupt_on_new_message: false,
1506 archive: false,
1507 approval_timeout_secs: 0,
1508 proxy_url: None,
1509 excluded_tools: vec![],
1510 reply_min_interval_secs: 0,
1511 reply_queue_depth_max: 0,
1512 },
1513 );
1514 config.channels.discord.insert(
1515 "glados".to_string(),
1516 zeroclaw_config::schema::DiscordConfig {
1517 enabled: false,
1518 bot_token: "token2".into(),
1519 guild_ids: vec![],
1520 channel_ids: vec![],
1521 listen_to_bots: false,
1522 mention_only: true,
1523 stream_mode: zeroclaw_config::schema::StreamMode::default(),
1524 draft_update_interval_ms: 0,
1525 multi_message_delay_ms: 0,
1526 stall_timeout_secs: 0,
1527 interrupt_on_new_message: false,
1528 archive: false,
1529 approval_timeout_secs: 0,
1530 proxy_url: None,
1531 excluded_tools: vec![],
1532 reply_min_interval_secs: 0,
1533 reply_queue_depth_max: 0,
1534 },
1535 );
1536 assert!(!has_supervised_channels(&config));
1537 }
1538
1539 #[test]
1540 fn detects_supervised_channels_present() {
1541 let mut config = Config::default();
1542 config.channels.telegram.insert(
1543 "default".to_string(),
1544 zeroclaw_config::schema::TelegramConfig {
1545 enabled: true,
1546 bot_token: "token".into(),
1547 stream_mode: zeroclaw_config::schema::StreamMode::default(),
1548 draft_update_interval_ms: 1000,
1549 interrupt_on_new_message: false,
1550 mention_only: false,
1551 ack_reactions: None,
1552 proxy_url: None,
1553 approval_timeout_secs: 120,
1554 excluded_tools: vec![],
1555 reply_min_interval_secs: 0,
1556 reply_queue_depth_max: 0,
1557 },
1558 );
1559 assert!(has_supervised_channels(&config));
1560 }
1561
1562 #[test]
1563 fn detects_dingtalk_as_supervised_channel() {
1564 let mut config = Config::default();
1565 config.channels.dingtalk.insert(
1566 "default".to_string(),
1567 zeroclaw_config::schema::DingTalkConfig {
1568 enabled: true,
1569 client_id: "client_id".into(),
1570 client_secret: "client_secret".into(),
1571 proxy_url: None,
1572 excluded_tools: vec![],
1573 },
1574 );
1575 assert!(has_supervised_channels(&config));
1576 }
1577
1578 #[test]
1579 fn detects_mattermost_as_supervised_channel() {
1580 let mut config = Config::default();
1581 config.channels.mattermost.insert(
1582 "default".to_string(),
1583 zeroclaw_config::schema::MattermostConfig {
1584 enabled: true,
1585 url: "https://mattermost.example.com".into(),
1586 bot_token: Some("token".into()),
1587 login_id: None,
1588 password: None,
1589 channel_ids: vec!["channel-id".into()],
1590 team_ids: vec![],
1591 discover_dms: None,
1592 thread_replies: Some(true),
1593 mention_only: Some(false),
1594 interrupt_on_new_message: false,
1595 proxy_url: None,
1596 excluded_tools: vec![],
1597 reply_min_interval_secs: 0,
1598 reply_queue_depth_max: 0,
1599 },
1600 );
1601 assert!(has_supervised_channels(&config));
1602 }
1603
1604 #[test]
1605 fn detects_qq_as_supervised_channel() {
1606 let mut config = Config::default();
1607 config.channels.qq.insert(
1608 "default".to_string(),
1609 zeroclaw_config::schema::QQConfig {
1610 enabled: true,
1611 app_id: "app-id".into(),
1612 app_secret: "app-secret".into(),
1613 proxy_url: None,
1614 excluded_tools: vec![],
1615 },
1616 );
1617 assert!(has_supervised_channels(&config));
1618 }
1619
1620 #[test]
1621 fn detects_nextcloud_talk_as_supervised_channel() {
1622 let mut config = Config::default();
1623 config.channels.nextcloud_talk.insert(
1624 "default".to_string(),
1625 zeroclaw_config::schema::NextcloudTalkConfig {
1626 enabled: true,
1627 base_url: "https://cloud.example.com".into(),
1628 app_token: "app-token".into(),
1629 webhook_secret: None,
1630 proxy_url: None,
1631 bot_name: None,
1632 excluded_tools: vec![],
1633 stream_mode: zeroclaw_config::schema::StreamMode::default(),
1634 draft_update_interval_ms: 1000,
1635 },
1636 );
1637 assert!(has_supervised_channels(&config));
1638 }
1639
1640 #[test]
1641 fn webhook_only_config_is_supervised() {
1642 let mut config = Config::default();
1643 config.channels.webhook.insert(
1644 "default".to_string(),
1645 zeroclaw_config::schema::WebhookConfig {
1646 enabled: true,
1647 port: 8080,
1648 listen_path: None,
1649 send_url: None,
1650 send_method: None,
1651 auth_header: None,
1652 secret: None,
1653 excluded_tools: vec![],
1654 reply_min_interval_secs: 0,
1655 reply_queue_depth_max: 0,
1656 max_retries: None,
1657 retry_base_delay_ms: None,
1658 retry_max_delay_ms: None,
1659 },
1660 );
1661 assert!(has_supervised_channels(&config));
1662 }
1663
1664 #[test]
1665 fn resolve_delivery_none_when_unset() {
1666 let config = Config::default();
1667 let target = resolve_heartbeat_delivery(&config).unwrap();
1668 assert!(target.is_none());
1669 }
1670
1671 #[test]
1672 fn resolve_delivery_requires_to_field() {
1673 let mut config = Config::default();
1674 config.heartbeat.target = Some("telegram".into());
1675 let err = resolve_heartbeat_delivery(&config).unwrap_err();
1676 assert!(
1677 err.to_string()
1678 .contains("heartbeat.to is required when heartbeat.target is set")
1679 );
1680 }
1681
1682 #[test]
1683 fn resolve_delivery_requires_target_field() {
1684 let mut config = Config::default();
1685 config.heartbeat.to = Some("123456".into());
1686 let err = resolve_heartbeat_delivery(&config).unwrap_err();
1687 assert!(
1688 err.to_string()
1689 .contains("heartbeat.target is required when heartbeat.to is set")
1690 );
1691 }
1692
1693 #[test]
1694 fn resolve_delivery_rejects_unsupported_channel() {
1695 let mut config = Config::default();
1696 config.heartbeat.target = Some("email".into());
1697 config.heartbeat.to = Some("ops@example.com".into());
1698 let err = resolve_heartbeat_delivery(&config).unwrap_err();
1699 assert!(
1700 err.to_string()
1701 .contains("unsupported heartbeat.target channel")
1702 );
1703 }
1704
1705 #[test]
1706 fn resolve_delivery_requires_channel_configuration() {
1707 let mut config = Config::default();
1708 config.heartbeat.target = Some("telegram".into());
1709 config.heartbeat.to = Some("123456".into());
1710 let err = resolve_heartbeat_delivery(&config).unwrap_err();
1711 assert!(
1712 err.to_string()
1713 .contains("channels.telegram is not configured")
1714 );
1715 }
1716
1717 #[test]
1718 fn resolve_delivery_accepts_telegram_configuration() {
1719 let mut config = Config::default();
1720 config.heartbeat.target = Some("telegram".into());
1721 config.heartbeat.to = Some("123456".into());
1722 config.channels.telegram.insert(
1723 "default".to_string(),
1724 zeroclaw_config::schema::TelegramConfig {
1725 enabled: true,
1726 bot_token: "bot-token".into(),
1727 stream_mode: zeroclaw_config::schema::StreamMode::default(),
1728 draft_update_interval_ms: 1000,
1729 interrupt_on_new_message: false,
1730 mention_only: false,
1731 ack_reactions: None,
1732 proxy_url: None,
1733 approval_timeout_secs: 120,
1734 excluded_tools: vec![],
1735 reply_min_interval_secs: 0,
1736 reply_queue_depth_max: 0,
1737 },
1738 );
1739
1740 let target = resolve_heartbeat_delivery(&config).unwrap();
1741 assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
1742 }
1743
1744 #[test]
1745 fn auto_detect_telegram_when_configured() {
1746 use zeroclaw_config::multi_agent::{PeerGroupConfig, PeerUsername};
1747
1748 let mut config = Config::default();
1749 config.channels.telegram.insert(
1750 "default".to_string(),
1751 zeroclaw_config::schema::TelegramConfig {
1752 enabled: true,
1753 bot_token: "bot-token".into(),
1754 stream_mode: zeroclaw_config::schema::StreamMode::default(),
1755 draft_update_interval_ms: 1000,
1756 interrupt_on_new_message: false,
1757 mention_only: false,
1758 ack_reactions: None,
1759 proxy_url: None,
1760 approval_timeout_secs: 120,
1761 excluded_tools: vec![],
1762 reply_min_interval_secs: 0,
1763 reply_queue_depth_max: 0,
1764 },
1765 );
1766 config.peer_groups.insert(
1770 "telegram_default".to_string(),
1771 PeerGroupConfig {
1772 channel: "telegram".to_string(),
1773 external_peers: vec![PeerUsername::new("user123")],
1774 ..PeerGroupConfig::default()
1775 },
1776 );
1777
1778 let target = resolve_heartbeat_delivery(&config).unwrap();
1779 assert_eq!(
1780 target,
1781 Some(("telegram".to_string(), "user123".to_string()))
1782 );
1783 }
1784
1785 #[test]
1786 fn auto_detect_none_when_no_channels() {
1787 let config = Config::default();
1788 let target = auto_detect_heartbeat_channel(&config);
1789 assert!(target.is_none());
1790 }
1791
1792 #[cfg(unix)]
1795 #[tokio::test]
1796 async fn sighup_does_not_shut_down_daemon() {
1797 use libc;
1798 use tokio::time::{Duration, timeout};
1799
1800 let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1801 let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1802 let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, false, count));
1803
1804 tokio::time::sleep(Duration::from_millis(50)).await;
1806
1807 unsafe { libc::raise(libc::SIGHUP) };
1809
1810 let result = timeout(Duration::from_millis(200), handle).await;
1812 assert!(
1813 result.is_err(),
1814 "wait_for_exit_signal should not return after SIGHUP"
1815 );
1816 }
1817
1818 #[tokio::test]
1821 async fn reload_channel_returns_reload() {
1822 use tokio::time::{Duration, timeout};
1823
1824 let (reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1825 let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1826 let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, false, count));
1827 tokio::time::sleep(Duration::from_millis(50)).await;
1828 reload_tx.send(true).expect("send reload");
1829
1830 let result = timeout(Duration::from_secs(2), handle)
1831 .await
1832 .expect("wait_for_exit_signal should return after reload signal")
1833 .expect("task should not panic")
1834 .expect("signal handler should not error");
1835 assert_eq!(result, DaemonExit::Reload);
1836 }
1837
1838 #[tokio::test]
1839 async fn registry_gateway_starter_can_trigger_daemon_reload() {
1840 use tokio::time::{Duration, timeout};
1841
1842 let tmp = TempDir::new().unwrap();
1843 let config = test_config(&tmp);
1844 let expected_data_dir = config.data_dir.clone();
1845 let (seen_tx, mut seen_rx) = tokio::sync::mpsc::unbounded_channel();
1846
1847 let mut registry = DaemonRegistry::new();
1848 registry.register_gateway(Box::new(
1849 move |host, port, config, event_tx, reload_tx, tui_registry| {
1850 let seen_tx = seen_tx.clone();
1851 Box::pin(async move {
1852 let has_event_tx = event_tx.is_some();
1853 let has_reload_tx = reload_tx.is_some();
1854 let has_tui_registry = tui_registry.is_some();
1855 seen_tx
1856 .send((
1857 host,
1858 port,
1859 config.data_dir.clone(),
1860 has_event_tx,
1861 has_reload_tx,
1862 has_tui_registry,
1863 ))
1864 .expect("record gateway starter inputs");
1865 reload_tx
1866 .expect("daemon should pass reload sender to gateway starter")
1867 .send(true)
1868 .expect("send reload signal");
1869 std::future::pending::<Result<()>>().await
1870 })
1871 },
1872 ));
1873
1874 let exit = timeout(
1875 Duration::from_secs(2),
1876 run(config, "127.0.0.1".to_string(), 4242, registry, false),
1877 )
1878 .await
1879 .expect("daemon should return after gateway-triggered reload")
1880 .expect("daemon run should succeed");
1881
1882 assert_eq!(exit, DaemonExit::Reload);
1883 let (host, port, data_dir, has_event_tx, has_reload_tx, has_tui_registry) = seen_rx
1884 .try_recv()
1885 .expect("gateway starter should record its daemon inputs");
1886 assert_eq!(host, "127.0.0.1");
1887 assert_eq!(port, 4242);
1888 assert_eq!(data_dir, expected_data_dir);
1889 assert!(has_event_tx);
1890 assert!(has_reload_tx);
1891 assert!(has_tui_registry);
1892 }
1893
1894 #[tokio::test]
1895 async fn ephemeral_does_not_exit_before_client_connects() {
1896 use tokio::time::{Duration, timeout};
1897
1898 let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1899 let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1900 let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, true, count));
1901
1902 let result = timeout(Duration::from_millis(500), handle).await;
1904 assert!(
1905 result.is_err(),
1906 "ephemeral daemon should not exit before any client connects"
1907 );
1908 }
1909
1910 #[tokio::test]
1911 async fn ephemeral_exits_after_client_disconnects() {
1912 use std::sync::atomic::Ordering;
1913 use tokio::time::{Duration, timeout};
1914
1915 let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1916 let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1917 let count2 = count.clone();
1918 let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, true, count2));
1919
1920 count.store(1, Ordering::Relaxed);
1922 tokio::time::sleep(Duration::from_millis(100)).await;
1923 count.store(0, Ordering::Relaxed);
1924
1925 let result = timeout(Duration::from_secs(EPHEMERAL_GRACE_SECS + 5), handle)
1927 .await
1928 .expect("ephemeral daemon should shut down after last client disconnects")
1929 .expect("task should not panic")
1930 .expect("signal handler should not error");
1931 assert_eq!(result, DaemonExit::Shutdown);
1932 }
1933
1934 #[tokio::test]
1935 async fn ephemeral_grace_period_resets_on_reconnect() {
1936 use std::sync::atomic::Ordering;
1937 use tokio::time::{Duration, timeout};
1938
1939 let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1940 let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1941 let count2 = count.clone();
1942 let mut handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, true, count2));
1943
1944 count.store(1, Ordering::Relaxed);
1946 tokio::time::sleep(Duration::from_millis(100)).await;
1947 count.store(0, Ordering::Relaxed);
1948
1949 tokio::time::sleep(Duration::from_millis(200)).await;
1953 count.store(1, Ordering::Relaxed);
1954
1955 let result = timeout(Duration::from_millis(500), &mut handle).await;
1957 assert!(
1958 result.is_err(),
1959 "ephemeral daemon should not exit while client is connected"
1960 );
1961
1962 count.store(0, Ordering::Relaxed);
1964 let result = timeout(Duration::from_secs(EPHEMERAL_GRACE_SECS + 5), handle)
1965 .await
1966 .expect("ephemeral daemon should shut down after second disconnect")
1967 .expect("task should not panic")
1968 .expect("signal handler should not error");
1969 assert_eq!(result, DaemonExit::Shutdown);
1970 }
1971}