Skip to main content

zeroclaw_runtime/daemon/
mod.rs

1use anyhow::Result;
2use chrono::Utc;
3use std::future::Future;
4use std::path::PathBuf;
5use tokio::task::JoinHandle;
6use tokio::time::Duration;
7use zeroclaw_config::schema::Config;
8use zeroclaw_memory::{MEMORY_CONTEXT_CLOSE, MEMORY_CONTEXT_OPEN};
9
/// Interval, in seconds, between writes of the daemon state file
/// (see `spawn_state_writer`).
const STATUS_FLUSH_SECONDS: u64 = 5;
11
/// Reason the daemon's main loop handed control back to its caller.
///
/// On `Shutdown` the process exits cleanly. On `Reload` the caller (typically
/// `src/main.rs`) re-reads the config from disk and calls `daemon::run` again;
/// the PID stays the same and only the in-process subsystems are torn down
/// and re-instantiated.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DaemonExit {
    /// A shutdown signal arrived, or the reload sender was dropped.
    Shutdown,
    /// A reload was requested through the in-process reload channel.
    Reload,
}
22
23/// Wait for either a shutdown signal (SIGINT / SIGTERM / Ctrl+C) or an
24/// in-process reload signal (the gateway's `/admin/reload` writes `true`
25/// on the watch channel). Returns the reason so the outer loop can decide
26/// whether to re-init or exit. SIGHUP is ignored on Unix so the daemon
27/// survives terminal / SSH disconnects.
28///
29/// The reload trigger is a tokio watch channel (not an OS signal) so it
30/// works identically on Linux, macOS, and Windows. The Sender is owned by
31/// the daemon (created in `run`) and cloned to the gateway for AppState.
32async fn wait_for_exit_signal(
33    mut reload_rx: tokio::sync::watch::Receiver<bool>,
34) -> Result<DaemonExit> {
35    #[cfg(unix)]
36    {
37        use tokio::signal::unix::{SignalKind, signal};
38
39        let mut sigint = signal(SignalKind::interrupt())?;
40        let mut sigterm = signal(SignalKind::terminate())?;
41        let mut sighup = signal(SignalKind::hangup())?;
42
43        loop {
44            tokio::select! {
45                _ = sigint.recv() => {
46                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGINT, shutting down...");
47                    return Ok(DaemonExit::Shutdown);
48                }
49                _ = sigterm.recv() => {
50                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGTERM, shutting down...");
51                    return Ok(DaemonExit::Shutdown);
52                }
53                _ = sighup.recv() => {
54                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received SIGHUP, ignoring (daemon stays running)");
55                }
56                changed = reload_rx.changed() => {
57                    if changed.is_err() {
58                        // Sender dropped — treat as shutdown (shouldn't
59                        // happen in normal operation; the gateway holds a
60                        // clone for the lifetime of the daemon).
61                        ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
62                        return Ok(DaemonExit::Shutdown);
63                    }
64                    if *reload_rx.borrow_and_update() {
65                        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
66                        return Ok(DaemonExit::Reload);
67                    }
68                }
69            }
70        }
71    }
72
73    #[cfg(not(unix))]
74    {
75        loop {
76            tokio::select! {
77                res = tokio::signal::ctrl_c() => {
78                    res?;
79                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Received Ctrl+C, shutting down...");
80                    return Ok(DaemonExit::Shutdown);
81                }
82                changed = reload_rx.changed() => {
83                    if changed.is_err() {
84                        ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown), "Reload sender dropped; shutting down");
85                        return Ok(DaemonExit::Shutdown);
86                    }
87                    if *reload_rx.borrow_and_update() {
88                        ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note), "Reload requested via /admin/reload");
89                        return Ok(DaemonExit::Reload);
90                    }
91                }
92            }
93        }
94    }
95}
96
97/// Optional subsystem start functions injected by the binary crate.
98/// This allows the daemon to spawn subsystems without depending on their crates.
99#[allow(clippy::type_complexity)]
100pub struct DaemonSubsystems {
101    /// Start the gateway HTTP server. Injected by the binary when `gateway` feature is on.
102    /// The fifth argument is the reload sender — the gateway hands it to its
103    /// AppState so /admin/reload can signal the daemon to re-init.
104    pub gateway_start: Option<
105        Box<
106            dyn Fn(
107                    String,
108                    u16,
109                    Config,
110                    Option<tokio::sync::broadcast::Sender<serde_json::Value>>,
111                    Option<tokio::sync::watch::Sender<bool>>,
112                ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
113                + Send
114                + Sync,
115        >,
116    >,
117    /// Start supervised channels. Injected by the binary when channels crate is available.
118    /// The cancellation token is fired on reload so listener tasks drop their channel Arcs
119    /// before the new supervisor starts.
120    pub channels_start: Option<
121        Box<
122            dyn Fn(
123                    Config,
124                    tokio_util::sync::CancellationToken,
125                ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
126                + Send
127                + Sync,
128        >,
129    >,
130    /// Start the MQTT SOP listener. Injected by the binary when channels crate is available.
131    pub mqtt_start: Option<
132        Box<
133            dyn Fn(
134                    zeroclaw_config::schema::MqttConfig,
135                ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send>>
136                + Send
137                + Sync,
138        >,
139    >,
140}
141
142pub async fn run(
143    config: Config,
144    host: String,
145    port: u16,
146    subsystems: DaemonSubsystems,
147) -> Result<DaemonExit> {
148    let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
149    let max_backoff = config
150        .reliability
151        .channel_max_backoff_secs
152        .max(initial_backoff);
153
154    crate::health::mark_component_ok("daemon");
155
156    // Shared broadcast channel so all daemon components (gateway, cron,
157    // heartbeat) can publish real-time events to dashboard clients.
158    let (event_tx, _rx) = tokio::sync::broadcast::channel::<serde_json::Value>(256);
159
160    if config.heartbeat.enabled {
161        let _ = crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.data_dir)
162            .await;
163    }
164
165    let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
166
167    // Reload channel: gateway's /admin/reload writes here; our wait loop
168    // (below) selects on it alongside OS signals. Cross-platform.
169    let (reload_tx, reload_rx) = tokio::sync::watch::channel::<bool>(false);
170
171    if let Some(gateway_start) = subsystems.gateway_start {
172        let gateway_cfg = config.clone();
173        let gateway_host = host.clone();
174        let gateway_event_tx = event_tx.clone();
175        let gateway_reload_tx = reload_tx.clone();
176        let gateway_start = std::sync::Arc::new(gateway_start);
177        handles.push(spawn_component_supervisor(
178            "gateway",
179            initial_backoff,
180            max_backoff,
181            move || {
182                let cfg = gateway_cfg.clone();
183                let host = gateway_host.clone();
184                let tx = gateway_event_tx.clone();
185                let reload = gateway_reload_tx.clone();
186                let start = gateway_start.clone();
187                async move { start(host, port, cfg, Some(tx), Some(reload)).await }
188            },
189        ));
190    }
191
192    let channels_cancel = tokio_util::sync::CancellationToken::new();
193
194    if let Some(channels_start) = subsystems.channels_start {
195        if has_supervised_channels(&config) {
196            let channels_cfg = config.clone();
197            let channels_start = std::sync::Arc::new(channels_start);
198            let cancel_for_supervisor = channels_cancel.clone();
199            handles.push(spawn_component_supervisor(
200                "channels",
201                initial_backoff,
202                max_backoff,
203                move || {
204                    let cfg = channels_cfg.clone();
205                    let start = channels_start.clone();
206                    let cancel = cancel_for_supervisor.clone();
207                    async move { start(cfg, cancel).await }
208                },
209            ));
210        } else {
211            crate::health::mark_component_ok("channels");
212            ::zeroclaw_log::record!(
213                INFO,
214                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
215                "No channels configured; channel supervisor disabled"
216            );
217        }
218    } else {
219        crate::health::mark_component_ok("channels");
220        ::zeroclaw_log::record!(
221            INFO,
222            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
223            "Channels subsystem not wired; channel supervisor disabled"
224        );
225    }
226
227    // Wire up MQTT SOP listener if configured and referenced by an enabled agent
228    if let Some(mqtt_start) = subsystems.mqtt_start {
229        let active_mqtt: std::collections::HashSet<String> = config
230            .agents
231            .values()
232            .filter(|a| a.enabled)
233            .flat_map(|a| a.channels.iter().map(|c| c.as_str().to_string()))
234            .collect();
235        let mut mqtt_started = false;
236        for (alias, mqtt_config) in &config.channels.mqtt {
237            if !active_mqtt.contains(&format!("mqtt.{alias}")) {
238                continue;
239            }
240            let mqtt_cfg = mqtt_config.clone();
241            let mqtt_start = std::sync::Arc::new(mqtt_start);
242            handles.push(spawn_component_supervisor(
243                "mqtt",
244                initial_backoff,
245                max_backoff,
246                move || {
247                    let cfg = mqtt_cfg.clone();
248                    let start = mqtt_start.clone();
249                    async move { start(cfg).await }
250                },
251            ));
252            mqtt_started = true;
253            break;
254        }
255        if !mqtt_started {
256            crate::health::mark_component_ok("mqtt");
257        }
258    } else {
259        crate::health::mark_component_ok("mqtt");
260    }
261
262    if config.heartbeat.enabled {
263        let heartbeat_cfg = config.clone();
264        handles.push(spawn_component_supervisor(
265            "heartbeat",
266            initial_backoff,
267            max_backoff,
268            move || {
269                let cfg = heartbeat_cfg.clone();
270                async move { Box::pin(run_heartbeat_worker(cfg)).await }
271            },
272        ));
273    }
274
275    if config.scheduler.enabled {
276        let scheduler_cfg = config.clone();
277        let scheduler_event_tx = event_tx.clone();
278        handles.push(spawn_component_supervisor(
279            "scheduler",
280            initial_backoff,
281            max_backoff,
282            move || {
283                let cfg = scheduler_cfg.clone();
284                let tx = scheduler_event_tx.clone();
285                async move { Box::pin(crate::cron::scheduler::run(cfg, Some(tx))).await }
286            },
287        ));
288    } else {
289        crate::health::mark_component_ok("scheduler");
290        ::zeroclaw_log::record!(
291            INFO,
292            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
293            "Cron disabled; scheduler supervisor not started"
294        );
295    }
296
297    println!("🧠 ZeroClaw daemon started");
298    println!("   Gateway:  http://{host}:{port}");
299    println!("   Components: gateway, channels, heartbeat, scheduler");
300    if config.gateway.require_pairing {
301        println!("   Pairing:    enabled (code appears in gateway output above)");
302    }
303    println!("   Ctrl+C or SIGTERM to stop");
304
305    // Wait for shutdown (SIGINT/SIGTERM/Ctrl+C) or reload (in-process channel).
306    let exit = wait_for_exit_signal(reload_rx).await?;
307    crate::health::mark_component_error(
308        "daemon",
309        match exit {
310            DaemonExit::Shutdown => "shutdown requested",
311            DaemonExit::Reload => "reload requested",
312        },
313    );
314
315    // Fire channel cancellation before aborting supervisors so listener tasks
316    // get a chance to drop their `Arc<dyn Channel>` (and the matrix-sdk SQLite
317    // pools the Arc transitively pins).
318    channels_cancel.cancel();
319    for handle in &handles {
320        handle.abort();
321    }
322    for handle in handles {
323        let _ = handle.await;
324    }
325
326    #[cfg(all(target_os = "linux", target_env = "gnu"))]
327    unsafe {
328        libc::malloc_trim(0);
329    }
330
331    Ok(exit)
332}
333
334pub fn state_file_path(config: &Config) -> PathBuf {
335    config
336        .config_path
337        .parent()
338        .map_or_else(|| PathBuf::from("."), PathBuf::from)
339        .join("daemon_state.json")
340}
341
342fn spawn_state_writer(config: Config) -> JoinHandle<()> {
343    tokio::spawn(async move {
344        let path = state_file_path(&config);
345        if let Some(parent) = path.parent() {
346            let _ = tokio::fs::create_dir_all(parent).await;
347        }
348
349        let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
350        loop {
351            interval.tick().await;
352            let mut json = crate::health::snapshot_json();
353            if let Some(obj) = json.as_object_mut() {
354                obj.insert(
355                    "written_at".into(),
356                    serde_json::json!(Utc::now().to_rfc3339()),
357                );
358            }
359            let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
360            let _ = tokio::fs::write(&path, data).await;
361        }
362    })
363}
364
365fn spawn_component_supervisor<F, Fut>(
366    name: &'static str,
367    initial_backoff_secs: u64,
368    max_backoff_secs: u64,
369    mut run_component: F,
370) -> JoinHandle<()>
371where
372    F: FnMut() -> Fut + Send + 'static,
373    Fut: Future<Output = Result<()>> + Send + 'static,
374{
375    tokio::spawn(async move {
376        let mut backoff = initial_backoff_secs.max(1);
377        let max_backoff = max_backoff_secs.max(backoff);
378
379        loop {
380            crate::health::mark_component_ok(name);
381            match run_component().await {
382                Ok(()) => {
383                    crate::health::mark_component_error(name, "component exited unexpectedly");
384                    ::zeroclaw_log::record!(
385                        WARN,
386                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
387                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
388                            .with_attrs(::serde_json::json!({"name": name})),
389                        &format!("Daemon component '{name}' exited unexpectedly")
390                    );
391                    // Clean exit — reset backoff since the component ran successfully
392                    backoff = initial_backoff_secs.max(1);
393                }
394                Err(e) => {
395                    crate::health::mark_component_error(name, e.to_string());
396                    ::zeroclaw_log::record!(
397                        ERROR,
398                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
399                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
400                            .with_attrs(
401                                ::serde_json::json!({"error": format!("{}", e), "name": name})
402                            ),
403                        &format!("Daemon component '{name}' failed: {e}")
404                    );
405                }
406            }
407
408            crate::health::bump_component_restart(name);
409            tokio::time::sleep(Duration::from_secs(backoff)).await;
410            // Double backoff AFTER sleeping so first error uses initial_backoff
411            backoff = backoff.saturating_mul(2).min(max_backoff);
412        }
413    })
414}
415
416async fn run_heartbeat_worker(config: Config) -> Result<()> {
417    use crate::heartbeat::engine::{
418        HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus, compute_adaptive_interval,
419    };
420    use std::sync::Arc;
421
422    let agent_alias = config.heartbeat.agent.trim().to_string();
423    if agent_alias.is_empty() {
424        anyhow::bail!(
425            "heartbeat worker requires `[heartbeat] agent = \"<alias>\"` naming a configured agent"
426        );
427    }
428    if config.agent(&agent_alias).is_none() {
429        anyhow::bail!(
430            "[heartbeat] agent = {agent_alias:?} is not configured ([agents.{agent_alias}] missing)"
431        );
432    }
433
434    let observer: std::sync::Arc<dyn crate::observability::Observer> =
435        std::sync::Arc::from(crate::observability::create_observer(&config.observability));
436    let engine = HeartbeatEngine::new(config.heartbeat.clone(), config.data_dir.clone(), observer);
437    let metrics = engine.metrics();
438    let delivery = resolve_heartbeat_delivery(&config)?;
439    let two_phase = config.heartbeat.two_phase;
440    let adaptive = config.heartbeat.adaptive;
441    let start_time = std::time::Instant::now();
442
443    // ── Deadman watcher ──────────────────────────────────────────
444    let deadman_timeout = config.heartbeat.deadman_timeout_minutes;
445    if deadman_timeout > 0 {
446        let dm_metrics = Arc::clone(&metrics);
447        let dm_config = config.clone();
448        let dm_delivery = delivery.clone();
449        tokio::spawn(async move {
450            let check_interval = Duration::from_secs(60);
451            let timeout = chrono::Duration::minutes(i64::from(deadman_timeout));
452            loop {
453                tokio::time::sleep(check_interval).await;
454                let last_tick = dm_metrics.lock().last_tick_at;
455                if let Some(last) = last_tick
456                    && chrono::Utc::now() - last > timeout
457                {
458                    let alert = format!(
459                        "⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes"
460                    );
461                    let (channel, target) = if let Some(ch) = &dm_config.heartbeat.deadman_channel {
462                        let to = dm_config
463                            .heartbeat
464                            .deadman_to
465                            .as_deref()
466                            .or(dm_config.heartbeat.to.as_deref())
467                            .unwrap_or_default();
468                        (ch.clone(), to.to_string())
469                    } else if let Some((ch, to)) = &dm_delivery {
470                        (ch.clone(), to.clone())
471                    } else {
472                        continue;
473                    };
474                    let delivery_fut = crate::cron::scheduler::deliver_announcement(
475                        &dm_config, &channel, &target, None, &alert,
476                    );
477                    match tokio::time::timeout(Duration::from_secs(30), delivery_fut).await {
478                        Ok(Err(e)) => {
479                            ::zeroclaw_log::record!(
480                                WARN,
481                                ::zeroclaw_log::Event::new(
482                                    module_path!(),
483                                    ::zeroclaw_log::Action::Note
484                                )
485                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
486                                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
487                                "Deadman alert delivery failed"
488                            );
489                        }
490                        Err(_) => {
491                            ::zeroclaw_log::record!(
492                                WARN,
493                                ::zeroclaw_log::Event::new(
494                                    module_path!(),
495                                    ::zeroclaw_log::Action::Note
496                                )
497                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
498                                "Deadman alert delivery timed out (30s)"
499                            );
500                        }
501                        Ok(Ok(())) => {}
502                    }
503                }
504            }
505        });
506    }
507
508    let base_interval = config.heartbeat.interval_minutes.max(1);
509    let mut sleep_mins = base_interval;
510
511    loop {
512        tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await;
513
514        // Update uptime
515        {
516            let mut m = metrics.lock();
517            m.uptime_secs = start_time.elapsed().as_secs();
518        }
519
520        let tick_start = std::time::Instant::now();
521
522        // Collect runnable tasks (active only, sorted by priority)
523        let mut tasks = engine.collect_runnable_tasks().await?;
524        let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High);
525
526        if tasks.is_empty() {
527            if let Some(fallback) = config
528                .heartbeat
529                .message
530                .as_deref()
531                .map(str::trim)
532                .filter(|m| !m.is_empty())
533            {
534                tasks.push(HeartbeatTask {
535                    text: fallback.to_string(),
536                    priority: TaskPriority::Medium,
537                    status: TaskStatus::Active,
538                });
539            } else {
540                #[allow(clippy::cast_precision_loss)]
541                let elapsed = tick_start.elapsed().as_millis() as f64;
542                metrics.lock().record_success(elapsed);
543                continue;
544            }
545        }
546
547        // ── Phase 1: LLM decision (two-phase mode) ──────────────
548        let tasks_to_run = if two_phase {
549            let decision_prompt = format!(
550                "[Heartbeat Task | decision] {}",
551                HeartbeatEngine::build_decision_prompt(&tasks),
552            );
553            let phase1_fut = Box::pin(crate::agent::run(
554                config.clone(),
555                &agent_alias,
556                Some(decision_prompt),
557                None,
558                None,
559                Some(0.0),
560                vec![],
561                false,
562                None,
563                None,
564                crate::agent::loop_::AgentRunOverrides::default(),
565            ));
566            let phase1_result = if config.heartbeat.task_timeout_secs > 0 {
567                match tokio::time::timeout(
568                    Duration::from_secs(config.heartbeat.task_timeout_secs),
569                    phase1_fut,
570                )
571                .await
572                {
573                    Ok(r) => r,
574                    Err(_) => {
575                        ::zeroclaw_log::record!(
576                            WARN,
577                            ::zeroclaw_log::Event::new(
578                                module_path!(),
579                                ::zeroclaw_log::Action::Timeout
580                            )
581                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
582                            .with_attrs(::serde_json::json!({
583                                "phase": "phase1_decision",
584                                "timeout_secs": config.heartbeat.task_timeout_secs,
585                            })),
586                            "heartbeat: phase1 decision timed out"
587                        );
588                        Err(anyhow::Error::msg(format!(
589                            "Phase 1 decision timed out ({}s)",
590                            config.heartbeat.task_timeout_secs
591                        )))
592                    }
593                }
594            } else {
595                phase1_fut.await
596            };
597            match phase1_result {
598                Ok(response) => {
599                    let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
600                    if indices.is_empty() {
601                        ::zeroclaw_log::record!(
602                            INFO,
603                            ::zeroclaw_log::Event::new(
604                                module_path!(),
605                                ::zeroclaw_log::Action::Note
606                            ),
607                            "heartbeat phase 1: skip (nothing to do)"
608                        );
609                        crate::health::mark_component_ok("heartbeat");
610                        #[allow(clippy::cast_precision_loss)]
611                        let elapsed = tick_start.elapsed().as_millis() as f64;
612                        metrics.lock().record_success(elapsed);
613                        continue;
614                    }
615                    ::zeroclaw_log::record!(INFO, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_attrs(::serde_json::json!({"selected": indices.len(), "total": tasks.len()})), "heartbeat phase 1: running task subset");
616                    indices
617                        .into_iter()
618                        .filter_map(|i| tasks.get(i).cloned())
619                        .collect()
620                }
621                Err(e) => {
622                    ::zeroclaw_log::record!(
623                        WARN,
624                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
625                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
626                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
627                        "heartbeat phase 1 failed; running all tasks"
628                    );
629                    tasks
630                }
631            }
632        } else {
633            tasks
634        };
635
636        // ── Phase 2: Execute selected tasks ─────────────────────
637        // Re-read session context on every tick so we pick up messages
638        // that arrived since the daemon started.
639        let session_context = if config.heartbeat.load_session_context {
640            load_heartbeat_session_context(&config)
641        } else {
642            None
643        };
644
645        // Create memory once per tick for recall + consolidation.
646        let heartbeat_memory: Option<Box<dyn zeroclaw_memory::Memory>> =
647            zeroclaw_memory::create_memory(
648                &config.memory,
649                &config.data_dir,
650                config
651                    .model_provider_for_agent(&agent_alias)
652                    .and_then(|e| e.api_key.as_deref()),
653            )
654            .ok();
655
656        let mut tick_had_error = false;
657        for task in &tasks_to_run {
658            let task_start = std::time::Instant::now();
659            let task_prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
660
661            // Recall relevant memories so heartbeat tasks have context awareness.
662            // Exclude `Conversation` memories to prevent chat context from
663            // leaking into scheduled executions.
664            let memory_context = if let Some(ref mem) = heartbeat_memory {
665                match mem.recall(&task.text, 5, None, None, None).await {
666                    Ok(entries) if !entries.is_empty() => {
667                        let ctx: String = entries
668                            .iter()
669                            .filter(|e| {
670                                !matches!(
671                                    e.category,
672                                    zeroclaw_memory::traits::MemoryCategory::Conversation
673                                )
674                            })
675                            .map(|e| format!("- {}: {}", e.key, e.content))
676                            .collect::<Vec<_>>()
677                            .join("\n");
678                        if ctx.is_empty() {
679                            None
680                        } else {
681                            Some(format!(
682                                "{MEMORY_CONTEXT_OPEN}\n{ctx}\n{MEMORY_CONTEXT_CLOSE}\n\n"
683                            ))
684                        }
685                    }
686                    _ => None,
687                }
688            } else {
689                None
690            };
691
692            let prompt = match (&session_context, &memory_context) {
693                (Some(sc), Some(mc)) => format!("{mc}\n{sc}\n\n{task_prompt}"),
694                (Some(sc), None) => format!("{sc}\n\n{task_prompt}"),
695                (None, Some(mc)) => format!("{mc}\n\n{task_prompt}"),
696                (None, None) => task_prompt,
697            };
698            let temp: Option<f64> = config
699                .model_provider_for_agent(&agent_alias)
700                .and_then(|e| e.temperature);
701            let phase2_fut = Box::pin(crate::agent::run(
702                config.clone(),
703                &agent_alias,
704                Some(prompt),
705                None,
706                None,
707                temp,
708                vec![],
709                false,
710                None,
711                None,
712                crate::agent::loop_::AgentRunOverrides::default(),
713            ));
714            let phase2_result = if config.heartbeat.task_timeout_secs > 0 {
715                match tokio::time::timeout(
716                    Duration::from_secs(config.heartbeat.task_timeout_secs),
717                    phase2_fut,
718                )
719                .await
720                {
721                    Ok(r) => r,
722                    Err(_) => {
723                        ::zeroclaw_log::record!(
724                            WARN,
725                            ::zeroclaw_log::Event::new(
726                                module_path!(),
727                                ::zeroclaw_log::Action::Timeout
728                            )
729                            .with_outcome(::zeroclaw_log::EventOutcome::Failure)
730                            .with_attrs(::serde_json::json!({
731                                "phase": "phase2_heartbeat",
732                                "timeout_secs": config.heartbeat.task_timeout_secs,
733                            })),
734                            "heartbeat task timed out"
735                        );
736                        Err(anyhow::Error::msg(format!(
737                            "Heartbeat task timed out ({}s)",
738                            config.heartbeat.task_timeout_secs
739                        )))
740                    }
741                }
742            } else {
743                phase2_fut.await
744            };
745            match phase2_result {
746                Ok(output) => {
747                    crate::health::mark_component_ok("heartbeat");
748                    #[allow(clippy::cast_possible_truncation)]
749                    let duration_ms = task_start.elapsed().as_millis() as i64;
750                    let now = chrono::Utc::now();
751                    let _ = crate::heartbeat::store::record_run(
752                        &config.data_dir,
753                        &task.text,
754                        &task.priority.to_string(),
755                        now - chrono::Duration::milliseconds(duration_ms),
756                        now,
757                        "ok",
758                        Some(output.as_str()),
759                        duration_ms,
760                        config.heartbeat.max_run_history,
761                    );
762                    // Consolidate heartbeat output to memory for cross-session awareness.
763                    if config.memory.auto_save
764                        && output.chars().count() >= 50
765                        && let Some(ref mem) = heartbeat_memory
766                    {
767                        let key = format!("heartbeat_{}", uuid::Uuid::new_v4());
768                        let summary = if output.len() > 500 {
769                            // Find a valid UTF-8 char boundary at or before 500.
770                            let mut end = 500;
771                            while end > 0 && !output.is_char_boundary(end) {
772                                end -= 1;
773                            }
774                            &output[..end]
775                        } else {
776                            &output
777                        };
778                        let _ = mem
779                            .store(
780                                &key,
781                                &format!("Heartbeat task '{}': {}", task.text, summary),
782                                zeroclaw_memory::MemoryCategory::Daily,
783                                None,
784                            )
785                            .await;
786                    }
787
788                    let announcement = if output.trim().is_empty() {
789                        format!("💓 heartbeat task completed: {}", task.text)
790                    } else {
791                        output
792                    };
793                    if let Some((channel, target)) = &delivery {
794                        let delivery_result = tokio::time::timeout(
795                            Duration::from_secs(30),
796                            crate::cron::scheduler::deliver_announcement(
797                                &config,
798                                channel,
799                                target,
800                                None,
801                                &announcement,
802                            ),
803                        )
804                        .await;
805                        match delivery_result {
806                            Ok(Err(e)) => {
807                                crate::health::mark_component_error(
808                                    "heartbeat",
809                                    format!("delivery failed: {e}"),
810                                );
811                                ::zeroclaw_log::record!(
812                                    WARN,
813                                    ::zeroclaw_log::Event::new(
814                                        module_path!(),
815                                        ::zeroclaw_log::Action::Note
816                                    )
817                                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
818                                    .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
819                                    "Heartbeat delivery failed"
820                                );
821                            }
822                            Err(_) => {
823                                crate::health::mark_component_error(
824                                    "heartbeat",
825                                    "delivery timed out (30s)".to_string(),
826                                );
827                                ::zeroclaw_log::record!(
828                                    WARN,
829                                    ::zeroclaw_log::Event::new(
830                                        module_path!(),
831                                        ::zeroclaw_log::Action::Note
832                                    )
833                                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
834                                    "Heartbeat delivery timed out (30s)"
835                                );
836                            }
837                            Ok(Ok(())) => {}
838                        }
839                    }
840                }
841                Err(e) => {
842                    tick_had_error = true;
843                    #[allow(clippy::cast_possible_truncation)]
844                    let duration_ms = task_start.elapsed().as_millis() as i64;
845                    let now = chrono::Utc::now();
846                    let _ = crate::heartbeat::store::record_run(
847                        &config.data_dir,
848                        &task.text,
849                        &task.priority.to_string(),
850                        now - chrono::Duration::milliseconds(duration_ms),
851                        now,
852                        "error",
853                        Some(&e.to_string()),
854                        duration_ms,
855                        config.heartbeat.max_run_history,
856                    );
857                    crate::health::mark_component_error("heartbeat", e.to_string());
858                    ::zeroclaw_log::record!(
859                        WARN,
860                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
861                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
862                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
863                        "Heartbeat task failed"
864                    );
865                }
866            }
867        }
868
869        // Update metrics
870        #[allow(clippy::cast_precision_loss)]
871        let tick_elapsed = tick_start.elapsed().as_millis() as f64;
872        {
873            let mut m = metrics.lock();
874            if tick_had_error {
875                m.record_failure(tick_elapsed);
876            } else {
877                m.record_success(tick_elapsed);
878            }
879        }
880
881        // Compute next sleep interval
882        if adaptive {
883            let failures = metrics.lock().consecutive_failures;
884            sleep_mins = compute_adaptive_interval(
885                base_interval,
886                config.heartbeat.min_interval_minutes,
887                config.heartbeat.max_interval_minutes,
888                failures,
889                has_high_priority,
890            );
891        } else {
892            sleep_mins = base_interval;
893        }
894    }
895}
896
897/// Resolve delivery target: explicit config > auto-detect first configured channel.
898fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
899    let channel = config
900        .heartbeat
901        .target
902        .as_deref()
903        .map(str::trim)
904        .filter(|value| !value.is_empty());
905    let target = config
906        .heartbeat
907        .to
908        .as_deref()
909        .map(str::trim)
910        .filter(|value| !value.is_empty());
911
912    match (channel, target) {
913        // Both explicitly set — validate and use.
914        (Some(channel), Some(target)) => {
915            validate_heartbeat_channel_config(config, channel)?;
916            Ok(Some((channel.to_string(), target.to_string())))
917        }
918        // Only one set — error.
919        (Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
920        (None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
921        // Neither set — try auto-detect the first configured channel.
922        (None, None) => Ok(auto_detect_heartbeat_channel(config)),
923    }
924}
925
926/// Load recent conversation history for the heartbeat's delivery target and
927/// format it as a text preamble to inject into the task prompt.
928///
929/// Scans `{workspace}/sessions/` for JSONL files whose name starts with
930/// `{channel}_` and ends with `_{to}.jsonl` (or exactly `{channel}_{to}.jsonl`),
931/// then picks the most recently modified match. This handles session key
932/// formats such as `telegram_diskiller.jsonl` and
933/// `telegram_5673725398_diskiller.jsonl`.
934/// Returns `None` when `target`/`to` are not configured or no session exists.
935const HEARTBEAT_SESSION_CONTEXT_MESSAGES: usize = 20;
936
937fn load_heartbeat_session_context(config: &Config) -> Option<String> {
938    use zeroclaw_providers::traits::ChatMessage;
939
940    let channel = config
941        .heartbeat
942        .target
943        .as_deref()
944        .map(str::trim)
945        .filter(|v| !v.is_empty())?;
946    let to = config
947        .heartbeat
948        .to
949        .as_deref()
950        .map(str::trim)
951        .filter(|v| !v.is_empty())?;
952
953    if channel.contains('/') || channel.contains('\\') || to.contains('/') || to.contains('\\') {
954        ::zeroclaw_log::record!(
955            WARN,
956            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
957                .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
958            "heartbeat session context: channel/to contains path separators, skipping"
959        );
960        return None;
961    }
962
963    let sessions_dir = config.data_dir.join("sessions");
964
965    // Find the most recently modified JSONL file that belongs to this target.
966    // Matches both `{channel}_{to}.jsonl` and `{channel}_{anything}_{to}.jsonl`.
967    let prefix = format!("{channel}_");
968    let suffix = format!("_{to}.jsonl");
969    let exact = format!("{channel}_{to}.jsonl");
970    let mid_prefix = format!("{channel}_{to}_");
971
972    let path = std::fs::read_dir(&sessions_dir)
973        .ok()?
974        .filter_map(|e| e.ok())
975        .filter(|e| {
976            let name = e.file_name();
977            let name = name.to_string_lossy();
978            name.ends_with(".jsonl")
979                && (name == exact
980                    || (name.starts_with(&prefix) && name.ends_with(&suffix))
981                    || name.starts_with(&mid_prefix))
982        })
983        .max_by_key(|e| {
984            e.metadata()
985                .and_then(|m| m.modified())
986                .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
987        })
988        .map(|e| e.path())?;
989
990    if !path.exists() {
991        ::zeroclaw_log::record!(
992            DEBUG,
993            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
994                .with_attrs(::serde_json::json!({"channel": channel, "to": to})),
995            "heartbeat session context: no session file found"
996        );
997        return None;
998    }
999
1000    let messages = load_jsonl_messages(&path);
1001    if messages.is_empty() {
1002        return None;
1003    }
1004
1005    let recent: Vec<&ChatMessage> = messages
1006        .iter()
1007        .filter(|m| m.role == "user" || m.role == "assistant")
1008        .rev()
1009        .take(HEARTBEAT_SESSION_CONTEXT_MESSAGES)
1010        .collect::<Vec<_>>()
1011        .into_iter()
1012        .rev()
1013        .collect();
1014
1015    // Only inject context if there is at least one real user message in the
1016    // window. If the JSONL contains only assistant messages (e.g. previous
1017    // heartbeat outputs with no reply yet), skip context to avoid feeding
1018    // Monika's own messages back to her in a loop.
1019    let has_user_message = recent.iter().any(|m| m.role == "user");
1020    if !has_user_message {
1021        ::zeroclaw_log::record!(
1022            DEBUG,
1023            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1024            "💓 Heartbeat session context: no user messages in recent history — skipping"
1025        );
1026        return None;
1027    }
1028
1029    // Use the session file's mtime as a proxy for when the last message arrived.
1030    let last_message_age = std::fs::metadata(&path)
1031        .ok()
1032        .and_then(|m| m.modified().ok())
1033        .and_then(|mtime| mtime.elapsed().ok());
1034
1035    let silence_note = match last_message_age {
1036        Some(age) => {
1037            let mins = age.as_secs() / 60;
1038            if mins < 60 {
1039                format!("(last message ~{mins} minutes ago)\n")
1040            } else {
1041                let hours = mins / 60;
1042                let rem = mins % 60;
1043                if rem == 0 {
1044                    format!("(last message ~{hours}h ago)\n")
1045                } else {
1046                    format!("(last message ~{hours}h {rem}m ago)\n")
1047                }
1048            }
1049        }
1050        None => String::new(),
1051    };
1052
1053    ::zeroclaw_log::record!(
1054        DEBUG,
1055        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
1056        &format!(
1057            "💓 Heartbeat session context: {} messages from {}, silence: {}",
1058            recent.len(),
1059            path.display().to_string(),
1060            silence_note.trim()
1061        )
1062    );
1063
1064    let mut ctx = format!(
1065        "[Recent conversation history — use this for context when composing your message] {silence_note}",
1066    );
1067    for msg in &recent {
1068        let label = if msg.role == "user" { "User" } else { "You" };
1069        // Truncate very long messages to avoid bloating the prompt.
1070        // Use char_indices to avoid panicking on multi-byte UTF-8 characters.
1071        let content = if msg.content.len() > 500 {
1072            let truncate_at = msg
1073                .content
1074                .char_indices()
1075                .map(|(i, _)| i)
1076                .take_while(|&i| i <= 500)
1077                .last()
1078                .unwrap_or(0);
1079            format!("{}…", &msg.content[..truncate_at])
1080        } else {
1081            msg.content.clone()
1082        };
1083        ctx.push_str(label);
1084        ctx.push_str(": ");
1085        ctx.push_str(&content);
1086        ctx.push('\n');
1087    }
1088
1089    Some(ctx)
1090}
1091
1092/// Read the last `HEARTBEAT_SESSION_CONTEXT_MESSAGES` `ChatMessage` lines from
1093/// a JSONL session file using a bounded rolling window so we never hold the
1094/// entire file in memory.
1095fn load_jsonl_messages(path: &std::path::Path) -> Vec<zeroclaw_providers::traits::ChatMessage> {
1096    use std::collections::VecDeque;
1097    use std::io::BufRead;
1098
1099    let file = match std::fs::File::open(path) {
1100        Ok(f) => f,
1101        Err(_) => return Vec::new(),
1102    };
1103    let reader = std::io::BufReader::new(file);
1104    let mut window: VecDeque<zeroclaw_providers::traits::ChatMessage> =
1105        VecDeque::with_capacity(HEARTBEAT_SESSION_CONTEXT_MESSAGES + 1);
1106    for line in reader.lines() {
1107        let Ok(line) = line else { continue };
1108        let trimmed = line.trim();
1109        if trimmed.is_empty() {
1110            continue;
1111        }
1112        if let Ok(msg) = serde_json::from_str::<zeroclaw_providers::traits::ChatMessage>(trimmed) {
1113            window.push_back(msg);
1114            if window.len() > HEARTBEAT_SESSION_CONTEXT_MESSAGES {
1115                window.pop_front();
1116            }
1117        }
1118    }
1119    window.into_iter().collect()
1120}
1121
1122/// Auto-detect the best channel for heartbeat delivery by checking which
1123/// channels are configured. Returns the first match in priority order.
1124fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
1125    // Priority order: telegram > discord > slack > mattermost
1126    // Find the first external peer authorized on a telegram channel
1127    // (peer authorization lives in peer_groups in V3, not on the
1128    // channel block).
1129    if !config.channels.telegram.is_empty() {
1130        for alias in config.channels.telegram.keys() {
1131            let peers = config.channel_external_peers("telegram", alias);
1132            if let Some(target) = peers.into_iter().next() {
1133                return Some(("telegram".to_string(), target));
1134            }
1135        }
1136    }
1137    if !config.channels.discord.is_empty() {
1138        // Discord requires explicit target — can't auto-detect
1139        return None;
1140    }
1141    if !config.channels.slack.is_empty() {
1142        // Slack requires explicit target
1143        return None;
1144    }
1145    if !config.channels.mattermost.is_empty() {
1146        // Mattermost requires explicit target
1147        return None;
1148    }
1149    None
1150}
1151
1152fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
1153    match channel.to_ascii_lowercase().as_str() {
1154        "telegram" => {
1155            if config.channels.telegram.is_empty() {
1156                anyhow::bail!(
1157                    "heartbeat.target is set to telegram but channels.telegram is not configured"
1158                );
1159            }
1160        }
1161        "discord" => {
1162            if config.channels.discord.is_empty() {
1163                anyhow::bail!(
1164                    "heartbeat.target is set to discord but channels.discord is not configured"
1165                );
1166            }
1167        }
1168        "slack" => {
1169            if config.channels.slack.is_empty() {
1170                anyhow::bail!(
1171                    "heartbeat.target is set to slack but channels.slack is not configured"
1172                );
1173            }
1174        }
1175        "mattermost" => {
1176            if config.channels.mattermost.is_empty() {
1177                anyhow::bail!(
1178                    "heartbeat.target is set to mattermost but channels.mattermost is not configured"
1179                );
1180            }
1181        }
1182        other => anyhow::bail!("unsupported heartbeat.target channel: {other}"),
1183    }
1184
1185    Ok(())
1186}
1187
1188fn has_supervised_channels(config: &Config) -> bool {
1189    config
1190        .channels
1191        .channels()
1192        .iter()
1193        .any(|info| info.configured)
1194}
1195
1196// run_mqtt_sop_listener has been moved to zeroclaw-channels::orchestrator::mqtt.
1197// The daemon now receives it as a callback via DaemonSubsystems::mqtt_start.
1198
1199#[cfg(test)]
1200mod tests {
1201    use super::*;
1202    use tempfile::TempDir;
1203
1204    fn test_config(tmp: &TempDir) -> Config {
1205        let config = Config {
1206            data_dir: tmp.path().join("data"),
1207            config_path: tmp.path().join("config.toml"),
1208            ..Config::default()
1209        };
1210        std::fs::create_dir_all(&config.data_dir).unwrap();
1211        config
1212    }
1213
1214    #[test]
1215    fn state_file_path_uses_config_directory() {
1216        let tmp = TempDir::new().unwrap();
1217        let config = test_config(&tmp);
1218
1219        let path = state_file_path(&config);
1220        assert_eq!(path, tmp.path().join("daemon_state.json"));
1221    }
1222
1223    #[tokio::test]
1224    async fn supervisor_marks_error_and_restart_on_failure() {
1225        let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
1226            anyhow::bail!("boom")
1227        });
1228
1229        tokio::time::sleep(Duration::from_millis(50)).await;
1230        handle.abort();
1231        let _ = handle.await;
1232
1233        let snapshot = crate::health::snapshot_json();
1234        let component = &snapshot["components"]["daemon-test-fail"];
1235        assert_eq!(component["status"], "error");
1236        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
1237        assert!(
1238            component["last_error"]
1239                .as_str()
1240                .unwrap_or("")
1241                .contains("boom")
1242        );
1243    }
1244
1245    #[tokio::test]
1246    async fn supervisor_marks_unexpected_exit_as_error() {
1247        let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });
1248
1249        tokio::time::sleep(Duration::from_millis(50)).await;
1250        handle.abort();
1251        let _ = handle.await;
1252
1253        let snapshot = crate::health::snapshot_json();
1254        let component = &snapshot["components"]["daemon-test-exit"];
1255        assert_eq!(component["status"], "error");
1256        assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
1257        assert!(
1258            component["last_error"]
1259                .as_str()
1260                .unwrap_or("")
1261                .contains("component exited unexpectedly")
1262        );
1263    }
1264
1265    #[test]
1266    fn detects_no_supervised_channels() {
1267        let config = Config::default();
1268        assert!(!has_supervised_channels(&config));
1269    }
1270
1271    #[test]
1272    fn detects_supervised_channels_present() {
1273        let mut config = Config::default();
1274        config.channels.telegram.insert(
1275            "default".to_string(),
1276            zeroclaw_config::schema::TelegramConfig {
1277                enabled: true,
1278                bot_token: "token".into(),
1279                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1280                draft_update_interval_ms: 1000,
1281                interrupt_on_new_message: false,
1282                mention_only: false,
1283                ack_reactions: None,
1284                proxy_url: None,
1285                approval_timeout_secs: 120,
1286                excluded_tools: vec![],
1287                default_target: None,
1288            },
1289        );
1290        assert!(has_supervised_channels(&config));
1291    }
1292
1293    #[test]
1294    fn detects_dingtalk_as_supervised_channel() {
1295        let mut config = Config::default();
1296        config.channels.dingtalk.insert(
1297            "default".to_string(),
1298            zeroclaw_config::schema::DingTalkConfig {
1299                enabled: true,
1300                client_id: "client_id".into(),
1301                client_secret: "client_secret".into(),
1302                proxy_url: None,
1303                excluded_tools: vec![],
1304            },
1305        );
1306        assert!(has_supervised_channels(&config));
1307    }
1308
1309    #[test]
1310    fn detects_mattermost_as_supervised_channel() {
1311        let mut config = Config::default();
1312        config.channels.mattermost.insert(
1313            "default".to_string(),
1314            zeroclaw_config::schema::MattermostConfig {
1315                enabled: true,
1316                url: "https://mattermost.example.com".into(),
1317                bot_token: Some("token".into()),
1318                login_id: None,
1319                password: None,
1320                channel_ids: vec!["channel-id".into()],
1321                team_ids: vec![],
1322                discover_dms: None,
1323                thread_replies: Some(true),
1324                mention_only: Some(false),
1325                interrupt_on_new_message: false,
1326                proxy_url: None,
1327                excluded_tools: vec![],
1328                default_target: None,
1329            },
1330        );
1331        assert!(has_supervised_channels(&config));
1332    }
1333
1334    #[test]
1335    fn detects_qq_as_supervised_channel() {
1336        let mut config = Config::default();
1337        config.channels.qq.insert(
1338            "default".to_string(),
1339            zeroclaw_config::schema::QQConfig {
1340                enabled: true,
1341                app_id: "app-id".into(),
1342                app_secret: "app-secret".into(),
1343                proxy_url: None,
1344                excluded_tools: vec![],
1345            },
1346        );
1347        assert!(has_supervised_channels(&config));
1348    }
1349
1350    #[test]
1351    fn detects_nextcloud_talk_as_supervised_channel() {
1352        let mut config = Config::default();
1353        config.channels.nextcloud_talk.insert(
1354            "default".to_string(),
1355            zeroclaw_config::schema::NextcloudTalkConfig {
1356                enabled: true,
1357                base_url: "https://cloud.example.com".into(),
1358                app_token: "app-token".into(),
1359                webhook_secret: None,
1360                proxy_url: None,
1361                bot_name: None,
1362                excluded_tools: vec![],
1363                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1364                draft_update_interval_ms: 1000,
1365            },
1366        );
1367        assert!(has_supervised_channels(&config));
1368    }
1369
1370    #[test]
1371    fn webhook_only_config_is_supervised() {
1372        let mut config = Config::default();
1373        config.channels.webhook.insert(
1374            "default".to_string(),
1375            zeroclaw_config::schema::WebhookConfig {
1376                enabled: true,
1377                port: 8080,
1378                listen_path: None,
1379                send_url: None,
1380                send_method: None,
1381                auth_header: None,
1382                secret: None,
1383                excluded_tools: vec![],
1384                max_retries: None,
1385                retry_base_delay_ms: None,
1386                retry_max_delay_ms: None,
1387            },
1388        );
1389        assert!(has_supervised_channels(&config));
1390    }
1391
1392    #[test]
1393    fn resolve_delivery_none_when_unset() {
1394        let config = Config::default();
1395        let target = resolve_heartbeat_delivery(&config).unwrap();
1396        assert!(target.is_none());
1397    }
1398
1399    #[test]
1400    fn resolve_delivery_requires_to_field() {
1401        let mut config = Config::default();
1402        config.heartbeat.target = Some("telegram".into());
1403        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1404        assert!(
1405            err.to_string()
1406                .contains("heartbeat.to is required when heartbeat.target is set")
1407        );
1408    }
1409
1410    #[test]
1411    fn resolve_delivery_requires_target_field() {
1412        let mut config = Config::default();
1413        config.heartbeat.to = Some("123456".into());
1414        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1415        assert!(
1416            err.to_string()
1417                .contains("heartbeat.target is required when heartbeat.to is set")
1418        );
1419    }
1420
1421    #[test]
1422    fn resolve_delivery_rejects_unsupported_channel() {
1423        let mut config = Config::default();
1424        config.heartbeat.target = Some("email".into());
1425        config.heartbeat.to = Some("ops@example.com".into());
1426        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1427        assert!(
1428            err.to_string()
1429                .contains("unsupported heartbeat.target channel")
1430        );
1431    }
1432
1433    #[test]
1434    fn resolve_delivery_requires_channel_configuration() {
1435        let mut config = Config::default();
1436        config.heartbeat.target = Some("telegram".into());
1437        config.heartbeat.to = Some("123456".into());
1438        let err = resolve_heartbeat_delivery(&config).unwrap_err();
1439        assert!(
1440            err.to_string()
1441                .contains("channels.telegram is not configured")
1442        );
1443    }
1444
1445    #[test]
1446    fn resolve_delivery_accepts_telegram_configuration() {
1447        let mut config = Config::default();
1448        config.heartbeat.target = Some("telegram".into());
1449        config.heartbeat.to = Some("123456".into());
1450        config.channels.telegram.insert(
1451            "default".to_string(),
1452            zeroclaw_config::schema::TelegramConfig {
1453                enabled: true,
1454                bot_token: "bot-token".into(),
1455                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1456                draft_update_interval_ms: 1000,
1457                interrupt_on_new_message: false,
1458                mention_only: false,
1459                ack_reactions: None,
1460                proxy_url: None,
1461                approval_timeout_secs: 120,
1462                excluded_tools: vec![],
1463                default_target: None,
1464            },
1465        );
1466
1467        let target = resolve_heartbeat_delivery(&config).unwrap();
1468        assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
1469    }
1470
1471    #[test]
1472    fn auto_detect_telegram_when_configured() {
1473        use zeroclaw_config::multi_agent::{PeerGroupConfig, PeerUsername};
1474
1475        let mut config = Config::default();
1476        config.channels.telegram.insert(
1477            "default".to_string(),
1478            zeroclaw_config::schema::TelegramConfig {
1479                enabled: true,
1480                bot_token: "bot-token".into(),
1481                stream_mode: zeroclaw_config::schema::StreamMode::default(),
1482                draft_update_interval_ms: 1000,
1483                interrupt_on_new_message: false,
1484                mention_only: false,
1485                ack_reactions: None,
1486                proxy_url: None,
1487                approval_timeout_secs: 120,
1488                excluded_tools: vec![],
1489                default_target: None,
1490            },
1491        );
1492        // Inbound peer authorization lives in peer_groups in V3.
1493        // Auto-detect picks the first external peer of the synthesized
1494        // `telegram_default` group as the heartbeat target.
1495        config.peer_groups.insert(
1496            "telegram_default".to_string(),
1497            PeerGroupConfig {
1498                channel: "telegram".to_string(),
1499                external_peers: vec![PeerUsername::new("user123")],
1500                ..PeerGroupConfig::default()
1501            },
1502        );
1503
1504        let target = resolve_heartbeat_delivery(&config).unwrap();
1505        assert_eq!(
1506            target,
1507            Some(("telegram".to_string(), "user123".to_string()))
1508        );
1509    }
1510
1511    #[test]
1512    fn auto_detect_none_when_no_channels() {
1513        let config = Config::default();
1514        let target = auto_detect_heartbeat_channel(&config);
1515        assert!(target.is_none());
1516    }
1517
1518    /// Verify that SIGHUP does not cause shutdown — the daemon should ignore it
1519    /// and only terminate on SIGINT or SIGTERM.
1520    #[cfg(unix)]
1521    #[tokio::test]
1522    async fn sighup_does_not_shut_down_daemon() {
1523        use libc;
1524        use tokio::time::{Duration, timeout};
1525
1526        let (_reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1527        let handle = tokio::spawn(wait_for_exit_signal(reload_rx));
1528
1529        // Give the signal handler time to register
1530        tokio::time::sleep(Duration::from_millis(50)).await;
1531
1532        // Send SIGHUP to ourselves — should be ignored by the handler
1533        unsafe { libc::raise(libc::SIGHUP) };
1534
1535        // The future should NOT complete within a short window
1536        let result = timeout(Duration::from_millis(200), handle).await;
1537        assert!(
1538            result.is_err(),
1539            "wait_for_exit_signal should not return after SIGHUP"
1540        );
1541    }
1542
1543    /// In-process reload channel returns DaemonExit::Reload so the outer
1544    /// loop can re-init. Cross-platform — works on Linux, macOS, Windows.
1545    #[tokio::test]
1546    async fn reload_channel_returns_reload() {
1547        use tokio::time::{Duration, timeout};
1548
1549        let (reload_tx, reload_rx) = tokio::sync::watch::channel(false);
1550        let handle = tokio::spawn(wait_for_exit_signal(reload_rx));
1551        tokio::time::sleep(Duration::from_millis(50)).await;
1552        reload_tx.send(true).expect("send reload");
1553
1554        let result = timeout(Duration::from_secs(2), handle)
1555            .await
1556            .expect("wait_for_exit_signal should return after reload signal")
1557            .expect("task should not panic")
1558            .expect("signal handler should not error");
1559        assert_eq!(result, DaemonExit::Reload);
1560    }
1561}