Skip to main content

zeroclaw_channels/
gmail_push.rs

1//! Gmail Pub/Sub push notification channel.
2//!
3//! Instead of polling via IMAP, this channel uses Google's Gmail Pub/Sub push
4//! notifications.  Google sends a POST to our webhook endpoint whenever the
5//! user's mailbox changes.  The notification body contains a base64-encoded
6//! JSON payload with `emailAddress` and `historyId`; we then call the Gmail
7//! History API to fetch newly arrived messages.
8//!
9//! ## Setup
10//!
11//! 1. Create a Google Cloud Pub/Sub topic and grant `gmail-api-push@system.gserviceaccount.com`
12//!    the **Pub/Sub Publisher** role on that topic.
13//! 2. Create a push subscription pointing to `https://<your-domain>/webhook/gmail`.
//! 3. Configure `[channels_config.gmail_push]` in `config.toml` with `topic` and
//!    `oauth_token` (plus `webhook_url` if the channel should manage the watch).
//!
//! When `webhook_url` is configured, the channel calls `users.watch` on startup
//! to register the subscription and re-registers it every 6 days, ahead of
//! Gmail's 7-day watch expiry.
19
20use anyhow::Result;
21use async_trait::async_trait;
22use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use std::fmt::Write as _;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime, UNIX_EPOCH};
28use tokio::sync::{Mutex, mpsc};
29
30use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
31
32pub use zeroclaw_config::scattered_types::GmailPushConfig;
33
34// ── Pub/Sub notification payload ─────────────────────────────────
35
36/// The outer JSON envelope that Google Pub/Sub POSTs to the push endpoint.
37#[derive(Debug, Deserialize, Serialize)]
38pub struct PubSubEnvelope {
39    pub message: PubSubMessage,
40    /// Subscription name (informational).
41    #[serde(default)]
42    pub subscription: String,
43}
44
45/// A single Pub/Sub message inside the envelope.
46#[derive(Debug, Deserialize, Serialize)]
47pub struct PubSubMessage {
48    /// Base64-encoded JSON data from Gmail.
49    pub data: String,
50    /// Pub/Sub message ID.
51    #[serde(default, rename = "messageId")]
52    pub message_id: String,
53    /// Publish timestamp (RFC 3339).
54    #[serde(default, rename = "publishTime")]
55    pub publish_time: String,
56}
57
58/// The decoded payload inside `PubSubMessage.data`.
59#[derive(Debug, Deserialize, Serialize)]
60pub struct GmailNotification {
61    /// Email address of the affected mailbox.
62    #[serde(rename = "emailAddress")]
63    pub email_address: String,
64    /// History ID to use as `startHistoryId` for incremental sync.
65    #[serde(rename = "historyId")]
66    pub history_id: u64,
67}
68
69// ── Gmail API response types ─────────────────────────────────────
70
71/// Response from `GET /gmail/v1/users/me/history`.
72#[derive(Debug, Deserialize)]
73pub struct HistoryResponse {
74    pub history: Option<Vec<HistoryRecord>>,
75    #[serde(default, rename = "historyId")]
76    pub history_id: u64,
77    #[serde(default, rename = "nextPageToken")]
78    pub next_page_token: Option<String>,
79}
80
81/// A single history record containing messages added to the mailbox.
82#[derive(Debug, Deserialize)]
83pub struct HistoryRecord {
84    #[serde(default, rename = "messagesAdded")]
85    pub messages_added: Vec<MessageAdded>,
86}
87
88/// Wrapper for a newly added message reference.
89#[derive(Debug, Deserialize)]
90pub struct MessageAdded {
91    pub message: MessageRef,
92}
93
94/// Minimal message reference returned by the history API.
95#[derive(Debug, Deserialize)]
96pub struct MessageRef {
97    pub id: String,
98    #[serde(default, rename = "threadId")]
99    pub thread_id: String,
100}
101
102/// Full message returned by `GET /gmail/v1/users/me/messages/{id}`.
103#[derive(Debug, Deserialize)]
104pub struct GmailMessage {
105    pub id: String,
106    #[serde(default, rename = "threadId")]
107    pub thread_id: String,
108    #[serde(default)]
109    pub snippet: String,
110    pub payload: Option<MessagePayload>,
111    #[serde(default, rename = "internalDate")]
112    pub internal_date: String,
113}
114
115/// Message payload with headers and parts.
116#[derive(Debug, Deserialize)]
117pub struct MessagePayload {
118    #[serde(default)]
119    pub headers: Vec<MessageHeader>,
120    pub body: Option<MessageBody>,
121    #[serde(default)]
122    pub parts: Vec<MessagePart>,
123    #[serde(default, rename = "mimeType")]
124    pub mime_type: String,
125}
126
127/// A single email header (name/value pair).
128#[derive(Debug, Deserialize)]
129pub struct MessageHeader {
130    pub name: String,
131    pub value: String,
132}
133
134/// Message body with optional base64-encoded data.
135#[derive(Debug, Deserialize)]
136pub struct MessageBody {
137    #[serde(default)]
138    pub data: Option<String>,
139    #[serde(default)]
140    pub size: u64,
141}
142
143/// A MIME part of a multipart message.
144#[derive(Debug, Deserialize)]
145pub struct MessagePart {
146    #[serde(default, rename = "mimeType")]
147    pub mime_type: String,
148    pub body: Option<MessageBody>,
149    #[serde(default)]
150    pub parts: Vec<MessagePart>,
151    #[serde(default)]
152    pub filename: String,
153}
154
155/// Response from `POST /gmail/v1/users/me/watch`.
156#[derive(Debug, Deserialize)]
157pub struct WatchResponse {
158    #[serde(default, rename = "historyId")]
159    pub history_id: u64,
160    #[serde(default)]
161    pub expiration: String,
162}
163
164// ── Channel implementation ───────────────────────────────────────
165
166/// Gmail Pub/Sub push notification channel.
167///
168/// Incoming messages arrive via webhook (`POST /webhook/gmail`) and are
169/// dispatched to the agent.  The `listen` method registers the Gmail watch
170/// subscription and periodically renews it.
171///
172/// Inbound sender authorization lives in `peer_groups` in V3; this channel
173/// resolves the authorized senders at message-time via [`Self::peer_resolver`]
174/// rather than reading a per-channel `allowed_senders` field (it no longer
175/// exists on `GmailPushConfig`).
176pub struct GmailPushChannel {
177    pub config: GmailPushConfig,
178    /// The alias key under `[channels.gmail.<alias>]` this handle is
179    /// bound to. Used to scope peer-group writes and resolver lookups.
180    pub alias: String,
181    /// Resolves inbound external peers from canonical state at message-time.
182    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
183    pub peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
184    http: Client,
185    last_history_id: Arc<Mutex<u64>>,
186    /// Sender half injected by the gateway to forward webhook-received messages.
187    pub tx: Arc<Mutex<Option<mpsc::Sender<ChannelMessage>>>>,
188}
189
190impl GmailPushChannel {
191    pub fn new(
192        config: GmailPushConfig,
193        alias: impl Into<String>,
194        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
195    ) -> Self {
196        let http = Client::builder()
197            .timeout(Duration::from_secs(30))
198            .build()
199            .expect("failed to build HTTP client");
200        Self {
201            config,
202            alias: alias.into(),
203            peer_resolver,
204            http,
205            last_history_id: Arc::new(Mutex::new(0)),
206            tx: Arc::new(Mutex::new(None)),
207        }
208    }
209
210    /// Register a Gmail watch subscription via `POST /gmail/v1/users/me/watch`.
211    pub async fn register_watch(&self) -> Result<WatchResponse> {
212        let token = self.config.oauth_token.clone();
213        if token.is_empty() {
214            ::zeroclaw_log::record!(
215                ERROR,
216                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
217                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
218                "Gmail OAuth token is not configured"
219            );
220            anyhow::bail!("Gmail OAuth token is not configured");
221        }
222
223        let body = serde_json::json!({
224            "topicName": self.config.topic,
225            "labelIds": self.config.label_filter,
226        });
227
228        let resp = self
229            .http
230            .post("https://gmail.googleapis.com/gmail/v1/users/me/watch")
231            .bearer_auth(&token)
232            .json(&body)
233            .send()
234            .await?;
235
236        if !resp.status().is_success() {
237            let status = resp.status();
238            let text = resp.text().await.unwrap_or_default();
239            ::zeroclaw_log::record!(
240                ERROR,
241                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
242                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
243                    .with_attrs(::serde_json::json!({
244                        "phase": "watch_registration",
245                        "status": status.as_u16(),
246                        "body": text,
247                    })),
248                "gmail_push: watch registration failed"
249            );
250            return Err(anyhow::Error::msg(format!(
251                "Gmail watch registration failed ({}): {}",
252                status, text
253            )));
254        }
255
256        let watch: WatchResponse = resp.json().await?;
257        let mut last_id = self.last_history_id.lock().await;
258        if *last_id == 0 {
259            *last_id = watch.history_id;
260        }
261        ::zeroclaw_log::record!(
262            INFO,
263            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
264            &format!(
265                "Gmail watch registered — historyId={}, expiration={}",
266                watch.history_id, watch.expiration
267            )
268        );
269        Ok(watch)
270    }
271
272    /// Fetch new messages since the given `start_history_id` using the History API.
273    pub async fn fetch_history(&self, start_history_id: u64) -> Result<Vec<String>> {
274        let mut last_id = self.last_history_id.lock().await;
275        self.fetch_history_inner(start_history_id, &mut last_id)
276            .await
277    }
278
279    /// Inner history fetch that takes an already-locked history ID reference.
280    /// This allows callers that already hold the lock to avoid deadlock.
281    async fn fetch_history_inner(
282        &self,
283        start_history_id: u64,
284        last_id: &mut u64,
285    ) -> Result<Vec<String>> {
286        let token = self.config.oauth_token.clone();
287        if token.is_empty() {
288            ::zeroclaw_log::record!(
289                ERROR,
290                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
291                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
292                "Gmail OAuth token is not configured"
293            );
294            anyhow::bail!("Gmail OAuth token is not configured");
295        }
296
297        let mut message_ids = Vec::new();
298        let mut page_token: Option<String> = None;
299
300        loop {
301            let mut url = format!(
302                "https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId={}&historyTypes=messageAdded",
303                start_history_id
304            );
305            if let Some(ref pt) = page_token {
306                let _ = write!(url, "&pageToken={pt}");
307            }
308
309            let resp = self.http.get(&url).bearer_auth(&token).send().await?;
310
311            if !resp.status().is_success() {
312                let status = resp.status();
313                let text = resp.text().await.unwrap_or_default();
314                ::zeroclaw_log::record!(
315                    ERROR,
316                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
317                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
318                        .with_attrs(::serde_json::json!({
319                            "phase": "history_fetch",
320                            "status": status.as_u16(),
321                            "body": text,
322                        })),
323                    "gmail_push: history fetch failed"
324                );
325                return Err(anyhow::Error::msg(format!(
326                    "Gmail history fetch failed ({}): {}",
327                    status, text
328                )));
329            }
330
331            let history_resp: HistoryResponse = resp.json().await?;
332
333            if let Some(records) = history_resp.history {
334                for record in records {
335                    for added in record.messages_added {
336                        message_ids.push(added.message.id);
337                    }
338                }
339            }
340
341            // Update tracked history ID
342            if history_resp.history_id > 0 && history_resp.history_id > *last_id {
343                *last_id = history_resp.history_id;
344            }
345
346            match history_resp.next_page_token {
347                Some(token) => page_token = Some(token),
348                None => break,
349            }
350        }
351
352        Ok(message_ids)
353    }
354
355    /// Fetch a full message by ID from the Gmail API.
356    pub async fn fetch_message(&self, message_id: &str) -> Result<GmailMessage> {
357        let token = self.config.oauth_token.clone();
358        let url = format!(
359            "https://gmail.googleapis.com/gmail/v1/users/me/messages/{}?format=full",
360            message_id
361        );
362
363        let resp = self.http.get(&url).bearer_auth(&token).send().await?;
364
365        if !resp.status().is_success() {
366            let status = resp.status();
367            let text = resp.text().await.unwrap_or_default();
368            ::zeroclaw_log::record!(
369                ERROR,
370                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
371                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
372                    .with_attrs(::serde_json::json!({
373                        "phase": "message_fetch",
374                        "status": status.as_u16(),
375                        "body": text,
376                    })),
377                "gmail_push: message fetch failed"
378            );
379            return Err(anyhow::Error::msg(format!(
380                "Gmail message fetch failed ({}): {}",
381                status, text
382            )));
383        }
384
385        Ok(resp.json().await?)
386    }
387
388    /// Check if a sender email is in the allowlist.
389    ///
390    /// Email allowlist entries support three syntaxes — preserved from
391    /// the legacy `GmailPushConfig::allowed_senders` semantics:
392    /// - `*`                wildcard, allow anyone.
393    /// - `user@host`        full address, case-insensitive.
394    /// - `@host` / `host`   domain match, case-insensitive.
395    pub fn is_sender_allowed(&self, email: &str) -> bool {
396        let peers = (self.peer_resolver)();
397        Self::is_email_sender_allowed(&peers, email)
398    }
399
400    /// Pure, testable predicate that applies the email-allowlist match
401    /// semantics against an already-resolved peer list.
402    fn is_email_sender_allowed(peers: &[String], email: &str) -> bool {
403        if peers.is_empty() {
404            return false;
405        }
406        if peers.iter().any(|a| a == "*") {
407            return true;
408        }
409        let email_lower = email.to_lowercase();
410        peers.iter().any(|allowed| {
411            if allowed.starts_with('@') {
412                email_lower.ends_with(&allowed.to_lowercase())
413            } else if allowed.contains('@') {
414                allowed.eq_ignore_ascii_case(email)
415            } else {
416                email_lower.ends_with(&format!("@{}", allowed.to_lowercase()))
417            }
418        })
419    }
420
421    /// Process a Pub/Sub push notification and dispatch new messages to the agent.
422    pub async fn handle_notification(&self, envelope: &PubSubEnvelope) -> Result<()> {
423        let notification = parse_notification(&envelope.message)?;
424        ::zeroclaw_log::record!(
425            DEBUG,
426            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
427            &format!(
428                "Gmail push notification: email={}, historyId={}",
429                notification.email_address, notification.history_id
430            )
431        );
432
433        // Hold the lock across read-fetch-update to prevent duplicate
434        // processing when concurrent webhook notifications arrive.
435        let mut last_id = self.last_history_id.lock().await;
436
437        if *last_id == 0 {
438            // First notification — just record the history ID.
439            *last_id = notification.history_id;
440            ::zeroclaw_log::record!(
441                INFO,
442                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
443                &format!(
444                    "Gmail push: first notification, seeding historyId={}",
445                    notification.history_id
446                )
447            );
448            return Ok(());
449        }
450
451        let start_id = *last_id;
452        let message_ids = self.fetch_history_inner(start_id, &mut last_id).await?;
453        // Explicitly drop the lock before doing network-heavy message fetching.
454        drop(last_id);
455
456        if message_ids.is_empty() {
457            ::zeroclaw_log::record!(
458                DEBUG,
459                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
460                "Gmail push: no new messages in history"
461            );
462            return Ok(());
463        }
464
465        ::zeroclaw_log::record!(
466            INFO,
467            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
468            &format!(
469                "Gmail push: {} new message(s) to process",
470                message_ids.len()
471            )
472        );
473
474        // Clone the sender and drop the mutex immediately to avoid holding it
475        // across network calls.
476        let tx = {
477            let tx_guard = self.tx.lock().await;
478            match tx_guard.clone() {
479                Some(tx) => tx,
480                None => {
481                    ::zeroclaw_log::record!(
482                        WARN,
483                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
484                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
485                        "Gmail push: no listener registered, dropping messages"
486                    );
487                    return Ok(());
488                }
489            }
490        };
491
492        for msg_id in message_ids {
493            match self.fetch_message(&msg_id).await {
494                Ok(gmail_msg) => {
495                    let sender = extract_header(&gmail_msg, "From").unwrap_or_default();
496                    let sender_email = extract_email_from_header(&sender);
497
498                    if !self.is_sender_allowed(&sender_email) {
499                        ::zeroclaw_log::record!(
500                            WARN,
501                            ::zeroclaw_log::Event::new(
502                                module_path!(),
503                                ::zeroclaw_log::Action::Note
504                            )
505                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
506                            &format!("Gmail push: blocked message from {}", sender_email)
507                        );
508                        continue;
509                    }
510
511                    let subject = extract_header(&gmail_msg, "Subject").unwrap_or_default();
512                    let body_text = extract_body_text(&gmail_msg);
513
514                    let content = format!("Subject: {subject}\n\n{body_text}");
515                    let timestamp = gmail_msg
516                        .internal_date
517                        .parse::<u64>()
518                        .map(|ms| ms / 1000)
519                        .unwrap_or_else(|_| {
520                            SystemTime::now()
521                                .duration_since(UNIX_EPOCH)
522                                .map(|d| d.as_secs())
523                                .unwrap_or(0)
524                        });
525
526                    let channel_msg = ChannelMessage {
527                        id: format!("gmail_{}", gmail_msg.id),
528                        reply_target: sender_email.clone(),
529                        sender: sender_email,
530                        content,
531                        channel: "gmail_push".to_string(),
532                        channel_alias: Some(self.alias.clone()),
533                        timestamp,
534                        thread_ts: Some(gmail_msg.thread_id),
535                        interruption_scope_id: None,
536                        attachments: Vec::new(),
537                        subject: None,
538                    };
539
540                    if tx.send(channel_msg).await.is_err() {
541                        ::zeroclaw_log::record!(
542                            DEBUG,
543                            ::zeroclaw_log::Event::new(
544                                module_path!(),
545                                ::zeroclaw_log::Action::Note
546                            ),
547                            "Gmail push: listener channel closed"
548                        );
549                        return Ok(());
550                    }
551                }
552                Err(e) => {
553                    ::zeroclaw_log::record!(
554                        ERROR,
555                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
556                            .with_outcome(::zeroclaw_log::EventOutcome::Failure),
557                        &format!("Gmail push: failed to fetch message {}: {}", msg_id, e)
558                    );
559                }
560            }
561        }
562
563        Ok(())
564    }
565}
566
567impl ::zeroclaw_api::attribution::Attributable for GmailPushChannel {
568    fn role(&self) -> ::zeroclaw_api::attribution::Role {
569        ::zeroclaw_api::attribution::Role::Channel(
570            ::zeroclaw_api::attribution::ChannelKind::GmailPush,
571        )
572    }
573    fn alias(&self) -> &str {
574        &self.alias
575    }
576}
577
578#[async_trait]
579impl Channel for GmailPushChannel {
580    fn name(&self) -> &str {
581        "gmail_push"
582    }
583
584    async fn send(&self, message: &SendMessage) -> Result<()> {
585        // Send via Gmail API (drafts.send or messages.send)
586        let token = self.config.oauth_token.clone();
587        if token.is_empty() {
588            ::zeroclaw_log::record!(
589                ERROR,
590                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
591                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
592                "Gmail OAuth token is not configured for sending"
593            );
594            anyhow::bail!("Gmail OAuth token is not configured for sending");
595        }
596
597        let subject = message.subject.as_deref().unwrap_or("ZeroClaw Message");
598        // Sanitize headers to prevent CRLF injection attacks.
599        let safe_recipient = sanitize_header_value(&message.recipient);
600        let safe_subject = sanitize_header_value(subject);
601        let rfc2822 = format!(
602            "To: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{}",
603            safe_recipient, safe_subject, message.content
604        );
605        let encoded = BASE64.encode(rfc2822.as_bytes());
606        // Gmail API uses URL-safe base64 with no padding
607        let url_safe = encoded.replace('+', "-").replace('/', "_").replace('=', "");
608
609        let body = serde_json::json!({
610            "raw": url_safe,
611        });
612
613        let resp = self
614            .http
615            .post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send")
616            .bearer_auth(&token)
617            .json(&body)
618            .send()
619            .await?;
620
621        if !resp.status().is_success() {
622            let status = resp.status();
623            let text = resp.text().await.unwrap_or_default();
624            ::zeroclaw_log::record!(
625                ERROR,
626                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
627                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
628                    .with_attrs(::serde_json::json!({
629                        "phase": "send",
630                        "status": status.as_u16(),
631                        "body": text,
632                    })),
633                "gmail_push: send failed"
634            );
635            return Err(anyhow::Error::msg(format!(
636                "Gmail send failed ({}): {}",
637                status, text
638            )));
639        }
640
641        ::zeroclaw_log::record!(
642            INFO,
643            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
644            &format!("Gmail message sent to {}", message.recipient)
645        );
646        Ok(())
647    }
648
649    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
650        // Store the sender for webhook-driven message dispatch
651        {
652            let mut tx_guard = self.tx.lock().await;
653            *tx_guard = Some(tx);
654        }
655
656        ::zeroclaw_log::record!(
657            INFO,
658            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
659            "Gmail push channel started — registering watch subscription"
660        );
661
662        // Register initial watch
663        if !self.config.webhook_url.is_empty()
664            && let Err(e) = self.register_watch().await
665        {
666            ::zeroclaw_log::record!(
667                ERROR,
668                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
669                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
670                    .with_attrs(::serde_json::json!({"e": e.to_string()})),
671                "Gmail watch registration failed"
672            );
673            // Non-fatal — external subscription management may be in use
674        }
675
676        // Renewal loop: Gmail watch subscriptions expire after 7 days.
677        // Re-register every 6 days to maintain continuous coverage.
678        let renewal_interval = Duration::from_secs(6 * 24 * 60 * 60); // 6 days
679        loop {
680            tokio::time::sleep(renewal_interval).await;
681            ::zeroclaw_log::record!(
682                INFO,
683                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
684                "Gmail push: renewing watch subscription"
685            );
686            if let Err(e) = self.register_watch().await {
687                ::zeroclaw_log::record!(
688                    ERROR,
689                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
690                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
691                        .with_attrs(::serde_json::json!({"e": e.to_string()})),
692                    "Gmail watch renewal failed"
693                );
694            }
695        }
696    }
697
698    async fn health_check(&self) -> bool {
699        let token = self.config.oauth_token.clone();
700        if token.is_empty() {
701            return false;
702        }
703
704        match self
705            .http
706            .get("https://gmail.googleapis.com/gmail/v1/users/me/profile")
707            .bearer_auth(&token)
708            .timeout(Duration::from_secs(10))
709            .send()
710            .await
711        {
712            Ok(resp) => resp.status().is_success(),
713            Err(_) => false,
714        }
715    }
716}
717
718// ── Helper functions ─────────────────────────────────────────────
719
720/// Parse and decode the Gmail notification from a Pub/Sub message.
721pub fn parse_notification(msg: &PubSubMessage) -> Result<GmailNotification> {
722    let decoded = BASE64.decode(&msg.data).map_err(|e| {
723        ::zeroclaw_log::record!(
724            WARN,
725            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
726                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
727                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
728            "Invalid base64 in Pub/Sub message"
729        );
730        anyhow::Error::msg(format!("Invalid base64 in Pub/Sub message: {e}"))
731    })?;
732    let notification: GmailNotification = serde_json::from_slice(&decoded).map_err(|e| {
733        ::zeroclaw_log::record!(
734            WARN,
735            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
736                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
737                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
738            "Invalid JSON in Gmail notification"
739        );
740        anyhow::Error::msg(format!("Invalid JSON in Gmail notification: {e}"))
741    })?;
742    Ok(notification)
743}
744
745/// Extract a header value from a Gmail message by name.
746pub fn extract_header(msg: &GmailMessage, name: &str) -> Option<String> {
747    msg.payload.as_ref().and_then(|p| {
748        p.headers
749            .iter()
750            .find(|h| h.name.eq_ignore_ascii_case(name))
751            .map(|h| h.value.clone())
752    })
753}
754
/// Extract the bare address from a `From`-style header value.
///
/// For `"Display Name" <user@example.com>` this returns `user@example.com`.
/// The *last* `<...>` group is used, because RFC 5322 puts the angle-addr
/// after the display name, and a quoted display name may itself contain `<`
/// or `>` (pairing the first `<` with the last `>` would splice both together).
/// Whitespace inside the brackets is trimmed. When there is no non-empty
/// `<...>` group, the whole value is returned trimmed.
pub fn extract_email_from_header(from: &str) -> String {
    if let Some(open) = from.rfind('<') {
        // '<' is ASCII, so `open + 1` is always a char boundary.
        let after_open = &from[open + 1..];
        if let Some(close) = after_open.find('>') {
            let addr = after_open[..close].trim();
            if !addr.is_empty() {
                return addr.to_string();
            }
        }
    }
    from.trim().to_string()
}
768
/// Make `value` safe to embed in a single RFC 2822 header line.
///
/// Every carriage return and line feed is dropped, so untrusted text cannot
/// end the current header and smuggle in additional ones.
pub fn sanitize_header_value(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        if !matches!(ch, '\r' | '\n') {
            cleaned.push(ch);
        }
    }
    cleaned
}
774
775/// Extract the plain-text body from a Gmail message.
776///
777/// Walks MIME parts looking for `text/plain`; falls back to `text/html`
778/// with basic tag stripping; finally falls back to the `snippet`.
779pub fn extract_body_text(msg: &GmailMessage) -> String {
780    if let Some(ref payload) = msg.payload {
781        // Single-part message
782        if payload.mime_type == "text/plain"
783            && let Some(text) = decode_body(payload.body.as_ref())
784        {
785            return text;
786        }
787
788        // Multipart — walk parts
789        if let Some(text) = find_text_in_parts(&payload.parts, "text/plain") {
790            return text;
791        }
792        if let Some(html) = find_text_in_parts(&payload.parts, "text/html") {
793            return strip_html(&html);
794        }
795    }
796
797    // Fallback to snippet
798    msg.snippet.clone()
799}
800
801/// Recursively search MIME parts for a given content type.
802fn find_text_in_parts(parts: &[MessagePart], mime_type: &str) -> Option<String> {
803    for part in parts {
804        if part.mime_type == mime_type
805            && let Some(text) = decode_body(part.body.as_ref())
806        {
807            return Some(text);
808        }
809        // Recurse into nested parts
810        if let Some(text) = find_text_in_parts(&part.parts, mime_type) {
811            return Some(text);
812        }
813    }
814    None
815}
816
817/// Decode a base64url-encoded Gmail message body.
818fn decode_body(body: Option<&MessageBody>) -> Option<String> {
819    body.and_then(|b| {
820        b.data.as_ref().and_then(|data| {
821            // Gmail API uses URL-safe base64 without padding
822            let standard = data.replace('-', "+").replace('_', "/");
823            // Restore padding stripped by Gmail API
824            let padded = match standard.len() % 4 {
825                2 => format!("{standard}=="),
826                3 => format!("{standard}="),
827                _ => standard,
828            };
829            BASE64
830                .decode(&padded)
831                .ok()
832                .and_then(|bytes| String::from_utf8(bytes).ok())
833        })
834    })
835}
836
/// Convert an HTML body into whitespace-normalized plain text.
///
/// This is a lightweight converter, not a full HTML parser:
///
/// - Every tag (`<...>`) is removed. Block-level and line-breaking tags such as
///   `<p>`, `<div>`, `<br>`, `<li>` and `<td>` become a word separator, whether
///   opening or closing. So `<p>Hi</p><p>there</p>` yields `Hi there`, not
///   `Hithere`. Inline tags such as `<b>` or `<span>` are removed without one.
/// - The contents of `<script>` and `<style>` elements are dropped, because
///   they are never rendered as text. So are HTML comments, including Outlook
///   conditional comments.
/// - The common named entities (`&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`,
///   `&nbsp;`) and numeric references (`&#65;`, `&#x41;`) are decoded. This
///   happens after tags are removed, so an escaped `&lt;b&gt;` stays literal
///   text. Unknown or malformed entities are left as written.
/// - Runs of whitespace collapse to a single space, and leading/trailing
///   whitespace is removed.
///
/// An unclosed construct hides the rest of the input. This applies to a `<`
/// with no later `>` and to an unclosed comment, `<script>` or `<style>`. A `>`
/// inside a quoted attribute value is not recognised and ends the tag early.
fn strip_html(html: &str) -> String {
    // Tag names that separate the text on either side into distinct words.
    const BREAKING_TAGS: &[&str] = &[
        "address", "article", "aside", "blockquote", "br", "dd", "div", "dl", "dt",
        "footer", "h1", "h2", "h3", "h4", "h5", "h6", "header", "hr", "li", "ol", "p",
        "pre", "section", "table", "td", "th", "tr", "ul",
    ];

    // Byte offset of the first ASCII-case-insensitive occurrence of `needle`.
    // `needle` must be non-empty and start with an ASCII byte, so any match
    // begins on a UTF-8 character boundary.
    fn find_ascii_case_insensitive(haystack: &str, needle: &str) -> Option<usize> {
        haystack
            .as_bytes()
            .windows(needle.len())
            .position(|window| window.eq_ignore_ascii_case(needle.as_bytes()))
    }

    // Decode one entity body (the text between `&` and `;`), if recognised.
    fn decode_entity(entity: &str) -> Option<char> {
        match entity {
            "amp" => Some('&'),
            "lt" => Some('<'),
            "gt" => Some('>'),
            "quot" => Some('"'),
            "apos" => Some('\''),
            // A non-breaking space becomes a plain space so it collapses with the rest.
            "nbsp" => Some(' '),
            _ => {
                let digits = entity.strip_prefix('#')?;
                let code = match digits.strip_prefix('x').or_else(|| digits.strip_prefix('X')) {
                    Some(hex) => u32::from_str_radix(hex, 16).ok()?,
                    None => digits.parse::<u32>().ok()?,
                };
                char::from_u32(code).filter(|&ch| ch != '\0')
            }
        }
    }

    // Replace every recognised `&...;` entity in `text`.
    fn decode_entities(text: &str) -> String {
        // The longest entity body handled (`#x10FFFF`) is 8 bytes; allow slack
        // for leading zeros. Bounding the look-ahead keeps long runs of bare `&`
        // linear-time.
        const MAX_ENTITY_LEN: usize = 10;

        let mut out = String::with_capacity(text.len());
        let mut rest = text;
        while let Some(amp) = rest.find('&') {
            out.push_str(&rest[..amp]);
            let after = &rest[amp + 1..];
            let decoded = after
                .bytes()
                .take(MAX_ENTITY_LEN + 1)
                .position(|b| b == b';')
                .and_then(|semi| decode_entity(&after[..semi]).map(|ch| (ch, semi)));
            match decoded {
                Some((ch, semi)) => {
                    out.push(ch);
                    rest = &after[semi + 1..];
                }
                None => {
                    out.push('&');
                    rest = after;
                }
            }
        }
        out.push_str(rest);
        out
    }

    // Pass 1: drop markup, keeping text and inserting word separators.
    let mut text = String::with_capacity(html.len());
    let mut rest = html;
    while let Some(open) = rest.find('<') {
        text.push_str(&rest[..open]);
        let after_open = &rest[open + 1..];

        // Comments end at `-->`, not at the first `>`. This also swallows
        // Outlook conditional comments such as `<!--[if mso]>...<![endif]-->`.
        if let Some(comment) = after_open.strip_prefix("!--") {
            rest = comment.find("-->").map_or("", |end| &comment[end + 3..]);
            continue;
        }

        let Some(close) = after_open.find('>') else {
            // Unterminated tag: the remainder is never closed, so it is all markup.
            rest = "";
            break;
        };
        let tag = &after_open[..close];
        rest = &after_open[close + 1..];

        let is_closing = tag.starts_with('/');
        let name = tag
            .trim_start_matches('/')
            .split(|c: char| !c.is_ascii_alphanumeric())
            .next()
            .unwrap_or_default()
            .to_ascii_lowercase();

        if !is_closing && (name == "script" || name == "style") {
            // Raw-text element: skip to just past its closing tag.
            let closing = format!("</{name}");
            let resume = find_ascii_case_insensitive(rest, &closing)
                .and_then(|pos| rest[pos..].find('>').map(|end| pos + end + 1));
            rest = match resume {
                Some(idx) => &rest[idx..],
                None => "",
            };
            text.push(' ');
        } else if BREAKING_TAGS.contains(&name.as_str()) {
            text.push(' ');
        }
    }
    text.push_str(rest);

    // Pass 2: decode entities, then collapse whitespace.
    decode_entities(&text)
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
}
858
859// ── Tests ────────────────────────────────────────────────────────
860
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixture helpers ──────────────────────────────────────────

    /// Encode `bytes` the way the Gmail API encodes message bodies: the URL-safe
    /// base64 alphabet with the `=` padding removed.
    fn gmail_b64(bytes: &[u8]) -> String {
        BASE64
            .encode(bytes)
            .replace('+', "-")
            .replace('/', "_")
            .replace('=', "")
    }

    /// Serialize `value` to JSON and standard-base64 encode it, producing the
    /// `data` field of a Gmail Pub/Sub notification.
    fn notification_data(value: &serde_json::Value) -> String {
        BASE64.encode(serde_json::to_vec(value).unwrap())
    }

    /// Build a `PubSubMessage` from its three fields.
    fn pubsub_message(data: String, message_id: &str, publish_time: &str) -> PubSubMessage {
        PubSubMessage {
            data,
            message_id: message_id.into(),
            publish_time: publish_time.into(),
        }
    }

    /// Build a `GmailMessage`; `internal_date` is always `"0"` in these tests.
    fn gmail_message(
        id: &str,
        thread_id: &str,
        snippet: &str,
        payload: Option<MessagePayload>,
    ) -> GmailMessage {
        GmailMessage {
            id: id.into(),
            thread_id: thread_id.into(),
            snippet: snippet.into(),
            payload,
            internal_date: "0".into(),
        }
    }

    /// Build a single `MessageHeader`.
    fn header(name: &str, value: &str) -> MessageHeader {
        MessageHeader {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Allowlist resolver that always yields `peers`.
    fn resolver_from(peers: Vec<String>) -> Arc<dyn Fn() -> Vec<String> + Send + Sync> {
        Arc::new(move || peers.clone())
    }

    /// Allowlist resolver that yields no peers at all.
    fn empty_resolver() -> Arc<dyn Fn() -> Vec<String> + Send + Sync> {
        resolver_from(Vec::new())
    }

    // ── Notification parsing ─────────────────────────────────────

    #[test]
    fn parse_notification_valid() {
        let data = notification_data(&serde_json::json!({
            "emailAddress": "user@example.com",
            "historyId": 12345
        }));
        let msg = pubsub_message(data, "msg-1", "2026-03-21T08:00:00Z");

        let parsed = parse_notification(&msg).unwrap();
        assert_eq!(parsed.email_address, "user@example.com");
        assert_eq!(parsed.history_id, 12345);
    }

    #[test]
    fn parse_notification_invalid_base64() {
        let msg = pubsub_message("!!!not-base64!!!".into(), "msg-2", "");
        assert!(parse_notification(&msg).is_err());
    }

    #[test]
    fn parse_notification_invalid_json() {
        let msg = pubsub_message(BASE64.encode(b"not json at all"), "msg-3", "");
        assert!(parse_notification(&msg).is_err());
    }

    // ── Envelope deserialization ─────────────────────────────────

    #[test]
    fn pubsub_envelope_deserialize() {
        let data = notification_data(&serde_json::json!({
            "emailAddress": "test@gmail.com",
            "historyId": 999
        }));
        let envelope: PubSubEnvelope = serde_json::from_value(serde_json::json!({
            "message": {
                "data": data,
                "messageId": "pubsub-1",
                "publishTime": "2026-03-21T10:00:00Z"
            },
            "subscription": "projects/my-project/subscriptions/gmail-push"
        }))
        .unwrap();

        assert_eq!(envelope.message.message_id, "pubsub-1");
        assert_eq!(
            envelope.subscription,
            "projects/my-project/subscriptions/gmail-push"
        );

        let parsed = parse_notification(&envelope.message).unwrap();
        assert_eq!(parsed.email_address, "test@gmail.com");
        assert_eq!(parsed.history_id, 999);
    }

    // ── Email extraction from From header ────────────────────────

    #[test]
    fn extract_email_from_header_angle_brackets() {
        let addr = extract_email_from_header("John Doe <john@example.com>");
        assert_eq!(addr, "john@example.com");
    }

    #[test]
    fn extract_email_from_header_bare_email() {
        let addr = extract_email_from_header("user@example.com");
        assert_eq!(addr, "user@example.com");
    }

    #[test]
    fn extract_email_from_header_empty() {
        assert!(extract_email_from_header("").is_empty());
    }

    #[test]
    fn extract_email_with_quotes() {
        let addr = extract_email_from_header("\"Doe, John\" <john@example.com>");
        assert_eq!(addr, "john@example.com");
    }

    #[test]
    fn extract_email_malformed_angle_brackets() {
        let cases = [
            // '>' before '<' with no proper closing — falls back to full trimmed string
            (
                "attacker> <victim@example.com",
                "attacker> <victim@example.com",
            ),
            // Properly closed after the second '<'
            ("attacker> <victim@example.com>", "victim@example.com"),
            // No closing '>' at all
            ("Name <broken", "Name <broken"),
        ];
        for (input, expected) in cases {
            assert_eq!(
                extract_email_from_header(input),
                expected,
                "input: {input:?}"
            );
        }
    }

    #[test]
    fn sanitize_header_strips_crlf() {
        let cases = [
            ("normal@example.com", "normal@example.com"),
            (
                "evil@example.com\r\nBcc: spy@evil.com",
                "evil@example.comBcc: spy@evil.com",
            ),
            ("inject\nSubject: fake", "injectSubject: fake"),
        ];
        for (input, expected) in cases {
            assert_eq!(sanitize_header_value(input), expected, "input: {input:?}");
        }
    }

    // ── Header extraction ────────────────────────────────────────

    #[test]
    fn extract_header_found() {
        let msg = gmail_message(
            "msg-1",
            "thread-1",
            "",
            Some(MessagePayload {
                headers: vec![
                    header("From", "sender@example.com"),
                    header("Subject", "Test Subject"),
                ],
                body: None,
                parts: Vec::new(),
                mime_type: String::new(),
            }),
        );

        assert_eq!(
            extract_header(&msg, "Subject").as_deref(),
            Some("Test Subject")
        );
        // Header-name lookup ignores case.
        assert_eq!(
            extract_header(&msg, "from").as_deref(),
            Some("sender@example.com")
        );
        assert!(extract_header(&msg, "X-Missing").is_none());
    }

    #[test]
    fn extract_header_no_payload() {
        let msg = gmail_message("msg-2", "", "", None);
        assert!(extract_header(&msg, "Subject").is_none());
    }

    // ── Body text extraction ─────────────────────────────────────

    #[test]
    fn extract_body_text_plain() {
        let msg = gmail_message(
            "msg-3",
            "",
            "snippet",
            Some(MessagePayload {
                headers: Vec::new(),
                body: Some(MessageBody {
                    data: Some(gmail_b64(b"Hello, world!")),
                    size: 13,
                }),
                parts: Vec::new(),
                mime_type: "text/plain".into(),
            }),
        );

        assert_eq!(extract_body_text(&msg), "Hello, world!");
    }

    #[test]
    fn extract_body_text_multipart() {
        let html_part = MessagePart {
            mime_type: "text/html".into(),
            body: Some(MessageBody {
                data: Some(gmail_b64(b"<p>Hello</p>")),
                size: 12,
            }),
            parts: Vec::new(),
            filename: String::new(),
        };
        let msg = gmail_message(
            "msg-4",
            "",
            "snippet",
            Some(MessagePayload {
                headers: Vec::new(),
                body: None,
                parts: vec![html_part],
                mime_type: "multipart/alternative".into(),
            }),
        );

        assert_eq!(extract_body_text(&msg), "Hello");
    }

    #[test]
    fn extract_body_text_fallback_to_snippet() {
        let msg = gmail_message(
            "msg-5",
            "",
            "My snippet text",
            Some(MessagePayload {
                headers: Vec::new(),
                body: None,
                parts: Vec::new(),
                mime_type: "multipart/mixed".into(),
            }),
        );

        assert_eq!(extract_body_text(&msg), "My snippet text");
    }

    // ── Sender allowlist ─────────────────────────────────────────

    #[test]
    fn sender_allowed_empty_denies() {
        let ch = GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            empty_resolver(),
        );
        assert!(!ch.is_sender_allowed("anyone@example.com"));
    }

    #[test]
    fn sender_allowed_wildcard() {
        let ch = GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            resolver_from(vec!["*".into()]),
        );
        assert!(ch.is_sender_allowed("anyone@example.com"));
    }

    #[test]
    fn sender_allowed_specific_email() {
        let ch = GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            resolver_from(vec!["user@example.com".into()]),
        );
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(!ch.is_sender_allowed("other@example.com"));
    }

    #[test]
    fn sender_allowed_domain_with_at() {
        let ch = GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            resolver_from(vec!["@example.com".into()]),
        );
        for sender in ["user@example.com", "admin@example.com"] {
            assert!(ch.is_sender_allowed(sender), "{sender} should be allowed");
        }
        assert!(!ch.is_sender_allowed("user@other.com"));
    }

    #[test]
    fn sender_allowed_domain_without_at() {
        let ch = GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            resolver_from(vec!["example.com".into()]),
        );
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(!ch.is_sender_allowed("user@other.com"));
    }

    // ── Strip HTML ───────────────────────────────────────────────

    #[test]
    fn strip_html_basic() {
        let text = strip_html("<p>Hello</p>");
        assert_eq!(text, "Hello");
    }

    #[test]
    fn strip_html_nested() {
        let text = strip_html("<div><p>Hello <b>World</b></p></div>");
        assert_eq!(text, "Hello World");
    }

    // ── Config defaults ──────────────────────────────────────────

    #[test]
    fn config_default_values() {
        let config = GmailPushConfig::default();
        assert!(config.topic.is_empty());
        assert_eq!(config.label_filter, vec!["INBOX"]);
        assert!(config.oauth_token.is_empty());
        assert!(config.webhook_url.is_empty());
    }

    #[test]
    fn config_deserialize_with_defaults() {
        let config: GmailPushConfig =
            serde_json::from_str(r#"{"topic": "projects/my-proj/topics/gmail"}"#).unwrap();
        assert_eq!(config.topic, "projects/my-proj/topics/gmail");
        assert_eq!(config.label_filter, vec!["INBOX"]);
    }

    #[test]
    fn config_serialize_roundtrip() {
        let original = GmailPushConfig {
            enabled: true,
            topic: "projects/test/topics/gmail".into(),
            label_filter: vec!["INBOX".into(), "IMPORTANT".into()],
            oauth_token: "test-token".into(),
            webhook_url: "https://example.com/webhook/gmail".into(),
            webhook_secret: "my-secret".into(),
            excluded_tools: vec![],
        };
        let serialized = serde_json::to_string(&original).unwrap();
        let restored: GmailPushConfig = serde_json::from_str(&serialized).unwrap();

        assert_eq!(restored.topic, original.topic);
        assert_eq!(restored.label_filter, original.label_filter);
        assert_eq!(restored.webhook_url, original.webhook_url);
    }

    // ── Channel name ─────────────────────────────────────────────

    #[test]
    fn channel_name() {
        let ch = GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            empty_resolver(),
        );
        assert_eq!(ch.name(), "gmail_push");
    }

    // ── Decode body ──────────────────────────────────────────────

    #[test]
    fn decode_body_none() {
        assert_eq!(decode_body(None), None);
    }

    #[test]
    fn decode_body_empty_data() {
        let body = MessageBody {
            data: None,
            size: 0,
        };
        assert_eq!(decode_body(Some(&body)), None);
    }

    #[test]
    fn decode_body_valid() {
        let body = MessageBody {
            data: Some(gmail_b64(b"test content")),
            size: 12,
        };
        assert_eq!(decode_body(Some(&body)).as_deref(), Some("test content"));
    }
}