Skip to main content

zeroclaw_runtime/daemon/
mod.rs

1use anyhow::Result;
2use chrono::Utc;
3use std::path::PathBuf;
4use tokio::task::JoinHandle;
5use tokio::time::Duration;
6use zeroclaw_config::schema::Config;
7use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
8
9mod registry;
10pub use registry::DaemonRegistry;
11
/// Period, in seconds, of the interval timer that drives the state-writer
/// task (`spawn_state_writer`); one health snapshot is written per tick.
const STATUS_FLUSH_SECONDS: u64 = 5;
13
/// Outcome reported by the daemon's main loop when it stops.
///
/// A reload keeps the same PID: only the in-process subsystems are torn down
/// and instantiated again.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DaemonExit {
    /// The process should exit cleanly.
    Shutdown,
    /// The caller (typically `src/main.rs`) should re-read the config from
    /// disk and call `daemon::run` again.
    Reload,
}
24
25/// Wait for either a shutdown signal (SIGINT / SIGTERM / Ctrl+C) or an
26/// in-process reload signal (the gateway's `/admin/reload` writes `true`
27/// on the watch channel). Returns the reason so the outer loop can decide
28/// whether to re-init or exit. SIGHUP is ignored on Unix so the daemon
29/// survives terminal / SSH disconnects.
30///
31/// The reload trigger is a tokio watch channel (not an OS signal) so it
32/// works identically on Linux, macOS, and Windows. The Sender is owned by
33/// the daemon (created in `run`) and cloned to the gateway for AppState.
34/// Default grace period (seconds) before ephemeral shutdown after last client disconnects.
35const EPHEMERAL_GRACE_SECS: u64 = 1;
36
37async fn wait_for_exit_signal(
38    mut reload_rx: tokio::sync::watch::Receiver<bool>,
39    ephemeral: bool,
40    client_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
41) -> Result<DaemonExit> {
42    use std::sync::atomic::Ordering;
43
44    // Future that resolves when ephemeral shutdown is triggered:
45    // waits for at least one client to connect, then for all clients to
46    // disconnect, then sleeps the grace period. Pending forever if not
47    // ephemeral.
48    let ephemeral_shutdown = async {
49        if !ephemeral {
50            return std::future::pending::<()>().await;
51        }
52        // Wait until at least one client has connected.
53        loop {
54            if client_count.load(Ordering::Relaxed) > 0 {
55                break;
56            }
57            tokio::time::sleep(Duration::from_secs(1)).await;
58        }
59        // Wait until all clients disconnect.
60        loop {
61            if client_count.load(Ordering::Relaxed) == 0 {
62                break;
63            }
64            tokio::time::sleep(Duration::from_secs(1)).await;
65        }
66        ::zeroclaw_log::record!(
67            INFO,
68            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
69                .with_attrs(::serde_json::json!({"grace_secs": EPHEMERAL_GRACE_SECS})),
70            "All socket clients disconnected; starting ephemeral grace period"
71        );
72        // Grace period — if a client reconnects, abort.
73        for _ in 0..EPHEMERAL_GRACE_SECS {
74            tokio::time::sleep(Duration::from_secs(1)).await;
75            if client_count.load(Ordering::Relaxed) > 0 {
76                // Client reconnected — restart the whole wait.
77                return Box::pin(wait_for_ephemeral(client_count.clone())).await;
78            }
79        }
80    };
81    tokio::pin!(ephemeral_shutdown);
82
83    #[cfg(unix)]
84    {
85        use tokio::signal::unix::{SignalKind, signal};
86
87        let mut sigint = signal(SignalKind::interrupt())?;
88        let mut sigterm = signal(SignalKind::terminate())?;
89        let mut sighup = signal(SignalKind::hangup())?;
90
91        loop {
92            tokio::select! {
93                _ = sigint.recv() => {
94                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGINT, shutting down...");
95                    return Ok(DaemonExit::Shutdown);
96                }
97                _ = sigterm.recv() => {
98                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGTERM, shutting down...");
99                    return Ok(DaemonExit::Shutdown);
100                }
101                _ = sighup.recv() => {
102                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGHUP, ignoring (daemon stays running)");
103                }
104                changed = reload_rx.changed() => {
105                    if changed.is_err() {
106                        ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
107                        return Ok(DaemonExit::Shutdown);
108                    }
109                    if *reload_rx.borrow_and_update() {
110                        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
111                        return Ok(DaemonExit::Reload);
112                    }
113                }
114                _ = &mut ephemeral_shutdown => {
115                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Ephemeral daemon: no clients remaining, shutting down");
116                    return Ok(DaemonExit::Shutdown);
117                }
118            }
119        }
120    }
121
122    #[cfg(not(unix))]
123    {
124        loop {
125            tokio::select! {
126                res = tokio::signal::ctrl_c() => {
127                    res?;
128                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received Ctrl+C, shutting down...");
129                    return Ok(DaemonExit::Shutdown);
130                }
131                changed = reload_rx.changed() => {
132                    if changed.is_err() {
133                        ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
134                        return Ok(DaemonExit::Shutdown);
135                    }
136                    if *reload_rx.borrow_and_update() {
137                        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
138                        return Ok(DaemonExit::Reload);
139                    }
140                }
141                _ = &mut ephemeral_shutdown => {
142                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Ephemeral daemon: no clients remaining, shutting down");
143                    return Ok(DaemonExit::Shutdown);
144                }
145            }
146        }
147    }
148}
149
150/// Recursive helper: wait for clients to connect then all disconnect, with grace period.
151async fn wait_for_ephemeral(client_count: std::sync::Arc<std::sync::atomic::AtomicUsize>) {
152    use std::sync::atomic::Ordering;
153    // Wait until all clients disconnect again.
154    loop {
155        if client_count.load(Ordering::Relaxed) == 0 {
156            break;
157        }
158        tokio::time::sleep(Duration::from_secs(1)).await;
159    }
160    ::zeroclaw_log::record!(
161        INFO,
162        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
163            .with_attrs(::serde_json::json!({"grace_secs": EPHEMERAL_GRACE_SECS})),
164        "All socket clients disconnected; starting ephemeral grace period"
165    );
166    for _ in 0..EPHEMERAL_GRACE_SECS {
167        tokio::time::sleep(Duration::from_secs(1)).await;
168        if client_count.load(Ordering::Relaxed) > 0 {
169            return Box::pin(wait_for_ephemeral(client_count)).await;
170        }
171    }
172}
173
174pub async fn run(
175    config: Config,
176    host: String,
177    port: u16,
178    mut registry: DaemonRegistry,
179    ephemeral: bool,
180) -> Result<DaemonExit> {
181    let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
182    let max_backoff = config
183        .reliability
184        .channel_max_backoff_secs
185        .max(initial_backoff);
186
187    crate::health::mark_component_ok("daemon");
188
189    // Shared broadcast channel so all daemon components (gateway, cron,
190    // heartbeat) can publish real-time events to dashboard clients.
191    let (event_tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
192
193    // Wire the log broadcast hook so every record!() emission reaches the
194    // RPC logs/subscribe stream. Without this, tool calls and agent events
195    // logged via record!() are invisible to the zerocode Logs pane when
196    // connected over the Unix socket (the gateway wires this separately for
197    // its own event_tx; the daemon's RPC event_tx must be wired here).
198    zeroclaw_log::set_broadcast_hook(event_tx.clone());
199
200    if config.heartbeat.enabled {
201        let _ = crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.data_dir)
202            .await;
203    }
204
205    let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
206
207    // Reload channel: gateway's /admin/reload writes here; our wait loop
208    // (below) selects on it alongside OS signals. Cross-platform.
209    let (reload_tx, reload_rx) = tokio::sync::watch::channel::<bool>(false);
210
211    // Construct the TUI registry early so both the gateway (for /api/tuis)
212    // and the RPC socket (for tui/list) share the same Arc.
213    let tui_registry =
214        std::sync::Arc::new(crate::rpc::tui_identity::TuiRegistry::new(&config.data_dir));
215
216    if let Some(gateway_start) = registry.take_gateway_start() {
217        let gateway_cfg = config.clone();
218        let gateway_host = host.clone();
219        let gateway_event_tx = event_tx.clone();
220        let gateway_reload_tx = reload_tx.clone();
221        let gateway_tui_registry = tui_registry.clone();
222        let gateway_start = std::sync::Arc::new(gateway_start);
223        handles.push(spawn_component_supervisor(
224            "gateway",
225            initial_backoff,
226            max_backoff,
227            move || {
228                let cfg = gateway_cfg.clone();
229                let host = gateway_host.clone();
230                let tx = gateway_event_tx.clone();
231                let reload = gateway_reload_tx.clone();
232                let tui_reg = gateway_tui_registry.clone();
233                let start = gateway_start.clone();
234                async move { start(host, port, cfg, Some(tx), Some(reload), Some(tui_reg)).await }
235            },
236        ));
237    }
238
239    let channels_cancel = tokio_util::sync::CancellationToken::new();
240
241    if let Some(channels_start) = registry.take_channels_start() {
242        if has_supervised_channels(&config) {
243            let channels_cfg = config.clone();
244            let channels_start = std::sync::Arc::new(channels_start);
245            let cancel_for_supervisor = channels_cancel.clone();
246            handles.push(spawn_component_supervisor(
247                "channels",
248                initial_backoff,
249                max_backoff,
250                move || {
251                    let cfg = channels_cfg.clone();
252                    let start = channels_start.clone();
253                    let cancel = cancel_for_supervisor.clone();
254                    async move { start(cfg, cancel).await }
255                },
256            ));
257        } else {
258            crate::health::mark_component_ok("channels");
259            ::zeroclaw_log::record!(
260                INFO,
261                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
262                "No channels configured; channel supervisor disabled"
263            );
264        }
265    } else {
266        crate::health::mark_component_ok("channels");
267        ::zeroclaw_log::record!(
268            INFO,
269            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
270            "Channels subsystem not wired; channel supervisor disabled"
271        );
272    }
273
274    // RPC transports: Unix socket (#6837) and WSS (remote TUI connections).
275    // Build the shared RpcContext if either transport is configured.
276    let socket_client_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
277    let need_rpc_ctx = registry.has_socket_start() || registry.has_wss_start();
278
279    let rpc_ctx = if need_rpc_ctx {
280        use crate::rpc::context::RpcContext;
281        use crate::rpc::session::SessionStore;
282        use zeroclaw_infra::session_queue::SessionActorQueue;
283
284        let session_queue = std::sync::Arc::new(SessionActorQueue::new(32, 30, 600));
285        let sessions = std::sync::Arc::new(SessionStore::new(64, session_queue.clone()));
286
287        {
288            let reaper_queue = std::sync::Arc::clone(&session_queue);
289            zeroclaw_spawn::spawn!(async move {
290                const TICK: std::time::Duration = std::time::Duration::from_secs(60);
291                let mut interval = tokio::time::interval(TICK);
292                interval.tick().await;
293                loop {
294                    interval.tick().await;
295                    let queue_evicted = reaper_queue.evict_idle().await;
296                    if queue_evicted > 0 {
297                        let span = ::zeroclaw_log::info_span!(
298                            target: "zeroclaw_log_internal_scope",
299                            "zeroclaw_scope",
300                            channel = "rpc",
301                        );
302                        let _guard = span.enter();
303                        ::zeroclaw_log::record!(
304                            INFO,
305                            ::zeroclaw_log::Event::new(
306                                module_path!(),
307                                ::zeroclaw_log::Action::Note,
308                            )
309                            .with_category(::zeroclaw_log::EventCategory::Agent)
310                            .with_attrs(::serde_json::json!({
311                                "evicted_queue_slots": queue_evicted,
312                            })),
313                            "Session queue: released idle actor-queue slots"
314                        );
315                        crate::util::release_freed_heap();
316                    }
317                }
318            });
319        }
320        let session_backend = zeroclaw_infra::make_session_backend(
321            &config.data_dir,
322            &config.channels.session_backend,
323        )
324        .ok();
325
326        // Wire the memory subsystem so `memory/list` and `memory/search`
327        // work over RPC transports (same pattern as the gateway).
328        let rpc_memory: Option<std::sync::Arc<dyn zeroclaw_api::memory_traits::Memory>> = if config
329            .agents
330            .is_empty()
331        {
332            None
333        } else {
334            match zeroclaw_memory::create_memory_with_storage_and_routes(
335                &config.memory,
336                &config.embedding_routes,
337                config.resolve_active_storage(),
338                &config.data_dir,
339                None,
340            ) {
341                Ok(mem) => Some(std::sync::Arc::from(mem)),
342                Err(_e) => {
343                    ::zeroclaw_log::record!(
344                        WARN,
345                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
346                        "RPC memory subsystem unavailable"
347                    );
348                    None
349                }
350            }
351        };
352
353        // Open the ACP session DB at boot so the file exists from the
354        // moment the daemon is up, not when (if ever) `zeroclaw acp`
355        // runs. Best-effort: on failure, log and continue with `None`.
356        let acp_session_store: Option<
357            std::sync::Arc<zeroclaw_infra::acp_session_store::AcpSessionStore>,
358        > = match zeroclaw_infra::acp_session_store::AcpSessionStore::new(&config.data_dir) {
359            Ok(s) => Some(std::sync::Arc::new(s)),
360            Err(e) => {
361                ::zeroclaw_log::record!(
362                    WARN,
363                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
364                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
365                        .with_attrs(::serde_json::json!({"error": e.to_string()})),
366                    "Failed to open ACP session store at daemon boot"
367                );
368                None
369            }
370        };
371
372        Some(std::sync::Arc::new(RpcContext {
373            config: std::sync::Arc::new(parking_lot::RwLock::new(config.clone())),
374            sessions,
375            session_backend,
376            memory: rpc_memory,
377            cost_tracker: None, // TODO: wire when cost tracker is daemon-scoped
378            event_tx: Some(event_tx.clone()),
379            reload_tx: Some(reload_tx.clone()),
380            approval_pending: std::sync::Arc::new(
381                crate::rpc::context::ApprovalPendingMap::default(),
382            ),
383            tui_registry,
384            acp_session_store,
385        }))
386    } else {
387        None
388    };
389
390    // Local IPC RPC listener (Unix socket on Unix, Named Pipe on Windows).
391    if let Some(socket_start) = registry.take_socket_start() {
392        let rpc_ctx = rpc_ctx
393            .clone()
394            .expect("rpc_ctx built when socket_start is Some");
395        let socket_start = std::sync::Arc::new(socket_start);
396        let socket_cancel = channels_cancel.clone();
397        let count = socket_client_count.clone();
398        handles.push(spawn_component_supervisor(
399            "socket",
400            initial_backoff,
401            max_backoff,
402            move || {
403                let ctx = rpc_ctx.clone();
404                let start = socket_start.clone();
405                let cancel = socket_cancel.clone();
406                let count = count.clone();
407                async move { start(ctx, cancel, count).await }
408            },
409        ));
410    }
411
412    // WSS RPC listener (remote TUI connections).
413    if let Some(wss_start) = registry.take_wss_start() {
414        let rpc_ctx = rpc_ctx
415            .clone()
416            .expect("rpc_ctx built when wss_start is Some");
417        let wss_start = std::sync::Arc::new(wss_start);
418        let wss_cancel = channels_cancel.clone();
419        let count = socket_client_count.clone();
420        handles.push(spawn_component_supervisor(
421            "wss",
422            initial_backoff,
423            max_backoff,
424            move || {
425                let ctx = rpc_ctx.clone();
426                let start = wss_start.clone();
427                let cancel = wss_cancel.clone();
428                let count = count.clone();
429                async move { start(ctx, cancel, count).await }
430            },
431        ));
432    }
433
434    // Wire up MQTT SOP listener if configured and referenced by an enabled agent
435    if let Some(mqtt_start) = registry.take_mqtt_start() {
436        let active_mqtt: std::collections::HashSet<String> = config
437            .agents
438            .values()
439            .filter(|a| a.enabled)
440            .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
441            .collect();
442        let mut mqtt_started = false;
443        for (alias, mqtt_config) in &config.channels.mqtt {
444            if !active_mqtt.contains(&format!("mqtt.{alias}")) {
445                continue;
446            }
447            let mqtt_cfg = mqtt_config.clone();
448            let mqtt_start = std::sync::Arc::new(mqtt_start);
449            handles.push(spawn_component_supervisor(
450                "mqtt",
451                initial_backoff,
452                max_backoff,
453                move || {
454                    let cfg = mqtt_cfg.clone();
455                    let start = mqtt_start.clone();
456                    async move { start(cfg).await }
457                },
458            ));
459            mqtt_started = true;
460            break;
461        }
462        if !mqtt_started {
463            crate::health::mark_component_ok("mqtt");
464        }
465    } else {
466        crate::health::mark_component_ok("mqtt");
467    }
468
469    if config.heartbeat.enabled {
470        let heartbeat_cfg = config.clone();
471        handles.push(spawn_component_supervisor(
472            "heartbeat",
473            initial_backoff,
474            max_backoff,
475            move || {
476                let cfg = heartbeat_cfg.clone();
477                async move { Box::pin(run_heartbeat_worker(cfg)).await }
478            },
479        ));
480    }
481
482    if config.scheduler.enabled {
483        let scheduler_cfg = config.clone();
484        let scheduler_event_tx = event_tx.clone();
485        handles.push(spawn_component_supervisor(
486            "scheduler",
487            initial_backoff,
488            max_backoff,
489            move || {
490                let cfg = scheduler_cfg.clone();
491                let tx = scheduler_event_tx.clone();
492                async move { Box::pin(crate::cron::scheduler::run(cfg, Some(tx))).await }
493            },
494        ));
495    } else {
496        crate::health::mark_component_ok("scheduler");
497        ::zeroclaw_log::record!(
498            INFO,
499            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
500            "Cron disabled; scheduler supervisor not started"
501        );
502    }
503
504    println!("🧠 ZeroClaw daemon started");
505    println!("   Gateway:  http://{host}:{port}");
506    println!(
507        "   Socket:   {}",
508        crate::rpc::local::socket_path(&config).display()
509    );
510    println!("   Components: gateway, channels, heartbeat, scheduler");
511    if config.gateway.require_pairing {
512        println!("   Pairing:    enabled (code appears in gateway output above)");
513    }
514    println!("   Ctrl+C or SIGTERM to stop");
515
516    // Wait for shutdown (SIGINT/SIGTERM/Ctrl+C) or reload (in-process channel).
517    let exit = wait_for_exit_signal(reload_rx, ephemeral, socket_client_count).await?;
518    crate::health::mark_component_error(
519        "daemon",
520        match exit {
521            DaemonExit::Shutdown => "shutdown requested",
522            DaemonExit::Reload => "reload requested",
523        },
524    );
525
526    // Fire channel cancellation before aborting supervisors so listener tasks
527    // get a chance to drop their `Arc<dyn Channel>` (and the matrix-sdk SQLite
528    // pools the Arc transitively pins).
529    channels_cancel.cancel();
530    for handle in &handles {
531        handle.abort();
532    }
533    for handle in handles {
534        let _ = handle.await;
535    }
536
537    #[cfg(all(target_os = "linux", target_env = "gnu"))]
538    unsafe {
539        libc::malloc_trim(0);
540    }
541
542    Ok(exit)
543}
544
545pub fn state_file_path(config: &Config) -> PathBuf {
546    config
547        .config_path
548        .parent()
549        .map_or_else(|| PathBuf::from("."), PathBuf::from)
550        .join("state")
551        .join("daemon_state.json")
552}
553
554fn spawn_state_writer(config: Config) -> JoinHandle<()> {
555    zeroclaw_spawn::spawn!(async move {
556        let path = state_file_path(&config);
557        if let Some(parent) = path.parent() {
558            let _ = tokio::fs::create_dir_all(parent).await;
559        }
560
561        let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
562        loop {
563            interval.tick().await;
564            let mut json = crate::health::snapshot_json();
565            if let Some(obj) = json.as_object_mut() {
566                obj.insert(
567                    "written_at".into(),
568                    serde_json::json!(Utc::now().to_rfc3339()),
569                );
570            }
571            let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
572            let _ = tokio::fs::write(&path, data).await;
573        }
574    })
575}
576
577fn spawn_component_supervisor<F, Fut>(
578    name: &'static str,
579    initial_backoff_secs: u64,
580    max_backoff_secs: u64,
581    mut run_component: F,
582) -> JoinHandle<()>
583where
584    F: FnMut() -> Fut + Send + 'static,
585    Fut: Future<Output = Result<()>> + Send + 'static,
586{
587    zeroclaw_spawn::spawn!(async move {
588        let mut backoff = initial_backoff_secs.max(1);
589        let max_backoff = max_backoff_secs.max(backoff);
590
591        loop {
592            crate::health::mark_component_ok(name);
593            match run_component().await {
594                Ok(()) => {
595                    crate::health::mark_component_error(name, "component exited unexpectedly");
596                    ::zeroclaw_log::record!(
597                        WARN,
598                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
599                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
600                            .with_attrs(::serde_json::json!({"name": name})),
601                        &format!("Daemon component '{name}' exited unexpectedly")
602                    );
603                    // Clean exit — reset backoff since the component ran successfully
604                    backoff = initial_backoff_secs.max(1);
605                }
606                Err(e) => {
607                    crate::health::mark_component_error(name, e.to_string());
608                    ::zeroclaw_log::record!(
609                        ERROR,
610                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
611                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
612                            .with_attrs(
613                                ::serde_json::json!({"error": format!("{}", e), "name": name})
614                            ),
615                        &format!("Daemon component '{name}' failed: {e}")
616                    );
617                }
618            }
619
620            crate::health::bump_component_restart(name);
621            tokio::time::sleep(Duration::from_secs(backoff)).await;
622            // Double backoff AFTER sleeping so first error uses initial_backoff
623            backoff = backoff.saturating_mul(2).min(max_backoff);
624        }
625    })
626}
627
628async fn run_heartbeat_worker(config: Config) -> Result<()> {
629    use crate::heartbeat::engine::{
630        HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus, compute_adaptive_interval,
631    };
632    use std::sync::Arc;
633
634    let agent_alias = config.heartbeat.agent.trim().to_string();
635    if agent_alias.is_empty() {
636        anyhow::bail!(
637            "heartbeat worker requires `[heartbeat] agent = \"<alias>\"` naming a configured agent"
638        );
639    }
640    if config.agent(&agent_alias).is_none() {
641        anyhow::bail!(
642            "[heartbeat] agent = {agent_alias:?} is not configured ([agents.{agent_alias}] missing)"
643        );
644    }
645
646    let observer: std::sync::Arc<dyn crate::observability::Observer> =
647        std::sync::Arc::from(crate::observability::create_observer(&config.observability));
648    let engine = HeartbeatEngine::new(config.heartbeat.clone(), config.data_dir.clone(), observer);
649    let metrics = engine.metrics();
650    let delivery = resolve_heartbeat_delivery(&config)?;
651    let two_phase = config.heartbeat.two_phase;
652    let adaptive = config.heartbeat.adaptive;
653    let start_time = std::time::Instant::now();
654
655    // ── Deadman watcher ──────────────────────────────────────────
656    let deadman_timeout = config.heartbeat.deadman_timeout_minutes;
657    if deadman_timeout > 0 {
658        let dm_metrics = Arc::clone(&metrics);
659        let dm_config = config.clone();
660        let dm_delivery = delivery.clone();
661        zeroclaw_spawn::spawn!(async move {
662            let check_interval = Duration::from_secs(60);
663            let timeout = chrono::Duration::minutes(i64::from(deadman_timeout));
664            loop {
665                tokio::time::sleep(check_interval).await;
666                let last_tick = dm_metrics.lock().last_tick_at;
667                if let Some(last) = last_tick
668                    && chrono::Utc::now() - last > timeout
669                {
670                    let alert = format!(
671                        "⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes"
672                    );
673                    let (channel, target) = if let Some(ch) = &dm_config.heartbeat.deadman_channel {
674                        let to = dm_config
675                            .heartbeat
676                            .deadman_to
677                            .as_deref()
678                            .or(dm_config.heartbeat.to.as_deref())
679                            .unwrap_or_default();
680                        (ch.clone(), to.to_string())
681                    } else if let Some((ch, to)) = &dm_delivery {
682                        (ch.clone(), to.clone())
683                    } else {
684                        continue;
685                    };
686                    let delivery_fut = crate::cron::scheduler::deliver_announcement(
687                        &dm_config, &channel, &target, None, &alert,
688                    );
689                    match tokio::time::timeout(Duration::from_secs(30), delivery_fut).await {
690                        Ok(Err(e)) => {
691                            ::zeroclaw_log::record!(
692                                WARN,
693                                ::zeroclaw_log::Event::new(
694                                    module_path!(),
695                                    ::zeroclaw_log::Action::Note
696                                )
697                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
698                                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
699                                "Deadman alert delivery failed"
700                            );
701                        }
702                        Err(_) => {
703                            ::zeroclaw_log::record!(
704                                WARN,
705                                ::zeroclaw_log::Event::new(
706                                    module_path!(),
707                                    ::zeroclaw_log::Action::Note
708                                )
709                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
710                                "Deadman alert delivery timed out (30s)"
711                            );
712                        }
713                        Ok(Ok(())) => {}
714                    }
715                }
716            }
717        });
718    }
719
720    let base_interval = config.heartbeat.interval_minutes.max(1);
721    let mut sleep_mins = base_interval;
722
723    loop {
724        tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await;
725
726        // Update uptime
727        {
728            let mut m = metrics.lock();
729            m.uptime_secs = start_time.elapsed().as_secs();
730        }
731
732        let tick_start = std::time::Instant::now();
733
734        // Collect runnable tasks (active only, sorted by priority)
735        let mut tasks = engine.collect_runnable_tasks().await?;
736        let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High);
737
738        if tasks.is_empty() {
739            if let Some(fallback) = config
740                .heartbeat
741                .message
742                .as_deref()
743                .map(str::trim)
744                .filter(|m| !m.is_empty())
745            {
746                tasks.push(HeartbeatTask {
747                    text: fallback.to_string(),
748                    priority: TaskPriority::Medium,
749                    status: TaskStatus::Active,
750                });
751            } else {
752                #[allow(clippy::cast_precision_loss)]
753                let elapsed = tick_start.elapsed().as_millis() as f64;
754                metrics.lock().record_success(elapsed);
755                continue;
756            }
757        }
758
759        // ── Phase 1: LLM decision (two-phase mode) ──────────────
760        let tasks_to_run = if two_phase {
761            let decision_prompt = format!(
762                "[Heartbeat Task | decision] {}",
763                HeartbeatEngine::build_decision_prompt(&tasks),
764            );
765            let phase1_fut = Box::pin(crate::agent::run(
766                config.clone(),
767                &agent_alias,
768                Some(decision_prompt),
769                None,
770                None,
771                Some(0.0),
772                vec![],
773                false,
774                None,
775                None,
776                crate::agent::loop_::AgentRunOverrides::default(),
777            ));
778            let phase1_result = if config.heartbeat.task_timeout_secs > 0 {
779                match tokio::time::timeout(
780                    Duration::from_secs(config.heartbeat.task_timeout_secs),
781                    phase1_fut,
782                )
783                .await
784                {
785                    Ok(r) => r,
786                    Err(_) => {
787                        ::zeroclaw_log::record!(
788                            WARN,
789                            ::zeroclaw_log::Event::new(
790                                module_path!(),
791                                ::zeroclaw_log::Action::Timeout
792                            )
793                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
794                            .with_attrs(::serde_json::json!({
795                                "phase": "phase1_decision",
796                                "timeout_secs": config.heartbeat.task_timeout_secs,
797                            })),
798                            "heartbeat: phase1 decision timed out"
799                        );
800                        Err(anyhow::Error::msg(format!(
801                            "Phase 1 decision timed out ({}s)",
802                            config.heartbeat.task_timeout_secs
803                        )))
804                    }
805                }
806            } else {
807                phase1_fut.await
808            };
809            match phase1_result {
810                Ok(response) => {
811                    let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
812                    if indices.is_empty() {
813                        ::zeroclaw_log::record!(
814                            INFO,
815                            ::zeroclaw_log::Event::new(
816                                module_path!(),
817                                ::zeroclaw_log::Action::Note
818                            ),
819                            "heartbeat phase 1: skip (nothing to do)"
820                        );
821                        crate::health::mark_component_ok("heartbeat");
822                        #[allow(clippy::cast_precision_loss)]
823                        let elapsed = tick_start.elapsed().as_millis() as f64;
824                        metrics.lock().record_success(elapsed);
825                        continue;
826                    }
827                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"selected": indices.len(), "total": tasks.len()})), "heartbeat phase 1: running task subset");
828                    indices
829                        .into_iter()
830                        .filter_map(|i| tasks.get(i).cloned())
831                        .collect()
832                }
833                Err(e) => {
834                    ::zeroclaw_log::record!(
835                        WARN,
836                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
837                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
838                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
839                        "heartbeat phase 1 failed; running all tasks"
840                    );
841                    tasks
842                }
843            }
844        } else {
845            tasks
846        };
847
848        // ── Phase 2: Execute selected tasks ─────────────────────
849        // Re-read session context on every tick so we pick up messages
850        // that arrived since the daemon started.
851        let session_context = if config.heartbeat.load_session_context {
852            load_heartbeat_session_context(&config)
853        } else {
854            None
855        };
856
857        // Create memory once per tick for recall + consolidation.
858        let heartbeat_memory: Option<Box<dyn zeroclaw_memory::Memory>> =
859            zeroclaw_memory::create_memory(
860                &config.memory,
861                &config.data_dir,
862                config
863                    .model_provider_for_agent(&agent_alias)
864                    .and_then(|e| e.api_key.as_deref()),
865            )
866            .ok();
867
868        let mut tick_had_error = false;
869        for task in &tasks_to_run {
870            let task_start = std::time::Instant::now();
871            let task_prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
872
873            // Recall relevant memories so heartbeat tasks have context awareness.
874            // Exclude `Conversation` memories to prevent chat context from
875            // leaking into scheduled executions.
876            let memory_context = if let Some(ref mem) = heartbeat_memory {
877                match mem.recall(&task.text, 5, None, None, None).await {
878                    Ok(entries) if !entries.is_empty() => {
879                        let ctx: String = entries
880                            .iter()
881                            .filter(|e| {
882                                !matches!(
883                                    e.category,
884                                    zeroclaw_memory::traits::MemoryCategory::Conversation
885                                )
886                            })
887                            .map(|e| format!("- {}: {}", e.key, e.content))
888                            .collect::<Vec<_>>()
889                            .join("\n");
890                        if ctx.is_empty() {
891                            None
892                        } else {
893                            Some(format!(
894                                "{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n"
895                            ))
896                        }
897                    }
898                    _ => None,
899                }
900            } else {
901                None
902            };
903
904            let prompt = match (&session_context, &memory_context) {
905                (Some(sc), Some(mc)) => format!("{mc}\n{sc}\n\n{task_prompt}"),
906                (Some(sc), None) => format!("{sc}\n\n{task_prompt}"),
907                (None, Some(mc)) => format!("{mc}\n\n{task_prompt}"),
908                (None, None) => task_prompt,
909            };
910            let temp: Option<f64> = config
911                .model_provider_for_agent(&agent_alias)
912                .and_then(|e| e.temperature);
913            let phase2_fut = Box::pin(crate::agent::run(
914                config.clone(),
915                &agent_alias,
916                Some(prompt),
917                None,
918                None,
919                temp,
920                vec![],
921                false,
922                None,
923                None,
924                crate::agent::loop_::AgentRunOverrides::default(),
925            ));
926            let phase2_result = if config.heartbeat.task_timeout_secs > 0 {
927                match tokio::time::timeout(
928                    Duration::from_secs(config.heartbeat.task_timeout_secs),
929                    phase2_fut,
930                )
931                .await
932                {
933                    Ok(r) => r,
934                    Err(_) => {
935                        ::zeroclaw_log::record!(
936                            WARN,
937                            ::zeroclaw_log::Event::new(
938                                module_path!(),
939                                ::zeroclaw_log::Action::Timeout
940                            )
941                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
942                            .with_attrs(::serde_json::json!({
943                                "phase": "phase2_heartbeat",
944                                "timeout_secs": config.heartbeat.task_timeout_secs,
945                            })),
946                            "heartbeat task timed out"
947                        );
948                        Err(anyhow::Error::msg(format!(
949                            "Heartbeat task timed out ({}s)",
950                            config.heartbeat.task_timeout_secs
951                        )))
952                    }
953                }
954            } else {
955                phase2_fut.await
956            };
957            match phase2_result {
958                Ok(output) => {
959                    crate::health::mark_component_ok("heartbeat");
960                    #[allow(clippy::cast_possible_truncation)]
961                    let duration_ms = task_start.elapsed().as_millis() as i64;
962                    let now = chrono::Utc::now();
963                    let _ = crate::heartbeat::store::record_run(
964                        &config.data_dir,
965                        &task.text,
966                        &task.priority.to_string(),
967                        now - chrono::Duration::milliseconds(duration_ms),
968                        now,
969                        "ok",
970                        Some(output.as_str()),
971                        duration_ms,
972                        config.heartbeat.max_run_history,
973                    );
974                    // Consolidate heartbeat output to memory for cross-session awareness.
975                    if config.memory.auto_save
976                        && output.chars().count() >= 50
977                        && let Some(ref mem) = heartbeat_memory
978                    {
979                        let key = format!("heartbeat_{}", uuid::Uuid::new_v4());
980                        let summary = if output.len() > 500 {
981                            // Find a valid UTF-8 char boundary at or before 500.
982                            let mut end = 500;
983                            while end > 0 && !output.is_char_boundary(end) {
984                                end -= 1;
985                            }
986                            &output[..end]
987                        } else {
988                            &output
989                        };
990                        let _ = mem
991                            .store(
992                                &key,
993                                &format!("Heartbeat task '{}': {}", task.text, summary),
994                                zeroclaw_memory::MemoryCategory::Daily,
995                                None,
996                            )
997                            .await;
998                    }
999
1000                    let announcement = if output.trim().is_empty() {
1001                        format!("💓 heartbeat task completed: {}", task.text)
1002                    } else {
1003                        output
1004                    };
1005                    if let Some((channel, target)) = &delivery {
1006                        let delivery_result = tokio::time::timeout(
1007                            Duration::from_secs(30),
1008                            crate::cron::scheduler::deliver_announcement(
1009                                &config,
1010                                channel,
1011                                target,
1012                                None,
1013                                &announcement,
1014                            ),
1015                        )
1016                        .await;
1017                        match delivery_result {
1018                            Ok(Err(e)) => {
1019                                crate::health::mark_component_error(
1020                                    "heartbeat",
1021                                    format!("delivery failed: {e}"),
1022                                );
1023                                ::zeroclaw_log::record!(
1024                                    WARN,
1025                                    ::zeroclaw_log::Event::new(
1026                                        module_path!(),
1027                                        ::zeroclaw_log::Action::Note
1028                                    )
1029                                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1030                                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1031                                    "Heartbeat delivery failed"
1032                                );
1033                            }
1034                            Err(_) => {
1035                                crate::health::mark_component_error(
1036                                    "heartbeat",
1037                                    "delivery timed out (30s)".to_string(),
1038                                );
1039                                ::zeroclaw_log::record!(
1040                                    WARN,
1041                                    ::zeroclaw_log::Event::new(
1042                                        module_path!(),
1043                                        ::zeroclaw_log::Action::Note
1044                                    )
1045                                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1046                                    "Heartbeat delivery timed out (30s)"
1047                                );
1048                            }
1049                            Ok(Ok(())) => {}
1050                        }
1051                    }
1052                }
1053                Err(e) => {
1054                    tick_had_error = true;
1055                    #[allow(clippy::cast_possible_truncation)]
1056                    let duration_ms = task_start.elapsed().as_millis() as i64;
1057                    let now = chrono::Utc::now();
1058                    let _ = crate::heartbeat::store::record_run(
1059                        &config.data_dir,
1060                        &task.text,
1061                        &task.priority.to_string(),
1062                        now - chrono::Duration::milliseconds(duration_ms),
1063                        now,
1064                        "error",
1065                        Some(&e.to_string()),
1066                        duration_ms,
1067                        config.heartbeat.max_run_history,
1068                    );
1069                    crate::health::mark_component_error("heartbeat", e.to_string());
1070                    ::zeroclaw_log::record!(
1071                        WARN,
1072                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1073                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
1074                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
1075                        "Heartbeat task failed"
1076                    );
1077                }
1078            }
1079        }
1080
1081        // Update metrics
1082        #[allow(clippy::cast_precision_loss)]
1083        let tick_elapsed = tick_start.elapsed().as_millis() as f64;
1084        {
1085            let mut m = metrics.lock();
1086            if tick_had_error {
1087                m.record_failure(tick_elapsed);
1088            } else {
1089                m.record_success(tick_elapsed);
1090            }
1091        }
1092
1093        // Compute next sleep interval
1094        if adaptive {
1095            let failures = metrics.lock().consecutive_failures;
1096            sleep_mins = compute_adaptive_interval(
1097                base_interval,
1098                config.heartbeat.min_interval_minutes,
1099                config.heartbeat.max_interval_minutes,
1100                failures,
1101                has_high_priority,
1102            );
1103        } else {
1104            sleep_mins = base_interval;
1105        }
1106    }
1107}
1108
1109/// Resolve delivery target: explicit config > auto-detect first configured channel.
1110fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
1111    let channel = config
1112        .heartbeat
1113        .target
1114        .as_deref()
1115        .map(str::trim)
1116        .filter(|value| !value.is_empty());
1117    let target = config
1118        .heartbeat
1119        .to
1120        .as_deref()
1121        .map(str::trim)
1122        .filter(|value| !value.is_empty());
1123
1124    match (channel, target) {
1125        // Both explicitly set — validate and use.
1126        (Some(channel), Some(target)) => {
1127            validate_heartbeat_channel_config(config, channel)?;
1128            Ok(Some((channel.to_string(), target.to_string())))
1129        }
1130        // Only one set — error.
1131        (Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
1132        (None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
1133        // Neither set — try auto-detect the first configured channel.
1134        (None, None) => Ok(auto_detect_heartbeat_channel(config)),
1135    }
1136}
1137
1138/// Load recent conversation history for the heartbeat's delivery target and
1139/// format it as a text preamble to inject into the task prompt.
1140///
1141/// Scans `{workspace}/sessions/` for JSONL files whose name starts with
1142/// `{channel}_` and ends with `_{to}.jsonl` (or exactly `{channel}_{to}.jsonl`),
1143/// then picks the most recently modified match. This handles session key
1144/// formats such as `telegram_diskiller.jsonl` and
1145/// `telegram_5673725398_diskiller.jsonl`.
1146/// Returns `None` when `target`/`to` are not configured or no session exists.
1147const HEARTBEAT_SESSION_CONTEXT_MESSAGES: usize = 20;
1148
1149fn load_heartbeat_session_context(config: &Config) -> Option<String> {
1150    use zeroclaw_providers::traits::ChatMessage;
1151
1152    let channel = config
1153        .heartbeat
1154        .target
1155        .as_deref()
1156        .map(str::trim)
1157        .filter(|v| !v.is_empty())?;
1158    let to = config
1159        .heartbeat
1160        .to
1161        .as_deref()
1162        .map(str::trim)
1163        .filter(|v| !v.is_empty())?;
1164
1165    if channel.contains('/') || channel.contains('\\') || to.contains('/') || to.contains('\\') {
1166        ::zeroclaw_log::record!(
1167            WARN,
1168            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1169                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
1170            "heartbeat session context: channel/to contains path separators, skipping"
1171        );
1172        return None;
1173    }
1174
1175    let sessions_dir = config.data_dir.join("sessions");
1176
1177    // Find the most recently modified JSONL file that belongs to this target.
1178    // Matches both `{channel}_{to}.jsonl` and `{channel}_{anything}_{to}.jsonl`.
1179    let prefix = format!("{channel}_");
1180    let suffix = format!("_{to}.jsonl");
1181    let exact = format!("{channel}_{to}.jsonl");
1182    let mid_prefix = format!("{channel}_{to}_");
1183
1184    let path = std::fs::read_dir(&sessions_dir)
1185        .ok()?
1186        .filter_map(|e| e.ok())
1187        .filter(|e| {
1188            let name = e.file_name();
1189            let name = name.to_string_lossy();
1190            name.ends_with(".jsonl")
1191                && (name == exact
1192                    || (name.starts_with(&prefix) && name.ends_with(&suffix))
1193                    || name.starts_with(&mid_prefix))
1194        })
1195        .max_by_key(|e| {
1196            e.metadata()
1197                .and_then(|m| m.modified())
1198                .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
1199        })
1200        .map(|e| e.path())?;
1201
1202    if !path.exists() {
1203        ::zeroclaw_log::record!(
1204            DEBUG,
1205            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
1206                .with_attrs(::serde_json::json!({"channel": channel, "to": to})),
1207            "heartbeat session context: no session file found"
1208        );
1209        return None;
1210    }
1211
1212    let messages = load_jsonl_messages(&path);
1213    if messages.is_empty() {
1214        return None;
1215    }
1216
1217    let recent: Vec<&ChatMessage> = messages
1218        .iter()
1219        .filter(|m| m.role == "user" || m.role == "assistant")
1220        .rev()
1221        .take(HEARTBEAT_SESSION_CONTEXT_MESSAGES)
1222        .collect::<Vec<_>>()
1223        .into_iter()
1224        .rev()
1225        .collect();
1226
1227    // Only inject context if there is at least one real user message in the
1228    // window. If the JSONL contains only assistant messages (e.g. previous
1229    // heartbeat outputs with no reply yet), skip context to avoid feeding
1230    // Monika's own messages back to her in a loop.
1231    let has_user_message = recent.iter().any(|m| m.role == "user");
1232    if !has_user_message {
1233        ::zeroclaw_log::record!(
1234            DEBUG,
1235            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1236            "💓 Heartbeat session context: no user messages in recent history — skipping"
1237        );
1238        return None;
1239    }
1240
1241    // Use the session file's mtime as a proxy for when the last message arrived.
1242    let last_message_age = std::fs::metadata(&path)
1243        .ok()
1244        .and_then(|m| m.modified().ok())
1245        .and_then(|mtime| mtime.elapsed().ok());
1246
1247    let silence_note = match last_message_age {
1248        Some(age) => {
1249            let mins = age.as_secs() / 60;
1250            if mins < 60 {
1251                format!("(last message ~{mins} minutes ago)\n")
1252            } else {
1253                let hours = mins / 60;
1254                let rem = mins % 60;
1255                if rem == 0 {
1256                    format!("(last message ~{hours}h ago)\n")
1257                } else {
1258                    format!("(last message ~{hours}h {rem}m ago)\n")
1259                }
1260            }
1261        }
1262        None => String::new(),
1263    };
1264
1265    ::zeroclaw_log::record!(
1266        DEBUG,
1267        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1268        &format!(
1269            "💓 Heartbeat session context: {} messages from {}, silence: {}",
1270            recent.len(),
1271            path.display().to_string(),
1272            silence_note.trim()
1273        )
1274    );
1275
1276    let mut ctx = format!(
1277        "[Recent conversation history — use this for context when composing your message] {silence_note}",
1278    );
1279    for msg in &recent {
1280        let label = if msg.role == "user" { "User" } else { "You" };
1281        // Truncate very long messages to avoid bloating the prompt.
1282        // Use char_indices to avoid panicking on multi-byte UTF-8 characters.
1283        let content = if msg.content.len() > 500 {
1284            let truncate_at = msg
1285                .content
1286                .char_indices()
1287                .map(|(i, _)| i)
1288                .take_while(|&i| i <= 500)
1289                .last()
1290                .unwrap_or(0);
1291            format!("{}…", &msg.content[..truncate_at])
1292        } else {
1293            msg.content.clone()
1294        };
1295        ctx.push_str(label);
1296        ctx.push_str(": ");
1297        ctx.push_str(&content);
1298        ctx.push('\n');
1299    }
1300
1301    Some(ctx)
1302}
1303
1304/// Read the last `HEARTBEAT_SESSION_CONTEXT_MESSAGES` `ChatMessage` lines from
1305/// a JSONL session file using a bounded rolling window so we never hold the
1306/// entire file in memory.
1307fn load_jsonl_messages(path: &std::path::Path) -> Vec<zeroclaw_providers::traits::ChatMessage> {
1308    use std::collections::VecDeque;
1309    use std::io::BufRead;
1310
1311    let file = match std::fs::File::open(path) {
1312        Ok(f) => f,
1313        Err(_) => return Vec::new(),
1314    };
1315    let reader = std::io::BufReader::new(file);
1316    let mut window: VecDeque<zeroclaw_providers::traits::ChatMessage> =
1317        VecDeque::with_capacity(HEARTBEAT_SESSION_CONTEXT_MESSAGES + 1);
1318    for line in reader.lines() {
1319        let Ok(line) = line else { continue };
1320        let trimmed = line.trim();
1321        if trimmed.is_empty() {
1322            continue;
1323        }
1324        if let Ok(msg) = serde_json::from_str::<zeroclaw_providers::traits::ChatMessage>(trimmed) {
1325            window.push_back(msg);
1326            if window.len() > HEARTBEAT_SESSION_CONTEXT_MESSAGES {
1327                window.pop_front();
1328            }
1329        }
1330    }
1331    window.into_iter().collect()
1332}
1333
1334/// Auto-detect the best channel for heartbeat delivery by checking which
1335/// channels are configured. Returns the first match in priority order.
1336fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
1337    // Priority order: telegram > discord > slack > mattermost
1338    // Find the first external peer authorized on a telegram channel
1339    // (peer authorization lives in peer_groups in V3, not on the
1340    // channel block).
1341    if !config.channels.telegram.is_empty() {
1342        for alias in config.channels.telegram.keys() {
1343            let peers = config.channel_external_peers("telegram", alias);
1344            if let Some(target) = peers.into_iter().next() {
1345                return Some(("telegram".to_string(), target));
1346            }
1347        }
1348    }
1349    if !config.channels.discord.is_empty() {
1350        // Discord requires explicit target — can't auto-detect
1351        return None;
1352    }
1353    if !config.channels.slack.is_empty() {
1354        // Slack requires explicit target
1355        return None;
1356    }
1357    if !config.channels.mattermost.is_empty() {
1358        // Mattermost requires explicit target
1359        return None;
1360    }
1361    None
1362}
1363
1364fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
1365    match channel.to_ascii_lowercase().as_str() {
1366        "telegram" => {
1367            if config.channels.telegram.is_empty() {
1368                anyhow::bail!(
1369                    "heartbeat.target is set to telegram but channels.telegram is not configured"
1370                );
1371            }
1372        }
1373        "discord" => {
1374            if config.channels.discord.is_empty() {
1375                anyhow::bail!(
1376                    "heartbeat.target is set to discord but channels.discord is not configured"
1377                );
1378            }
1379        }
1380        "slack" => {
1381            if config.channels.slack.is_empty() {
1382                anyhow::bail!(
1383                    "heartbeat.target is set to slack but channels.slack is not configured"
1384                );
1385            }
1386        }
1387        "mattermost" => {
1388            if config.channels.mattermost.is_empty() {
1389                anyhow::bail!(
1390                    "heartbeat.target is set to mattermost but channels.mattermost is not configured"
1391                );
1392            }
1393        }
1394        other => anyhow::bail!("unsupported heartbeat.target channel: {other}"),
1395    }
1396
1397    Ok(())
1398}
1399
1400fn has_supervised_channels(config: &Config) -> bool {
1401    // Check that at least one channel entry has `enabled = true`.
1402    // A config with only `enabled = false` entries (e.g. partially-configured
1403    // or intentionally disabled bots) must not start the supervisor — the
1404    // channels component would find nothing to listen on, return Ok(()), and
1405    // the daemon supervisor would restart it in a tight loop.
1406    config.channels.has_any_enabled()
1407}
1408
1409// run_mqtt_sop_listener has been moved to zeroclaw-channels::orchestrator::mqtt.
1410// The daemon now receives it as a starter via DaemonRegistry::register_mqtt.
1411
1412#[cfg(test)]
1413mod tests {
1414    use super::*;
1415    use tempfile::TempDir;
1416
1417    fn test_config(tmp: &TempDir) -> Config {
1418        let config = Config {
1419            data_dir: tmp.path().join("data"),
1420            config_path: tmp.path().join("config.toml"),
1421            ..Config::default()
1422        };
1423        std::fs::create_dir_all(&config.data_dir).unwrap();
1424        config
1425    }
1426
1427    #[test]
1428    fn state_file_path_uses_config_state_directory() {
1429        let tmp = TempDir::new().unwrap();
1430        let config = test_config(&tmp);
1431
1432        let path = state_file_path(&config);
1433        assert_eq!(path, tmp.path().join("state").join("daemon_state.json"));
1434    }
1435
1436    #[tokio::test]
1437    async fn supervisor_marks_error_and_restart_on_failure() {
1438        let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
1439            anyhow::bail!("boom")
1440        });
1441
1442        tokio::time::sleep(Duration::from_millis(50)).await;
1443        handle.abort();
1444        let _ = handle.await;
1445
1446        let snapshot = crate::health::snapshot_json();
1447        let component = &snapshot["components"]["daemon-test-fail"];
1448        assert_eq!(component["status"], "error");
1449        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
1450        assert!(
1451            component["last_error"]
1452                .as_str()
1453                .unwrap_or("")
1454                .contains("boom")
1455        );
1456    }
1457
1458    #[tokio::test]
1459    async fn supervisor_marks_unexpected_exit_as_error() {
1460        let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });
1461
1462        tokio::time::sleep(Duration::from_millis(50)).await;
1463        handle.abort();
1464        let _ = handle.await;
1465
1466        let snapshot = crate::health::snapshot_json();
1467        let component = &snapshot["components"]["daemon-test-exit"];
1468        assert_eq!(component["status"], "error");
1469        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
1470        assert!(
1471            component["last_error"]
1472                .as_str()
1473                .unwrap_or("")
1474                .contains("component exited unexpectedly")
1475        );
1476    }
1477
1478    #[test]
1479    fn detects_no_supervised_channels() {
1480        let config = Config::default();
1481        assert!(!has_supervised_channels(&config));
1482    }
1483
1484    #[test]
1485    fn all_disabled_channels_not_supervised() {
1486        // Regression test: a config with channel entries that all have
1487        // `enabled = false` must not start the channels supervisor.
1488        // Previously, has_supervised_channels only checked map non-emptiness,
1489        // causing the supervisor to start, find nothing to listen on, return
1490        // Ok(()), and restart in a tight loop.
1491        let mut config = Config::default();
1492        config.channels.discord.insert(
1493            "clamps".to_string(),
1494            zeroclaw_config::schema::DiscordConfig {
1495                enabled: false,
1496                bot_token: "token".into(),
1497                guild_ids: vec![],
1498                channel_ids: vec![],
1499                listen_to_bots: false,
1500                mention_only: true,
1501                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1502                draft_update_interval_ms: 0,
1503                multi_message_delay_ms: 0,
1504                stall_timeout_secs: 0,
1505                interrupt_on_new_message: false,
1506                archive: false,
1507                approval_timeout_secs: 0,
1508                proxy_url: None,
1509                excluded_tools: vec![],
1510                reply_min_interval_secs: 0,
1511                reply_queue_depth_max: 0,
1512            },
1513        );
1514        config.channels.discord.insert(
1515            "glados".to_string(),
1516            zeroclaw_config::schema::DiscordConfig {
1517                enabled: false,
1518                bot_token: "token2".into(),
1519                guild_ids: vec![],
1520                channel_ids: vec![],
1521                listen_to_bots: false,
1522                mention_only: true,
1523                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1524                draft_update_interval_ms: 0,
1525                multi_message_delay_ms: 0,
1526                stall_timeout_secs: 0,
1527                interrupt_on_new_message: false,
1528                archive: false,
1529                approval_timeout_secs: 0,
1530                proxy_url: None,
1531                excluded_tools: vec![],
1532                reply_min_interval_secs: 0,
1533                reply_queue_depth_max: 0,
1534            },
1535        );
1536        assert!(!has_supervised_channels(&config));
1537    }
1538
1539    #[test]
1540    fn detects_supervised_channels_present() {
1541        let mut config = Config::default();
1542        config.channels.telegram.insert(
1543            "default".to_string(),
1544            zeroclaw_config::schema::TelegramConfig {
1545                enabled: true,
1546                bot_token: "token".into(),
1547                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1548                draft_update_interval_ms: 1000,
1549                interrupt_on_new_message: false,
1550                mention_only: false,
1551                ack_reactions: None,
1552                proxy_url: None,
1553                approval_timeout_secs: 120,
1554                excluded_tools: vec![],
1555                reply_min_interval_secs: 0,
1556                reply_queue_depth_max: 0,
1557            },
1558        );
1559        assert!(has_supervised_channels(&config));
1560    }
1561
1562    #[test]
1563    fn detects_dingtalk_as_supervised_channel() {
1564        let mut config = Config::default();
1565        config.channels.dingtalk.insert(
1566            "default".to_string(),
1567            zeroclaw_config::schema::DingTalkConfig {
1568                enabled: true,
1569                client_id: "client_id".into(),
1570                client_secret: "client_secret".into(),
1571                proxy_url: None,
1572                excluded_tools: vec![],
1573            },
1574        );
1575        assert!(has_supervised_channels(&config));
1576    }
1577
1578    #[test]
1579    fn detects_mattermost_as_supervised_channel() {
1580        let mut config = Config::default();
1581        config.channels.mattermost.insert(
1582            "default".to_string(),
1583            zeroclaw_config::schema::MattermostConfig {
1584                enabled: true,
1585                url: "https://mattermost.example.com".into(),
1586                bot_token: Some("token".into()),
1587                login_id: None,
1588                password: None,
1589                channel_ids: vec!["channel-id".into()],
1590                team_ids: vec![],
1591                discover_dms: None,
1592                thread_replies: Some(true),
1593                mention_only: Some(false),
1594                interrupt_on_new_message: false,
1595                proxy_url: None,
1596                excluded_tools: vec![],
1597                reply_min_interval_secs: 0,
1598                reply_queue_depth_max: 0,
1599            },
1600        );
1601        assert!(has_supervised_channels(&config));
1602    }
1603
1604    #[test]
1605    fn detects_qq_as_supervised_channel() {
1606        let mut config = Config::default();
1607        config.channels.qq.insert(
1608            "default".to_string(),
1609            zeroclaw_config::schema::QQConfig {
1610                enabled: true,
1611                app_id: "app-id".into(),
1612                app_secret: "app-secret".into(),
1613                proxy_url: None,
1614                excluded_tools: vec![],
1615            },
1616        );
1617        assert!(has_supervised_channels(&config));
1618    }
1619
1620    #[test]
1621    fn detects_nextcloud_talk_as_supervised_channel() {
1622        let mut config = Config::default();
1623        config.channels.nextcloud_talk.insert(
1624            "default".to_string(),
1625            zeroclaw_config::schema::NextcloudTalkConfig {
1626                enabled: true,
1627                base_url: "https://cloud.example.com".into(),
1628                app_token: "app-token".into(),
1629                webhook_secret: None,
1630                proxy_url: None,
1631                bot_name: None,
1632                excluded_tools: vec![],
1633                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1634                draft_update_interval_ms: 1000,
1635            },
1636        );
1637        assert!(has_supervised_channels(&config));
1638    }
1639
1640    #[test]
1641    fn webhook_only_config_is_supervised() {
1642        let mut config = Config::default();
1643        config.channels.webhook.insert(
1644            "default".to_string(),
1645            zeroclaw_config::schema::WebhookConfig {
1646                enabled: true,
1647                port: 8080,
1648                listen_path: None,
1649                send_url: None,
1650                send_method: None,
1651                auth_header: None,
1652                secret: None,
1653                excluded_tools: vec![],
1654                reply_min_interval_secs: 0,
1655                reply_queue_depth_max: 0,
1656                max_retries: None,
1657                retry_base_delay_ms: None,
1658                retry_max_delay_ms: None,
1659            },
1660        );
1661        assert!(has_supervised_channels(&config));
1662    }
1663
1664    #[test]
1665    fn resolve_delivery_none_when_unset() {
1666        let config = Config::default();
1667        let target = resolve_heartbeat_delivery(&config).unwrap();
1668        assert!(target.is_none());
1669    }
1670
1671    #[test]
1672    fn resolve_delivery_requires_to_field() {
1673        let mut config = Config::default();
1674        config.heartbeat.target = Some("telegram".into());
1675        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1676        assert!(
1677            err.to_string()
1678                .contains("heartbeat.to is required when heartbeat.target is set")
1679        );
1680    }
1681
1682    #[test]
1683    fn resolve_delivery_requires_target_field() {
1684        let mut config = Config::default();
1685        config.heartbeat.to = Some("123456".into());
1686        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1687        assert!(
1688            err.to_string()
1689                .contains("heartbeat.target is required when heartbeat.to is set")
1690        );
1691    }
1692
1693    #[test]
1694    fn resolve_delivery_rejects_unsupported_channel() {
1695        let mut config = Config::default();
1696        config.heartbeat.target = Some("email".into());
1697        config.heartbeat.to = Some("ops@example.com".into());
1698        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1699        assert!(
1700            err.to_string()
1701                .contains("unsupported heartbeat.target channel")
1702        );
1703    }
1704
1705    #[test]
1706    fn resolve_delivery_requires_channel_configuration() {
1707        let mut config = Config::default();
1708        config.heartbeat.target = Some("telegram".into());
1709        config.heartbeat.to = Some("123456".into());
1710        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1711        assert!(
1712            err.to_string()
1713                .contains("channels.telegram is not configured")
1714        );
1715    }
1716
1717    #[test]
1718    fn resolve_delivery_accepts_telegram_configuration() {
1719        let mut config = Config::default();
1720        config.heartbeat.target = Some("telegram".into());
1721        config.heartbeat.to = Some("123456".into());
1722        config.channels.telegram.insert(
1723            "default".to_string(),
1724            zeroclaw_config::schema::TelegramConfig {
1725                enabled: true,
1726                bot_token: "bot-token".into(),
1727                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1728                draft_update_interval_ms: 1000,
1729                interrupt_on_new_message: false,
1730                mention_only: false,
1731                ack_reactions: None,
1732                proxy_url: None,
1733                approval_timeout_secs: 120,
1734                excluded_tools: vec![],
1735                reply_min_interval_secs: 0,
1736                reply_queue_depth_max: 0,
1737            },
1738        );
1739
1740        let target = resolve_heartbeat_delivery(&config).unwrap();
1741        assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
1742    }
1743
1744    #[test]
1745    fn auto_detect_telegram_when_configured() {
1746        use zeroclaw_config::multi_agent::{PeerGroupConfig, PeerUsername};
1747
1748        let mut config = Config::default();
1749        config.channels.telegram.insert(
1750            "default".to_string(),
1751            zeroclaw_config::schema::TelegramConfig {
1752                enabled: true,
1753                bot_token: "bot-token".into(),
1754                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1755                draft_update_interval_ms: 1000,
1756                interrupt_on_new_message: false,
1757                mention_only: false,
1758                ack_reactions: None,
1759                proxy_url: None,
1760                approval_timeout_secs: 120,
1761                excluded_tools: vec![],
1762                reply_min_interval_secs: 0,
1763                reply_queue_depth_max: 0,
1764            },
1765        );
1766        // Inbound peer authorization lives in peer_groups in V3.
1767        // Auto-detect picks the first external peer of the synthesized
1768        // `telegram_default` group as the heartbeat target.
1769        config.peer_groups.insert(
1770            "telegram_default".to_string(),
1771            PeerGroupConfig {
1772                channel: "telegram".to_string(),
1773                external_peers: vec![PeerUsername::new("user123")],
1774                ..PeerGroupConfig::default()
1775            },
1776        );
1777
1778        let target = resolve_heartbeat_delivery(&config).unwrap();
1779        assert_eq!(
1780            target,
1781            Some(("telegram".to_string(), "user123".to_string()))
1782        );
1783    }
1784
1785    #[test]
1786    fn auto_detect_none_when_no_channels() {
1787        let config = Config::default();
1788        let target = auto_detect_heartbeat_channel(&config);
1789        assert!(target.is_none());
1790    }
1791
1792    /// Verify that SIGHUP does not cause shutdown — the daemon should ignore it
1793    /// and only terminate on SIGINT or SIGTERM.
1794    #[cfg(unix)]
1795    #[tokio::test]
1796    async fn sighup_does_not_shut_down_daemon() {
1797        use libc;
1798        use tokio::time::{Duration, timeout};
1799
1800        let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1801        let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1802        let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, false, count));
1803
1804        // Give the signal handler time to register
1805        tokio::time::sleep(Duration::from_millis(50)).await;
1806
1807        // Send SIGHUP to ourselves — should be ignored by the handler
1808        unsafe { libc::raise(libc::SIGHUP) };
1809
1810        // The future should NOT complete within a short window
1811        let result = timeout(Duration::from_millis(200), handle).await;
1812        assert!(
1813            result.is_err(),
1814            "wait_for_exit_signal should not return after SIGHUP"
1815        );
1816    }
1817
1818    /// In-process reload channel returns DaemonExit::Reload so the outer
1819    /// loop can re-init. Cross-platform — works on Linux, macOS, Windows.
1820    #[tokio::test]
1821    async fn reload_channel_returns_reload() {
1822        use tokio::time::{Duration, timeout};
1823
1824        let (reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1825        let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1826        let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, false, count));
1827        tokio::time::sleep(Duration::from_millis(50)).await;
1828        reload_tx.send(true).expect("send reload");
1829
1830        let result = timeout(Duration::from_secs(2), handle)
1831            .await
1832            .expect("wait_for_exit_signal should return after reload signal")
1833            .expect("task should not panic")
1834            .expect("signal handler should not error");
1835        assert_eq!(result, DaemonExit::Reload);
1836    }
1837
1838    #[tokio::test]
1839    async fn registry_gateway_starter_can_trigger_daemon_reload() {
1840        use tokio::time::{Duration, timeout};
1841
1842        let tmp = TempDir::new().unwrap();
1843        let config = test_config(&tmp);
1844        let expected_data_dir = config.data_dir.clone();
1845        let (seen_tx, mut seen_rx) = tokio::sync::mpsc::unbounded_channel();
1846
1847        let mut registry = DaemonRegistry::new();
1848        registry.register_gateway(Box::new(
1849            move |host, port, config, event_tx, reload_tx, tui_registry| {
1850                let seen_tx = seen_tx.clone();
1851                Box::pin(async move {
1852                    let has_event_tx = event_tx.is_some();
1853                    let has_reload_tx = reload_tx.is_some();
1854                    let has_tui_registry = tui_registry.is_some();
1855                    seen_tx
1856                        .send((
1857                            host,
1858                            port,
1859                            config.data_dir.clone(),
1860                            has_event_tx,
1861                            has_reload_tx,
1862                            has_tui_registry,
1863                        ))
1864                        .expect("record gateway starter inputs");
1865                    reload_tx
1866                        .expect("daemon should pass reload sender to gateway starter")
1867                        .send(true)
1868                        .expect("send reload signal");
1869                    std::future::pending::<Result<()>>().await
1870                })
1871            },
1872        ));
1873
1874        let exit = timeout(
1875            Duration::from_secs(2),
1876            run(config, "127.0.0.1".to_string(), 4242, registry, false),
1877        )
1878        .await
1879        .expect("daemon should return after gateway-triggered reload")
1880        .expect("daemon run should succeed");
1881
1882        assert_eq!(exit, DaemonExit::Reload);
1883        let (host, port, data_dir, has_event_tx, has_reload_tx, has_tui_registry) = seen_rx
1884            .try_recv()
1885            .expect("gateway starter should record its daemon inputs");
1886        assert_eq!(host, "127.0.0.1");
1887        assert_eq!(port, 4242);
1888        assert_eq!(data_dir, expected_data_dir);
1889        assert!(has_event_tx);
1890        assert!(has_reload_tx);
1891        assert!(has_tui_registry);
1892    }
1893
1894    #[tokio::test]
1895    async fn ephemeral_does_not_exit_before_client_connects() {
1896        use tokio::time::{Duration, timeout};
1897
1898        let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1899        let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1900        let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, true, count));
1901
1902        // No clients ever connect — should NOT shut down.
1903        let result = timeout(Duration::from_millis(500), handle).await;
1904        assert!(
1905            result.is_err(),
1906            "ephemeral daemon should not exit before any client connects"
1907        );
1908    }
1909
1910    #[tokio::test]
1911    async fn ephemeral_exits_after_client_disconnects() {
1912        use std::sync::atomic::Ordering;
1913        use tokio::time::{Duration, timeout};
1914
1915        let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1916        let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1917        let count2 = count.clone();
1918        let handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, true, count2));
1919
1920        // Simulate client connect then disconnect.
1921        count.store(1, Ordering::Relaxed);
1922        tokio::time::sleep(Duration::from_millis(100)).await;
1923        count.store(0, Ordering::Relaxed);
1924
1925        // Should exit within grace period + buffer.
1926        let result = timeout(Duration::from_secs(EPHEMERAL_GRACE_SECS + 5), handle)
1927            .await
1928            .expect("ephemeral daemon should shut down after last client disconnects")
1929            .expect("task should not panic")
1930            .expect("signal handler should not error");
1931        assert_eq!(result, DaemonExit::Shutdown);
1932    }
1933
1934    #[tokio::test]
1935    async fn ephemeral_grace_period_resets_on_reconnect() {
1936        use std::sync::atomic::Ordering;
1937        use tokio::time::{Duration, timeout};
1938
1939        let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1940        let count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1941        let count2 = count.clone();
1942        let mut handle = zeroclaw_spawn::spawn!(wait_for_exit_signal(reload_rx, true, count2));
1943
1944        // Client connects, disconnects.
1945        count.store(1, Ordering::Relaxed);
1946        tokio::time::sleep(Duration::from_millis(100)).await;
1947        count.store(0, Ordering::Relaxed);
1948
1949        // Reconnect partway through the grace period — must be strictly
1950        // less than EPHEMERAL_GRACE_SECS so the daemon hasn't already
1951        // exited. With the 1s grace window we sleep ~200ms.
1952        tokio::time::sleep(Duration::from_millis(200)).await;
1953        count.store(1, Ordering::Relaxed);
1954
1955        // Should NOT shut down while client is connected.
1956        let result = timeout(Duration::from_millis(500), &mut handle).await;
1957        assert!(
1958            result.is_err(),
1959            "ephemeral daemon should not exit while client is connected"
1960        );
1961
1962        // Disconnect again — should eventually shut down.
1963        count.store(0, Ordering::Relaxed);
1964        let result = timeout(Duration::from_secs(EPHEMERAL_GRACE_SECS + 5), handle)
1965            .await
1966            .expect("ephemeral daemon should shut down after second disconnect")
1967            .expect("task should not panic")
1968            .expect("signal handler should not error");
1969        assert_eq!(result, DaemonExit::Shutdown);
1970    }
1971}