1use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26
27use anyhow::Result;
28use async_trait::async_trait;
29use tokio::sync::{Mutex, oneshot};
30use zeroclaw_api::attribution::{Attributable, Role};
31use zeroclaw_api::channel::{
32 Channel, ChannelApprovalRequest, ChannelApprovalResponse, ChannelMessage, SendMessage,
33};
34use zeroclaw_config::schema::{DEFAULT_REPLY_QUEUE_DEPTH, HasReplyPacing, PACING_RECIPIENT_CAP};
35
36enum PacedOp {
43 Send(SendMessage),
45 FinalizeDraft {
48 recipient: String,
49 message_id: String,
50 text: String,
51 },
52}
53
54impl PacedOp {
55 fn recipient(&self) -> &str {
57 match self {
58 Self::Send(message) => &message.recipient,
59 Self::FinalizeDraft { recipient, .. } => recipient,
60 }
61 }
62
63 fn payload_chars(&self) -> usize {
65 match self {
66 Self::Send(message) => message.content.chars().count(),
67 Self::FinalizeDraft { text, .. } => text.chars().count(),
68 }
69 }
70
71 async fn dispatch(self, inner: &Arc<dyn Channel>) -> Result<()> {
73 match self {
74 Self::Send(message) => inner.send(&message).await,
75 Self::FinalizeDraft {
76 recipient,
77 message_id,
78 text,
79 } => inner.finalize_draft(&recipient, &message_id, &text).await,
80 }
81 }
82}
83
84struct PendingSend {
86 op: PacedOp,
87 reply: oneshot::Sender<Result<()>>,
91}
92
93struct RecipientState {
95 next_allowed_at: Instant,
97 queue: VecDeque<PendingSend>,
99 worker_running: bool,
102 in_flight: bool,
109 last_touched: u64,
112}
113
114pub struct PacedChannel {
115 inner: Arc<dyn Channel>,
116 min_interval: Duration,
117 queue_depth: usize,
118 recipients: Arc<Mutex<RecipientMap>>,
121}
122
123struct RecipientMap {
126 inner: HashMap<String, RecipientState>,
127 touch_counter: u64,
128}
129
130impl RecipientMap {
131 fn new() -> Self {
132 Self {
133 inner: HashMap::new(),
134 touch_counter: 0,
135 }
136 }
137
138 fn touch(&mut self) -> u64 {
141 self.touch_counter = self.touch_counter.wrapping_add(1);
142 self.touch_counter
143 }
144
145 fn evict_if_over_cap(&mut self) {
151 if self.inner.len() < PACING_RECIPIENT_CAP {
152 return;
153 }
154 let victim = self
158 .inner
159 .iter()
160 .filter(|(_, s)| s.queue.is_empty() && !s.worker_running && !s.in_flight)
161 .min_by_key(|(_, s)| s.last_touched)
162 .map(|(k, _)| k.clone());
163 if let Some(key) = victim {
164 self.inner.remove(&key);
165 }
166 }
167}
168
169impl Attributable for PacedChannel {
170 fn role(&self) -> Role {
171 self.inner.role()
172 }
173 fn alias(&self) -> &str {
174 self.inner.alias()
175 }
176}
177
178impl PacedChannel {
179 pub fn wrap(inner: Arc<dyn Channel>, cfg: &dyn HasReplyPacing) -> Arc<dyn Channel> {
184 let min_interval_secs = cfg.reply_min_interval_secs();
185 if min_interval_secs == 0 {
186 return inner;
187 }
188 let depth_cfg = cfg.reply_queue_depth_max();
189 let queue_depth = if depth_cfg == 0 {
190 usize::from(DEFAULT_REPLY_QUEUE_DEPTH)
191 } else {
192 usize::from(depth_cfg)
193 };
194 Arc::new(Self {
195 inner,
196 min_interval: Duration::from_secs(min_interval_secs),
197 queue_depth,
198 recipients: Arc::new(Mutex::new(RecipientMap::new())),
199 })
200 }
201
202 async fn paced_dispatch(&self, op: PacedOp) -> Result<()> {
208 let recipient_key = op.recipient().to_string();
209
210 let decision: (Option<PacedOp>, Option<oneshot::Receiver<Result<()>>>, bool) = {
218 let mut map = self.recipients.lock().await;
219 map.evict_if_over_cap();
220 let now = Instant::now();
221 let touch = map.touch();
222 let state = map
223 .inner
224 .entry(recipient_key.clone())
225 .or_insert(RecipientState {
226 next_allowed_at: now,
227 queue: VecDeque::new(),
228 worker_running: false,
229 in_flight: false,
230 last_touched: touch,
231 });
232 state.last_touched = touch;
233
234 if state.queue.is_empty()
235 && !state.worker_running
236 && !state.in_flight
237 && now >= state.next_allowed_at
238 {
239 state.next_allowed_at = now + self.min_interval;
240 state.in_flight = true;
241 (Some(op), None, false)
242 } else if state.queue.len() >= self.queue_depth {
243 ::zeroclaw_log::record!(
244 WARN,
245 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject,)
246 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
247 .with_attrs(::serde_json::json!({
248 "channel_alias": self.inner.alias(),
249 "recipient": redact_recipient(&recipient_key),
250 "queue_depth": state.queue.len(),
251 "queue_max": self.queue_depth,
252 "dropped_chars": op.payload_chars(),
253 })),
254 "paced channel queue full: dropping newest outbound message"
255 );
256 (None, None, false)
257 } else {
258 let (tx, rx) = oneshot::channel();
259 state.queue.push_back(PendingSend { op, reply: tx });
260 let spawn = !state.worker_running;
261 if spawn {
262 state.worker_running = true;
263 }
264 (None, Some(rx), spawn)
265 }
266 };
267
268 let (immediate, awaited, spawn_worker) = decision;
269 if let Some(op) = immediate {
270 let result = op.dispatch(&self.inner).await;
271 let spawn = {
275 let mut map = self.recipients.lock().await;
276 if let Some(state) = map.inner.get_mut(&recipient_key) {
277 state.in_flight = false;
278 let needs_worker = !state.queue.is_empty() && !state.worker_running;
279 if needs_worker {
280 state.worker_running = true;
281 }
282 needs_worker
283 } else {
284 false
285 }
286 };
287 if spawn {
288 self.spawn_drain_worker(recipient_key);
289 }
290 return result;
291 }
292 if let Some(rx) = awaited {
293 if spawn_worker {
294 self.spawn_drain_worker(recipient_key);
295 }
296 return rx.await.unwrap_or_else(|_| {
297 Err(anyhow::Error::msg(
298 "paced channel worker dropped before send completed",
299 ))
300 });
301 }
302 Ok(())
303 }
304
305 fn spawn_drain_worker(&self, recipient: String) {
309 let recipients = Arc::clone(&self.recipients);
310 let inner = Arc::clone(&self.inner);
311 let min_interval = self.min_interval;
312 zeroclaw_spawn::spawn!(async move {
313 loop {
314 let sleep_for = {
316 let map = recipients.lock().await;
317 let Some(state) = map.inner.get(&recipient) else {
318 return;
319 };
320 state
321 .next_allowed_at
322 .saturating_duration_since(Instant::now())
323 };
324 if !sleep_for.is_zero() {
325 tokio::time::sleep(sleep_for).await;
326 }
327
328 let pending = {
332 let mut map = recipients.lock().await;
333 let Some(state) = map.inner.get_mut(&recipient) else {
334 return;
335 };
336 if state.queue.is_empty() {
337 state.worker_running = false;
338 return;
339 }
340 state.next_allowed_at = Instant::now() + min_interval;
341 state.queue.pop_front()
342 };
343 if let Some(PendingSend { op, reply }) = pending {
344 let result = op.dispatch(&inner).await;
345 let _ = reply.send(result);
346 }
347 }
348 });
349 }
350}
351
352fn redact_recipient(raw: &str) -> String {
357 let chars: Vec<char> = raw.chars().collect();
358 if chars.len() <= 2 {
359 return "***".to_string();
360 }
361 let first = chars.first().copied().unwrap_or('*');
362 let last = chars.last().copied().unwrap_or('*');
363 format!("{first}***{last}<len={}>", chars.len())
364}
365
366#[async_trait]
367impl Channel for PacedChannel {
368 fn name(&self) -> &str {
369 self.inner.name()
370 }
371
372 async fn send(&self, message: &SendMessage) -> Result<()> {
373 self.paced_dispatch(PacedOp::Send(message.clone())).await
374 }
375
376 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
377 self.inner.listen(tx).await
378 }
379
380 async fn health_check(&self) -> bool {
381 self.inner.health_check().await
382 }
383
384 async fn start_typing(&self, recipient: &str) -> Result<()> {
385 self.inner.start_typing(recipient).await
386 }
387
388 async fn stop_typing(&self, recipient: &str) -> Result<()> {
389 self.inner.stop_typing(recipient).await
390 }
391
392 fn supports_draft_updates(&self) -> bool {
393 self.inner.supports_draft_updates()
394 }
395
396 fn supports_multi_message_streaming(&self) -> bool {
397 self.inner.supports_multi_message_streaming()
398 }
399
400 fn multi_message_delay_ms(&self) -> u64 {
401 self.inner.multi_message_delay_ms()
402 }
403
404 async fn send_draft(&self, message: &SendMessage) -> Result<Option<String>> {
405 self.inner.send_draft(message).await
408 }
409
410 async fn update_draft(&self, recipient: &str, message_id: &str, text: &str) -> Result<()> {
411 self.inner.update_draft(recipient, message_id, text).await
412 }
413
414 async fn update_draft_progress(
415 &self,
416 recipient: &str,
417 message_id: &str,
418 text: &str,
419 ) -> Result<()> {
420 self.inner
421 .update_draft_progress(recipient, message_id, text)
422 .await
423 }
424
425 async fn finalize_draft(&self, recipient: &str, message_id: &str, text: &str) -> Result<()> {
426 self.paced_dispatch(PacedOp::FinalizeDraft {
432 recipient: recipient.to_string(),
433 message_id: message_id.to_string(),
434 text: text.to_string(),
435 })
436 .await
437 }
438
439 async fn cancel_draft(&self, recipient: &str, message_id: &str) -> Result<()> {
440 self.inner.cancel_draft(recipient, message_id).await
441 }
442
443 async fn add_reaction(&self, channel_id: &str, message_id: &str, emoji: &str) -> Result<()> {
444 self.inner.add_reaction(channel_id, message_id, emoji).await
445 }
446
447 async fn remove_reaction(&self, channel_id: &str, message_id: &str, emoji: &str) -> Result<()> {
448 self.inner
449 .remove_reaction(channel_id, message_id, emoji)
450 .await
451 }
452
453 async fn pin_message(&self, channel_id: &str, message_id: &str) -> Result<()> {
454 self.inner.pin_message(channel_id, message_id).await
455 }
456
457 async fn unpin_message(&self, channel_id: &str, message_id: &str) -> Result<()> {
458 self.inner.unpin_message(channel_id, message_id).await
459 }
460
461 async fn redact_message(
462 &self,
463 channel_id: &str,
464 message_id: &str,
465 reason: Option<String>,
466 ) -> Result<()> {
467 self.inner
468 .redact_message(channel_id, message_id, reason)
469 .await
470 }
471
472 async fn request_approval(
473 &self,
474 recipient: &str,
475 request: &ChannelApprovalRequest,
476 ) -> Result<Option<ChannelApprovalResponse>> {
477 self.inner.request_approval(recipient, request).await
478 }
479
480 async fn request_choice(
481 &self,
482 question: &str,
483 choices: &[String],
484 timeout: Duration,
485 ) -> Result<Option<String>> {
486 self.inner.request_choice(question, choices, timeout).await
487 }
488
489 fn supports_free_form_ask(&self) -> bool {
490 self.inner.supports_free_form_ask()
491 }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use std::sync::atomic::{AtomicUsize, Ordering};
498
499 struct PacingFixture {
503 interval_secs: u64,
504 depth: u16,
505 }
506 impl HasReplyPacing for PacingFixture {
507 fn reply_min_interval_secs(&self) -> u64 {
508 self.interval_secs
509 }
510 fn reply_queue_depth_max(&self) -> u16 {
511 self.depth
512 }
513 }
514
515 struct CountingChannel {
516 sends: AtomicUsize,
517 finalize_drafts: AtomicUsize,
518 }
519
520 impl Attributable for CountingChannel {
521 fn role(&self) -> Role {
522 Role::Channel(zeroclaw_api::attribution::ChannelKind::Cli)
524 }
525 fn alias(&self) -> &str {
526 "counting"
527 }
528 }
529
530 #[async_trait]
531 impl Channel for CountingChannel {
532 fn name(&self) -> &str {
533 "counting"
534 }
535 async fn send(&self, _message: &SendMessage) -> Result<()> {
536 self.sends.fetch_add(1, Ordering::SeqCst);
537 Ok(())
538 }
539 async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
540 Ok(())
541 }
542 fn supports_draft_updates(&self) -> bool {
543 true
544 }
545 async fn finalize_draft(
546 &self,
547 _recipient: &str,
548 _message_id: &str,
549 _text: &str,
550 ) -> Result<()> {
551 self.finalize_drafts.fetch_add(1, Ordering::SeqCst);
552 Ok(())
553 }
554 }
555
556 #[tokio::test]
557 async fn zero_interval_is_passthrough() {
558 let inner = Arc::new(CountingChannel {
559 sends: AtomicUsize::new(0),
560 finalize_drafts: AtomicUsize::new(0),
561 });
562 let cfg = PacingFixture {
563 interval_secs: 0,
564 depth: 0,
565 };
566 let wrapped = PacedChannel::wrap(inner.clone(), &cfg);
567 assert!(Arc::ptr_eq(&wrapped, &(inner as Arc<dyn Channel>)));
571 }
572
573 #[tokio::test]
574 async fn first_send_records_recipient_state() {
575 let counting = Arc::new(CountingChannel {
576 sends: AtomicUsize::new(0),
577 finalize_drafts: AtomicUsize::new(0),
578 });
579 let inner: Arc<dyn Channel> = counting.clone();
580 let cfg = PacingFixture {
585 interval_secs: 3600,
586 depth: 0,
587 };
588 let paced = PacedChannel::wrap(inner, &cfg);
589 paced
590 .send(&SendMessage::new("hello", "alice"))
591 .await
592 .unwrap();
593 assert_eq!(
594 counting.sends.load(Ordering::SeqCst),
595 1,
596 "first send to a recipient must forward immediately",
597 );
598 }
599
600 #[tokio::test]
601 async fn different_recipients_track_state_independently() {
602 let counting = Arc::new(CountingChannel {
603 sends: AtomicUsize::new(0),
604 finalize_drafts: AtomicUsize::new(0),
605 });
606 let inner: Arc<dyn Channel> = counting.clone();
607 let cfg = PacingFixture {
610 interval_secs: 3600,
611 depth: 0,
612 };
613 let paced = PacedChannel::wrap(inner, &cfg);
614 paced
615 .send(&SendMessage::new("hi alice", "alice"))
616 .await
617 .unwrap();
618 paced
619 .send(&SendMessage::new("hi bob", "bob"))
620 .await
621 .unwrap();
622 assert_eq!(
623 counting.sends.load(Ordering::SeqCst),
624 2,
625 "each recipient must dispatch on its own; alice's floor must not block bob's send",
626 );
627 }
628
629 #[tokio::test]
630 async fn small_interval_sleeps_long_enough_between_repeats() {
631 let counting = Arc::new(CountingChannel {
632 sends: AtomicUsize::new(0),
633 finalize_drafts: AtomicUsize::new(0),
634 });
635 let inner: Arc<dyn Channel> = counting.clone();
636 let cfg = PacingFixture {
637 interval_secs: 1,
638 depth: 4,
639 };
640 let paced = PacedChannel::wrap(inner, &cfg);
641 paced
642 .send(&SendMessage::new("first", "alice"))
643 .await
644 .unwrap();
645 let t1 = Instant::now();
646 paced
647 .send(&SendMessage::new("second", "alice"))
648 .await
649 .unwrap();
650 let elapsed = t1.elapsed();
651 assert!(
652 elapsed >= Duration::from_millis(900),
653 "second send to same recipient should wait ~min_interval; got {elapsed:?}",
654 );
655 assert_eq!(counting.sends.load(Ordering::SeqCst), 2);
656 }
657
658 #[tokio::test]
659 async fn queue_overflow_drops_newest_and_warns() {
660 let counting = Arc::new(CountingChannel {
661 sends: AtomicUsize::new(0),
662 finalize_drafts: AtomicUsize::new(0),
663 });
664 let inner: Arc<dyn Channel> = counting.clone();
665 let cfg = PacingFixture {
666 interval_secs: 1,
667 depth: 2,
668 };
669 let paced = PacedChannel::wrap(inner, &cfg);
670 paced
674 .send(&SendMessage::new("first", "alice"))
675 .await
676 .unwrap();
677 let paced_a = Arc::clone(&paced);
682 let paced_b = Arc::clone(&paced);
683 let h_a =
684 zeroclaw_spawn::spawn!(
685 async move { paced_a.send(&SendMessage::new("a", "alice")).await }
686 );
687 let h_b =
688 zeroclaw_spawn::spawn!(
689 async move { paced_b.send(&SendMessage::new("b", "alice")).await }
690 );
691 tokio::time::sleep(Duration::from_millis(50)).await;
695 paced
697 .send(&SendMessage::new("overflow", "alice"))
698 .await
699 .unwrap();
700 let (a, b) = tokio::join!(h_a, h_b);
702 a.unwrap().unwrap();
703 b.unwrap().unwrap();
704 assert_eq!(
707 counting.sends.load(Ordering::SeqCst),
708 3,
709 "queue overflow must drop the newest send before the inner channel sees it",
710 );
711 }
712
713 #[tokio::test]
714 async fn finalize_draft_dispatches_to_inner_finalize_not_send() {
715 let counting = Arc::new(CountingChannel {
716 sends: AtomicUsize::new(0),
717 finalize_drafts: AtomicUsize::new(0),
718 });
719 let inner: Arc<dyn Channel> = counting.clone();
720 let cfg = PacingFixture {
722 interval_secs: 3600,
723 depth: 4,
724 };
725 let paced = PacedChannel::wrap(inner, &cfg);
726 paced
727 .finalize_draft("alice", "msg-1", "final text")
728 .await
729 .unwrap();
730 assert_eq!(
734 counting.finalize_drafts.load(Ordering::SeqCst),
735 1,
736 "finalize_draft must dispatch to inner.finalize_draft",
737 );
738 assert_eq!(
739 counting.sends.load(Ordering::SeqCst),
740 0,
741 "finalize_draft must not be routed through inner.send",
742 );
743 }
744
745 #[tokio::test]
746 async fn queued_finalize_draft_preserves_op_through_worker() {
747 let counting = Arc::new(CountingChannel {
748 sends: AtomicUsize::new(0),
749 finalize_drafts: AtomicUsize::new(0),
750 });
751 let inner: Arc<dyn Channel> = counting.clone();
752 let cfg = PacingFixture {
753 interval_secs: 1,
754 depth: 4,
755 };
756 let paced = PacedChannel::wrap(inner, &cfg);
757 paced
759 .send(&SendMessage::new("first", "alice"))
760 .await
761 .unwrap();
762 paced
765 .finalize_draft("alice", "msg-1", "final text")
766 .await
767 .unwrap();
768 assert_eq!(
769 counting.finalize_drafts.load(Ordering::SeqCst),
770 1,
771 "queued finalize_draft must dispatch to inner.finalize_draft via the worker",
772 );
773 assert_eq!(
774 counting.sends.load(Ordering::SeqCst),
775 1,
776 "only the first send should reach inner.send; the finalize must not",
777 );
778 }
779
780 struct GatedChannel {
784 sends: AtomicUsize,
785 gate: tokio::sync::Semaphore,
786 }
787
788 impl Attributable for GatedChannel {
789 fn role(&self) -> Role {
790 Role::Channel(zeroclaw_api::attribution::ChannelKind::Cli)
791 }
792 fn alias(&self) -> &str {
793 "gated"
794 }
795 }
796
797 #[async_trait]
798 impl Channel for GatedChannel {
799 fn name(&self) -> &str {
800 "gated"
801 }
802 async fn send(&self, _message: &SendMessage) -> Result<()> {
803 let permit = self.gate.acquire().await.unwrap();
805 permit.forget();
806 self.sends.fetch_add(1, Ordering::SeqCst);
807 Ok(())
808 }
809 async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
810 Ok(())
811 }
812 }
813
814 #[tokio::test]
819 async fn slow_immediate_send_forces_concurrent_send_to_enqueue() {
820 let gated = Arc::new(GatedChannel {
821 sends: AtomicUsize::new(0),
822 gate: tokio::sync::Semaphore::new(0),
823 });
824 let inner: Arc<dyn Channel> = gated.clone();
825 let cfg = PacingFixture {
829 interval_secs: 1,
830 depth: 4,
831 };
832 let paced = PacedChannel::wrap(inner, &cfg);
833 let paced_a = Arc::clone(&paced);
834
835 let a = zeroclaw_spawn::spawn!(async move {
837 paced_a.send(&SendMessage::new("a", "alice")).await.unwrap();
838 });
839 tokio::time::sleep(Duration::from_millis(50)).await;
841 assert_eq!(
842 gated.sends.load(Ordering::SeqCst),
843 0,
844 "A is gated; no send has completed yet",
845 );
846
847 let paced_b = Arc::clone(&paced);
849 let b = zeroclaw_spawn::spawn!(async move {
850 paced_b.send(&SendMessage::new("b", "alice")).await.unwrap();
851 });
852 tokio::time::sleep(Duration::from_millis(50)).await;
853 assert_eq!(
856 gated.sends.load(Ordering::SeqCst),
857 0,
858 "B must enqueue behind the in-flight A, not take a second immediate path",
859 );
860
861 gated.gate.add_permits(2);
863 a.await.unwrap();
864 b.await.unwrap();
865 assert_eq!(
866 gated.sends.load(Ordering::SeqCst),
867 2,
868 "both sends eventually dispatch exactly once each",
869 );
870 }
871}