Skip to main content

zeroclaw_channels/
paced_channel.rs

1//! Per-(channel, peer) outbound pacing wrapper.
2//!
3//! Wraps a `dyn Channel` so consecutive `send` calls to the same recipient
4//! honour a configured floor on cadence. Drafts and progress updates are
5//! NOT paced — they are streaming UX events where slowing down would
6//! visibly degrade the live response. Only the final `send` (the wire-
7//! level outbound message) and `finalize_draft` enter the queue.
8//!
9//! `min_interval_secs == 0` returns the inner channel unchanged so the
10//! pacing path has zero overhead for the default config.
11//!
12//! When the floor is active the wrapper holds a bounded FIFO queue
13//! per recipient. A send that arrives while the floor still has time
14//! left enqueues. A worker task drains the queue at the floor rate.
15//! When the queue is full the newest send is dropped and a `WARN` is
16//! emitted carrying enough attribution to diagnose the source without
17//! leaking message body. `PACING_RECIPIENT_CAP` bounds the number of
18//! distinct recipient rows retained via idle-state LRU eviction — only
19//! rows with no queued work and no running worker are eligible, so the
20//! cap is a target for idle state, not an unconditional hard bound on a
21//! pathological all-active burst.
22
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26
27use anyhow::Result;
28use async_trait::async_trait;
29use tokio::sync::{Mutex, oneshot};
30use zeroclaw_api::attribution::{Attributable, Role};
31use zeroclaw_api::channel::{
32    Channel, ChannelApprovalRequest, ChannelApprovalResponse, ChannelMessage, SendMessage,
33};
34use zeroclaw_config::schema::{DEFAULT_REPLY_QUEUE_DEPTH, HasReplyPacing, PACING_RECIPIENT_CAP};
35
36/// The outbound operation a queued slot will perform when its turn through
37/// the pacing floor arrives. Both `send` and `finalize_draft` are paced, but
38/// they dispatch to different inner-channel methods — normalizing both to a
39/// plain `send` would route a draft finalization (an edit of an existing
40/// message identified by `message_id`) through `send`, creating a new message
41/// and leaving the draft stale on channels that support drafts.
42enum PacedOp {
43    /// A final outbound message. Dispatches to `inner.send`.
44    Send(SendMessage),
45    /// A terminal draft write. Dispatches to `inner.finalize_draft` so the
46    /// channel edits the existing draft rather than posting a new message.
47    FinalizeDraft {
48        recipient: String,
49        message_id: String,
50        text: String,
51    },
52}
53
54impl PacedOp {
55    /// The recipient key this op paces against.
56    fn recipient(&self) -> &str {
57        match self {
58            Self::Send(message) => &message.recipient,
59            Self::FinalizeDraft { recipient, .. } => recipient,
60        }
61    }
62
63    /// Character count of the payload, for the overflow-drop log.
64    fn payload_chars(&self) -> usize {
65        match self {
66            Self::Send(message) => message.content.chars().count(),
67            Self::FinalizeDraft { text, .. } => text.chars().count(),
68        }
69    }
70
71    /// Dispatch to the correct inner-channel method for this op.
72    async fn dispatch(self, inner: &Arc<dyn Channel>) -> Result<()> {
73        match self {
74            Self::Send(message) => inner.send(&message).await,
75            Self::FinalizeDraft {
76                recipient,
77                message_id,
78                text,
79            } => inner.finalize_draft(&recipient, &message_id, &text).await,
80        }
81    }
82}
83
84/// Per-recipient queued operation waiting on its turn through the pacing floor.
85struct PendingSend {
86    op: PacedOp,
87    /// One-shot back-channel for delivering the eventual send result to
88    /// the caller. The caller awaits this so a paced `send()` still
89    /// returns the inner channel's result rather than swallowing it.
90    reply: oneshot::Sender<Result<()>>,
91}
92
93/// Per-recipient pacing state.
94struct RecipientState {
95    /// Wall-clock time after which the next send to this recipient may fire.
96    next_allowed_at: Instant,
97    /// Pending sends queued behind the floor. Drained FIFO by the worker.
98    queue: VecDeque<PendingSend>,
99    /// `true` while a worker task owns this recipient's queue. Prevents
100    /// spawning a second worker for the same recipient.
101    worker_running: bool,
102    /// `true` while an immediate-path dispatch for this recipient is awaiting
103    /// the inner channel's wire call. Set under the lock before the immediate
104    /// dispatch is released and cleared under the lock when it returns. A send
105    /// that arrives while this is set enqueues instead of taking a second
106    /// immediate path, so a single slow inner send cannot put two wire calls
107    /// in flight to the same recipient and undercut the floor.
108    in_flight: bool,
109    /// Sequence counter so the LRU eviction picks the least-recently-touched
110    /// recipient when the cap is hit.
111    last_touched: u64,
112}
113
114pub struct PacedChannel {
115    inner: Arc<dyn Channel>,
116    min_interval: Duration,
117    queue_depth: usize,
118    /// Per-recipient state. `tokio::sync::Mutex` so the worker can hold
119    /// the lock across `.await` while draining.
120    recipients: Arc<Mutex<RecipientMap>>,
121}
122
123/// Bounded recipient map with LRU eviction. Wrapped so the eviction logic
124/// stays next to the touch counter rather than scattering.
125struct RecipientMap {
126    inner: HashMap<String, RecipientState>,
127    touch_counter: u64,
128}
129
130impl RecipientMap {
131    fn new() -> Self {
132        Self {
133            inner: HashMap::new(),
134            touch_counter: 0,
135        }
136    }
137
138    /// Bump the touch counter and return the new value. Used to stamp
139    /// `last_touched` on whatever recipient row is being modified.
140    fn touch(&mut self) -> u64 {
141        self.touch_counter = self.touch_counter.wrapping_add(1);
142        self.touch_counter
143    }
144
145    /// Evict the least-recently-touched idle recipient when the cap is
146    /// reached. Only rows with no queue, no running worker, and no in-flight
147    /// dispatch are eligible, so an active recipient is never discarded out
148    /// from under its worker or a pending immediate send. If every row is
149    /// active the cap is exceeded until one becomes idle.
150    fn evict_if_over_cap(&mut self) {
151        if self.inner.len() < PACING_RECIPIENT_CAP {
152            return;
153        }
154        // `iter()` walks the recipients to find the smallest `last_touched`.
155        // Idle rows (no queue, no running worker) are preferred; an active
156        // row only loses its slot if every other row is even more recent.
157        let victim = self
158            .inner
159            .iter()
160            .filter(|(_, s)| s.queue.is_empty() && !s.worker_running && !s.in_flight)
161            .min_by_key(|(_, s)| s.last_touched)
162            .map(|(k, _)| k.clone());
163        if let Some(key) = victim {
164            self.inner.remove(&key);
165        }
166    }
167}
168
169impl Attributable for PacedChannel {
170    fn role(&self) -> Role {
171        self.inner.role()
172    }
173    fn alias(&self) -> &str {
174        self.inner.alias()
175    }
176}
177
178impl PacedChannel {
179    /// Wrap `inner` with a pacing floor sourced from `cfg`. When
180    /// `cfg.reply_min_interval_secs() == 0` the inner `Arc` is returned
181    /// unchanged so the default config has zero overhead — no wrapper,
182    /// no mutex, no allocation.
183    pub fn wrap(inner: Arc<dyn Channel>, cfg: &dyn HasReplyPacing) -> Arc<dyn Channel> {
184        let min_interval_secs = cfg.reply_min_interval_secs();
185        if min_interval_secs == 0 {
186            return inner;
187        }
188        let depth_cfg = cfg.reply_queue_depth_max();
189        let queue_depth = if depth_cfg == 0 {
190            usize::from(DEFAULT_REPLY_QUEUE_DEPTH)
191        } else {
192            usize::from(depth_cfg)
193        };
194        Arc::new(Self {
195            inner,
196            min_interval: Duration::from_secs(min_interval_secs),
197            queue_depth,
198            recipients: Arc::new(Mutex::new(RecipientMap::new())),
199        })
200    }
201
202    /// Enqueue or immediately dispatch a paced operation. Returns the inner
203    /// channel's result (immediate path) or the worker's result awaited on a
204    /// oneshot (queued path). Drops the newest op with a `WARN` when the queue
205    /// is full and returns `Ok(())` — overflow is intentional behaviour, not an
206    /// error the agent loop should retry.
207    async fn paced_dispatch(&self, op: PacedOp) -> Result<()> {
208        let recipient_key = op.recipient().to_string();
209
210        // `decision` is built under the lock and consumed after release.
211        // Three shapes share the same outcome carrier so the post-lock
212        // section can use plain `if let` instead of branching on an enum.
213        //
214        // - `(Some(op), None, false)`  — immediate dispatch via inner channel
215        // - `(None, Some(rx), spawn)` — enqueued; await result; maybe spawn worker
216        // - `(None, None, false)` — overflow drop; return Ok
217        let decision: (Option<PacedOp>, Option<oneshot::Receiver<Result<()>>>, bool) = {
218            let mut map = self.recipients.lock().await;
219            map.evict_if_over_cap();
220            let now = Instant::now();
221            let touch = map.touch();
222            let state = map
223                .inner
224                .entry(recipient_key.clone())
225                .or_insert(RecipientState {
226                    next_allowed_at: now,
227                    queue: VecDeque::new(),
228                    worker_running: false,
229                    in_flight: false,
230                    last_touched: touch,
231                });
232            state.last_touched = touch;
233
234            if state.queue.is_empty()
235                && !state.worker_running
236                && !state.in_flight
237                && now >= state.next_allowed_at
238            {
239                state.next_allowed_at = now + self.min_interval;
240                state.in_flight = true;
241                (Some(op), None, false)
242            } else if state.queue.len() >= self.queue_depth {
243                ::zeroclaw_log::record!(
244                    WARN,
245                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject,)
246                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
247                        .with_attrs(::serde_json::json!({
248                            "channel_alias": self.inner.alias(),
249                            "recipient": redact_recipient(&recipient_key),
250                            "queue_depth": state.queue.len(),
251                            "queue_max": self.queue_depth,
252                            "dropped_chars": op.payload_chars(),
253                        })),
254                    "paced channel queue full: dropping newest outbound message"
255                );
256                (None, None, false)
257            } else {
258                let (tx, rx) = oneshot::channel();
259                state.queue.push_back(PendingSend { op, reply: tx });
260                let spawn = !state.worker_running;
261                if spawn {
262                    state.worker_running = true;
263                }
264                (None, Some(rx), spawn)
265            }
266        };
267
268        let (immediate, awaited, spawn_worker) = decision;
269        if let Some(op) = immediate {
270            let result = op.dispatch(&self.inner).await;
271            // Clear the in-flight marker under the lock. Sends that arrived
272            // during this dispatch enqueued behind it (the `in_flight` gate);
273            // hand them to a drain worker so they still observe the floor.
274            let spawn = {
275                let mut map = self.recipients.lock().await;
276                if let Some(state) = map.inner.get_mut(&recipient_key) {
277                    state.in_flight = false;
278                    let needs_worker = !state.queue.is_empty() && !state.worker_running;
279                    if needs_worker {
280                        state.worker_running = true;
281                    }
282                    needs_worker
283                } else {
284                    false
285                }
286            };
287            if spawn {
288                self.spawn_drain_worker(recipient_key);
289            }
290            return result;
291        }
292        if let Some(rx) = awaited {
293            if spawn_worker {
294                self.spawn_drain_worker(recipient_key);
295            }
296            return rx.await.unwrap_or_else(|_| {
297                Err(anyhow::Error::msg(
298                    "paced channel worker dropped before send completed",
299                ))
300            });
301        }
302        Ok(())
303    }
304
305    /// Spawn the worker that drains a recipient's queue at the floor rate.
306    /// One worker per recipient — re-entry is prevented by the
307    /// `worker_running` flag held under the same lock that enqueues.
308    fn spawn_drain_worker(&self, recipient: String) {
309        let recipients = Arc::clone(&self.recipients);
310        let inner = Arc::clone(&self.inner);
311        let min_interval = self.min_interval;
312        zeroclaw_spawn::spawn!(async move {
313            loop {
314                // Wait until the floor has elapsed for this recipient.
315                let sleep_for = {
316                    let map = recipients.lock().await;
317                    let Some(state) = map.inner.get(&recipient) else {
318                        return;
319                    };
320                    state
321                        .next_allowed_at
322                        .saturating_duration_since(Instant::now())
323                };
324                if !sleep_for.is_zero() {
325                    tokio::time::sleep(sleep_for).await;
326                }
327
328                // Pop the next pending send, release the lock before
329                // awaiting the actual wire call. Re-stamp the floor based
330                // on when we dispatched.
331                let pending = {
332                    let mut map = recipients.lock().await;
333                    let Some(state) = map.inner.get_mut(&recipient) else {
334                        return;
335                    };
336                    if state.queue.is_empty() {
337                        state.worker_running = false;
338                        return;
339                    }
340                    state.next_allowed_at = Instant::now() + min_interval;
341                    state.queue.pop_front()
342                };
343                if let Some(PendingSend { op, reply }) = pending {
344                    let result = op.dispatch(&inner).await;
345                    let _ = reply.send(result);
346                }
347            }
348        });
349    }
350}
351
352/// Redact a recipient identifier for log surfaces. The privacy contract
353/// forbids raw JIDs / phones / user IDs in production logs; the redaction
354/// preserves the channel-alias-scoped shape (length + first/last char) so
355/// operators can still correlate dropped sends to recipient cohorts.
356fn redact_recipient(raw: &str) -> String {
357    let chars: Vec<char> = raw.chars().collect();
358    if chars.len() <= 2 {
359        return "***".to_string();
360    }
361    let first = chars.first().copied().unwrap_or('*');
362    let last = chars.last().copied().unwrap_or('*');
363    format!("{first}***{last}<len={}>", chars.len())
364}
365
366#[async_trait]
367impl Channel for PacedChannel {
368    fn name(&self) -> &str {
369        self.inner.name()
370    }
371
372    async fn send(&self, message: &SendMessage) -> Result<()> {
373        self.paced_dispatch(PacedOp::Send(message.clone())).await
374    }
375
376    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
377        self.inner.listen(tx).await
378    }
379
380    async fn health_check(&self) -> bool {
381        self.inner.health_check().await
382    }
383
384    async fn start_typing(&self, recipient: &str) -> Result<()> {
385        self.inner.start_typing(recipient).await
386    }
387
388    async fn stop_typing(&self, recipient: &str) -> Result<()> {
389        self.inner.stop_typing(recipient).await
390    }
391
392    fn supports_draft_updates(&self) -> bool {
393        self.inner.supports_draft_updates()
394    }
395
396    fn supports_multi_message_streaming(&self) -> bool {
397        self.inner.supports_multi_message_streaming()
398    }
399
400    fn multi_message_delay_ms(&self) -> u64 {
401        self.inner.multi_message_delay_ms()
402    }
403
404    async fn send_draft(&self, message: &SendMessage) -> Result<Option<String>> {
405        // Drafts are streaming UX, not final outbound replies — pacing
406        // them would freeze the live preview. Forward unchanged.
407        self.inner.send_draft(message).await
408    }
409
410    async fn update_draft(&self, recipient: &str, message_id: &str, text: &str) -> Result<()> {
411        self.inner.update_draft(recipient, message_id, text).await
412    }
413
414    async fn update_draft_progress(
415        &self,
416        recipient: &str,
417        message_id: &str,
418        text: &str,
419    ) -> Result<()> {
420        self.inner
421            .update_draft_progress(recipient, message_id, text)
422            .await
423    }
424
425    async fn finalize_draft(&self, recipient: &str, message_id: &str, text: &str) -> Result<()> {
426        // Finalise is the terminal write to the draft — route it through the
427        // same pacing queue as `send` so a burst of streamed replies respects
428        // the floor and the overflow contract. The op preserves its identity
429        // so the worker dispatches to `inner.finalize_draft` (editing the
430        // existing draft) rather than `inner.send` (posting a new message).
431        self.paced_dispatch(PacedOp::FinalizeDraft {
432            recipient: recipient.to_string(),
433            message_id: message_id.to_string(),
434            text: text.to_string(),
435        })
436        .await
437    }
438
439    async fn cancel_draft(&self, recipient: &str, message_id: &str) -> Result<()> {
440        self.inner.cancel_draft(recipient, message_id).await
441    }
442
443    async fn add_reaction(&self, channel_id: &str, message_id: &str, emoji: &str) -> Result<()> {
444        self.inner.add_reaction(channel_id, message_id, emoji).await
445    }
446
447    async fn remove_reaction(&self, channel_id: &str, message_id: &str, emoji: &str) -> Result<()> {
448        self.inner
449            .remove_reaction(channel_id, message_id, emoji)
450            .await
451    }
452
453    async fn pin_message(&self, channel_id: &str, message_id: &str) -> Result<()> {
454        self.inner.pin_message(channel_id, message_id).await
455    }
456
457    async fn unpin_message(&self, channel_id: &str, message_id: &str) -> Result<()> {
458        self.inner.unpin_message(channel_id, message_id).await
459    }
460
461    async fn redact_message(
462        &self,
463        channel_id: &str,
464        message_id: &str,
465        reason: Option<String>,
466    ) -> Result<()> {
467        self.inner
468            .redact_message(channel_id, message_id, reason)
469            .await
470    }
471
472    async fn request_approval(
473        &self,
474        recipient: &str,
475        request: &ChannelApprovalRequest,
476    ) -> Result<Option<ChannelApprovalResponse>> {
477        self.inner.request_approval(recipient, request).await
478    }
479
480    async fn request_choice(
481        &self,
482        question: &str,
483        choices: &[String],
484        timeout: Duration,
485    ) -> Result<Option<String>> {
486        self.inner.request_choice(question, choices, timeout).await
487    }
488
489    fn supports_free_form_ask(&self) -> bool {
490        self.inner.supports_free_form_ask()
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use std::sync::atomic::{AtomicUsize, Ordering};
498
499    /// Minimal `HasReplyPacing` for tests so we can construct pacing
500    /// configs without dragging a full `*Config` literal into every
501    /// case. Mirrors the production trait shape exactly.
502    struct PacingFixture {
503        interval_secs: u64,
504        depth: u16,
505    }
506    impl HasReplyPacing for PacingFixture {
507        fn reply_min_interval_secs(&self) -> u64 {
508            self.interval_secs
509        }
510        fn reply_queue_depth_max(&self) -> u16 {
511            self.depth
512        }
513    }
514
515    struct CountingChannel {
516        sends: AtomicUsize,
517        finalize_drafts: AtomicUsize,
518    }
519
520    impl Attributable for CountingChannel {
521        fn role(&self) -> Role {
522            // Reuse an existing channel kind for testing only.
523            Role::Channel(zeroclaw_api::attribution::ChannelKind::Cli)
524        }
525        fn alias(&self) -> &str {
526            "counting"
527        }
528    }
529
530    #[async_trait]
531    impl Channel for CountingChannel {
532        fn name(&self) -> &str {
533            "counting"
534        }
535        async fn send(&self, _message: &SendMessage) -> Result<()> {
536            self.sends.fetch_add(1, Ordering::SeqCst);
537            Ok(())
538        }
539        async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
540            Ok(())
541        }
542        fn supports_draft_updates(&self) -> bool {
543            true
544        }
545        async fn finalize_draft(
546            &self,
547            _recipient: &str,
548            _message_id: &str,
549            _text: &str,
550        ) -> Result<()> {
551            self.finalize_drafts.fetch_add(1, Ordering::SeqCst);
552            Ok(())
553        }
554    }
555
556    #[tokio::test]
557    async fn zero_interval_is_passthrough() {
558        let inner = Arc::new(CountingChannel {
559            sends: AtomicUsize::new(0),
560            finalize_drafts: AtomicUsize::new(0),
561        });
562        let cfg = PacingFixture {
563            interval_secs: 0,
564            depth: 0,
565        };
566        let wrapped = PacedChannel::wrap(inner.clone(), &cfg);
567        // wrap() returns the inner Arc unchanged when interval == 0 — no
568        // wrapper allocated, no atomic overhead, the default config pays
569        // nothing for pacing it never asked for.
570        assert!(Arc::ptr_eq(&wrapped, &(inner as Arc<dyn Channel>)));
571    }
572
573    #[tokio::test]
574    async fn first_send_records_recipient_state() {
575        let counting = Arc::new(CountingChannel {
576            sends: AtomicUsize::new(0),
577            finalize_drafts: AtomicUsize::new(0),
578        });
579        let inner: Arc<dyn Channel> = counting.clone();
580        // Use 1h to make the wait long enough that we can assert the
581        // recipient row landed in the map without the test actually
582        // sleeping. We never trigger a second send to the same peer,
583        // so no real time elapses.
584        let cfg = PacingFixture {
585            interval_secs: 3600,
586            depth: 0,
587        };
588        let paced = PacedChannel::wrap(inner, &cfg);
589        paced
590            .send(&SendMessage::new("hello", "alice"))
591            .await
592            .unwrap();
593        assert_eq!(
594            counting.sends.load(Ordering::SeqCst),
595            1,
596            "first send to a recipient must forward immediately",
597        );
598    }
599
600    #[tokio::test]
601    async fn different_recipients_track_state_independently() {
602        let counting = Arc::new(CountingChannel {
603            sends: AtomicUsize::new(0),
604            finalize_drafts: AtomicUsize::new(0),
605        });
606        let inner: Arc<dyn Channel> = counting.clone();
607        // 1h interval again — we only ever send once per recipient, so
608        // pacing never actually triggers a sleep on the immediate path.
609        let cfg = PacingFixture {
610            interval_secs: 3600,
611            depth: 0,
612        };
613        let paced = PacedChannel::wrap(inner, &cfg);
614        paced
615            .send(&SendMessage::new("hi alice", "alice"))
616            .await
617            .unwrap();
618        paced
619            .send(&SendMessage::new("hi bob", "bob"))
620            .await
621            .unwrap();
622        assert_eq!(
623            counting.sends.load(Ordering::SeqCst),
624            2,
625            "each recipient must dispatch on its own; alice's floor must not block bob's send",
626        );
627    }
628
629    #[tokio::test]
630    async fn small_interval_sleeps_long_enough_between_repeats() {
631        let counting = Arc::new(CountingChannel {
632            sends: AtomicUsize::new(0),
633            finalize_drafts: AtomicUsize::new(0),
634        });
635        let inner: Arc<dyn Channel> = counting.clone();
636        let cfg = PacingFixture {
637            interval_secs: 1,
638            depth: 4,
639        };
640        let paced = PacedChannel::wrap(inner, &cfg);
641        paced
642            .send(&SendMessage::new("first", "alice"))
643            .await
644            .unwrap();
645        let t1 = Instant::now();
646        paced
647            .send(&SendMessage::new("second", "alice"))
648            .await
649            .unwrap();
650        let elapsed = t1.elapsed();
651        assert!(
652            elapsed >= Duration::from_millis(900),
653            "second send to same recipient should wait ~min_interval; got {elapsed:?}",
654        );
655        assert_eq!(counting.sends.load(Ordering::SeqCst), 2);
656    }
657
658    #[tokio::test]
659    async fn queue_overflow_drops_newest_and_warns() {
660        let counting = Arc::new(CountingChannel {
661            sends: AtomicUsize::new(0),
662            finalize_drafts: AtomicUsize::new(0),
663        });
664        let inner: Arc<dyn Channel> = counting.clone();
665        let cfg = PacingFixture {
666            interval_secs: 1,
667            depth: 2,
668        };
669        let paced = PacedChannel::wrap(inner, &cfg);
670        // First send fires immediately and starts the floor (next allowed
671        // ~1s out). It also takes no path through the queue, so the worker
672        // is not yet spawned.
673        paced
674            .send(&SendMessage::new("first", "alice"))
675            .await
676            .unwrap();
677        // Enqueue `a` and `b` by spawning the sends as tasks so they
678        // actually drive the recipient state's `queue.push_back`. If we
679        // just held them as un-polled futures the queue would stay empty
680        // and the overflow path would never trigger.
681        let paced_a = Arc::clone(&paced);
682        let paced_b = Arc::clone(&paced);
683        let h_a =
684            zeroclaw_spawn::spawn!(
685                async move { paced_a.send(&SendMessage::new("a", "alice")).await }
686            );
687        let h_b =
688            zeroclaw_spawn::spawn!(
689                async move { paced_b.send(&SendMessage::new("b", "alice")).await }
690            );
691        // Yield enough times for both spawned tasks to make it past the
692        // lock acquire and into the queue. 50ms is well inside the 1s
693        // pacing floor so the worker hasn't drained anything yet.
694        tokio::time::sleep(Duration::from_millis(50)).await;
695        // The third lands while the queue is full → drop + WARN, returns Ok.
696        paced
697            .send(&SendMessage::new("overflow", "alice"))
698            .await
699            .unwrap();
700        // Allow the workers to drain at the 1s floor.
701        let (a, b) = tokio::join!(h_a, h_b);
702        a.unwrap().unwrap();
703        b.unwrap().unwrap();
704        // 1 immediate + 2 queued = 3 dispatches; the overflow drop must not
705        // have reached the inner channel.
706        assert_eq!(
707            counting.sends.load(Ordering::SeqCst),
708            3,
709            "queue overflow must drop the newest send before the inner channel sees it",
710        );
711    }
712
713    #[tokio::test]
714    async fn finalize_draft_dispatches_to_inner_finalize_not_send() {
715        let counting = Arc::new(CountingChannel {
716            sends: AtomicUsize::new(0),
717            finalize_drafts: AtomicUsize::new(0),
718        });
719        let inner: Arc<dyn Channel> = counting.clone();
720        // 1h floor: the first op fires immediately, so no real time elapses.
721        let cfg = PacingFixture {
722            interval_secs: 3600,
723            depth: 4,
724        };
725        let paced = PacedChannel::wrap(inner, &cfg);
726        paced
727            .finalize_draft("alice", "msg-1", "final text")
728            .await
729            .unwrap();
730        // A draft finalization must edit the existing draft via the inner
731        // channel's finalize_draft — routing it through send would post a new
732        // message and leave the draft stale.
733        assert_eq!(
734            counting.finalize_drafts.load(Ordering::SeqCst),
735            1,
736            "finalize_draft must dispatch to inner.finalize_draft",
737        );
738        assert_eq!(
739            counting.sends.load(Ordering::SeqCst),
740            0,
741            "finalize_draft must not be routed through inner.send",
742        );
743    }
744
745    #[tokio::test]
746    async fn queued_finalize_draft_preserves_op_through_worker() {
747        let counting = Arc::new(CountingChannel {
748            sends: AtomicUsize::new(0),
749            finalize_drafts: AtomicUsize::new(0),
750        });
751        let inner: Arc<dyn Channel> = counting.clone();
752        let cfg = PacingFixture {
753            interval_secs: 1,
754            depth: 4,
755        };
756        let paced = PacedChannel::wrap(inner, &cfg);
757        // First op fires immediately and starts the floor.
758        paced
759            .send(&SendMessage::new("first", "alice"))
760            .await
761            .unwrap();
762        // Second op (a finalize) is queued behind the floor and drained by
763        // the worker — it must still dispatch as a finalize, not a send.
764        paced
765            .finalize_draft("alice", "msg-1", "final text")
766            .await
767            .unwrap();
768        assert_eq!(
769            counting.finalize_drafts.load(Ordering::SeqCst),
770            1,
771            "queued finalize_draft must dispatch to inner.finalize_draft via the worker",
772        );
773        assert_eq!(
774            counting.sends.load(Ordering::SeqCst),
775            1,
776            "only the first send should reach inner.send; the finalize must not",
777        );
778    }
779
780    /// A channel whose `send` blocks until the test releases a gate, so the
781    /// test can hold an immediate-path dispatch in flight and race a second
782    /// send against it.
783    struct GatedChannel {
784        sends: AtomicUsize,
785        gate: tokio::sync::Semaphore,
786    }
787
788    impl Attributable for GatedChannel {
789        fn role(&self) -> Role {
790            Role::Channel(zeroclaw_api::attribution::ChannelKind::Cli)
791        }
792        fn alias(&self) -> &str {
793            "gated"
794        }
795    }
796
797    #[async_trait]
798    impl Channel for GatedChannel {
799        fn name(&self) -> &str {
800            "gated"
801        }
802        async fn send(&self, _message: &SendMessage) -> Result<()> {
803            // Block until the test grants a permit, then count the send.
804            let permit = self.gate.acquire().await.unwrap();
805            permit.forget();
806            self.sends.fetch_add(1, Ordering::SeqCst);
807            Ok(())
808        }
809        async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
810            Ok(())
811        }
812    }
813
814    /// A slow inner send must not let a second concurrent send to the same
815    /// recipient take a second immediate path. The `in_flight` marker forces
816    /// the racing send to enqueue, so only one wire call is ever in flight to
817    /// a recipient at a time even when the inner send outlasts the floor.
818    #[tokio::test]
819    async fn slow_immediate_send_forces_concurrent_send_to_enqueue() {
820        let gated = Arc::new(GatedChannel {
821            sends: AtomicUsize::new(0),
822            gate: tokio::sync::Semaphore::new(0),
823        });
824        let inner: Arc<dyn Channel> = gated.clone();
825        // Sub-second floor: by the time the second send arrives the floor has
826        // already elapsed, so only the `in_flight` marker — not the floor —
827        // can keep the second send off the immediate path.
828        let cfg = PacingFixture {
829            interval_secs: 1,
830            depth: 4,
831        };
832        let paced = PacedChannel::wrap(inner, &cfg);
833        let paced_a = Arc::clone(&paced);
834
835        // Send A takes the immediate path and blocks inside inner.send.
836        let a = zeroclaw_spawn::spawn!(async move {
837            paced_a.send(&SendMessage::new("a", "alice")).await.unwrap();
838        });
839        // Wait until A is parked inside inner.send (in flight).
840        tokio::time::sleep(Duration::from_millis(50)).await;
841        assert_eq!(
842            gated.sends.load(Ordering::SeqCst),
843            0,
844            "A is gated; no send has completed yet",
845        );
846
847        // Send B races in while A is in flight and the floor has elapsed.
848        let paced_b = Arc::clone(&paced);
849        let b = zeroclaw_spawn::spawn!(async move {
850            paced_b.send(&SendMessage::new("b", "alice")).await.unwrap();
851        });
852        tokio::time::sleep(Duration::from_millis(50)).await;
853        // B must have enqueued, not dispatched: still zero completed sends and
854        // nothing new reached the inner channel while A holds the gate.
855        assert_eq!(
856            gated.sends.load(Ordering::SeqCst),
857            0,
858            "B must enqueue behind the in-flight A, not take a second immediate path",
859        );
860
861        // Release both: A completes, then the worker drains B at the floor.
862        gated.gate.add_permits(2);
863        a.await.unwrap();
864        b.await.unwrap();
865        assert_eq!(
866            gated.sends.load(Ordering::SeqCst),
867            2,
868            "both sends eventually dispatch exactly once each",
869        );
870    }
871}