Skip to main content

zeroclaw_channels/
gmail_push.rs

1//! Gmail Pub/Sub push notification channel.
2//!
3//! Instead of polling via IMAP, this channel uses Google's Gmail Pub/Sub push
4//! notifications.  Google sends a POST to our webhook endpoint whenever the
5//! user's mailbox changes.  The notification body contains a base64-encoded
6//! JSON payload with `emailAddress` and `historyId`; we then call the Gmail
7//! History API to fetch newly arrived messages.
8//!
9//! ## Setup
10//!
11//! 1. Create a Google Cloud Pub/Sub topic and grant `gmail-api-push@system.gserviceaccount.com`
12//!    the **Pub/Sub Publisher** role on that topic.
13//! 2. Create a push subscription pointing to `https://<your-domain>/webhook/gmail`.
14//! 3. Configure `[channels_config.gmail_push]` in `config.toml` with `topic` and
15//!    `oauth_token`.
16//!
17//! The channel automatically calls `users.watch` to register the subscription
18//! and renews it before the 7-day expiry.
19
20use anyhow::Result;
21use async_trait::async_trait;
22use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use std::fmt::Write as _;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime, UNIX_EPOCH};
28use tokio::sync::{Mutex, mpsc};
29
30use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
31
32pub use zeroclaw_config::scattered_types::GmailPushConfig;
33
34// ── Pub/Sub notification payload ─────────────────────────────────
35
36/// The outer JSON envelope that Google Pub/Sub POSTs to the push endpoint.
37#[derive(Debug, Deserialize, Serialize)]
38pub struct PubSubEnvelope {
39    pub message: PubSubMessage,
40    /// Subscription name (informational).
41    #[serde(default)]
42    pub subscription: String,
43}
44
45/// A single Pub/Sub message inside the envelope.
46#[derive(Debug, Deserialize, Serialize)]
47pub struct PubSubMessage {
48    /// Base64-encoded JSON data from Gmail.
49    pub data: String,
50    /// Pub/Sub message ID.
51    #[serde(default, rename = "messageId")]
52    pub message_id: String,
53    /// Publish timestamp (RFC 3339).
54    #[serde(default, rename = "publishTime")]
55    pub publish_time: String,
56}
57
58/// The decoded payload inside `PubSubMessage.data`.
59#[derive(Debug, Deserialize, Serialize)]
60pub struct GmailNotification {
61    /// Email address of the affected mailbox.
62    #[serde(rename = "emailAddress")]
63    pub email_address: String,
64    /// History ID to use as `startHistoryId` for incremental sync.
65    #[serde(rename = "historyId")]
66    pub history_id: u64,
67}
68
69// ── Gmail API response types ─────────────────────────────────────
70
71/// Response from `GET /gmail/v1/users/me/history`.
72#[derive(Debug, Deserialize)]
73pub struct HistoryResponse {
74    pub history: Option<Vec<HistoryRecord>>,
75    #[serde(default, rename = "historyId")]
76    pub history_id: u64,
77    #[serde(default, rename = "nextPageToken")]
78    pub next_page_token: Option<String>,
79}
80
81/// A single history record containing messages added to the mailbox.
82#[derive(Debug, Deserialize)]
83pub struct HistoryRecord {
84    #[serde(default, rename = "messagesAdded")]
85    pub messages_added: Vec<MessageAdded>,
86}
87
88/// Wrapper for a newly added message reference.
89#[derive(Debug, Deserialize)]
90pub struct MessageAdded {
91    pub message: MessageRef,
92}
93
94/// Minimal message reference returned by the history API.
95#[derive(Debug, Deserialize)]
96pub struct MessageRef {
97    pub id: String,
98    #[serde(default, rename = "threadId")]
99    pub thread_id: String,
100}
101
102/// Full message returned by `GET /gmail/v1/users/me/messages/{id}`.
103#[derive(Debug, Deserialize)]
104pub struct GmailMessage {
105    pub id: String,
106    #[serde(default, rename = "threadId")]
107    pub thread_id: String,
108    #[serde(default)]
109    pub snippet: String,
110    pub payload: Option<MessagePayload>,
111    #[serde(default, rename = "internalDate")]
112    pub internal_date: String,
113}
114
115/// Message payload with headers and parts.
116#[derive(Debug, Deserialize)]
117pub struct MessagePayload {
118    #[serde(default)]
119    pub headers: Vec<MessageHeader>,
120    pub body: Option<MessageBody>,
121    #[serde(default)]
122    pub parts: Vec<MessagePart>,
123    #[serde(default, rename = "mimeType")]
124    pub mime_type: String,
125}
126
127/// A single email header (name/value pair).
128#[derive(Debug, Deserialize)]
129pub struct MessageHeader {
130    pub name: String,
131    pub value: String,
132}
133
134/// Message body with optional base64-encoded data.
135#[derive(Debug, Deserialize)]
136pub struct MessageBody {
137    #[serde(default)]
138    pub data: Option<String>,
139    #[serde(default)]
140    pub size: u64,
141}
142
143/// A MIME part of a multipart message.
144#[derive(Debug, Deserialize)]
145pub struct MessagePart {
146    #[serde(default, rename = "mimeType")]
147    pub mime_type: String,
148    pub body: Option<MessageBody>,
149    #[serde(default)]
150    pub parts: Vec<MessagePart>,
151    #[serde(default)]
152    pub filename: String,
153}
154
155/// Response from `POST /gmail/v1/users/me/watch`.
156#[derive(Debug, Deserialize)]
157pub struct WatchResponse {
158    #[serde(default, rename = "historyId")]
159    pub history_id: u64,
160    #[serde(default)]
161    pub expiration: String,
162}
163
164// ── Channel implementation ───────────────────────────────────────
165
166/// Gmail Pub/Sub push notification channel.
167///
168/// Incoming messages arrive via webhook (`POST /webhook/gmail`) and are
169/// dispatched to the agent.  The `listen` method registers the Gmail watch
170/// subscription and periodically renews it.
171///
172/// Inbound sender authorization lives in `peer_groups` in V3; this channel
173/// resolves the authorized senders at message-time via [`Self::peer_resolver`]
174/// rather than reading a per-channel `allowed_senders` field (it no longer
175/// exists on `GmailPushConfig`).
176pub struct GmailPushChannel {
177    pub config: GmailPushConfig,
178    /// The alias key under `[channels.gmail.<alias>]` this handle is
179    /// bound to. Used to scope peer-group writes and resolver lookups.
180    pub alias: String,
181    /// Resolves inbound external peers from canonical state at message-time.
182    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
183    pub peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
184    http: Client,
185    last_history_id: Arc<Mutex<u64>>,
186    /// Sender half injected by the gateway to forward webhook-received messages.
187    pub tx: Arc<Mutex<Option<mpsc::Sender<ChannelMessage>>>>,
188}
189
190impl GmailPushChannel {
191    pub fn new(
192        config: GmailPushConfig,
193        alias: impl Into<String>,
194        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
195    ) -> Self {
196        let http = Client::builder()
197            .timeout(Duration::from_secs(30))
198            .build()
199            .expect("failed to build HTTP client");
200        Self {
201            config,
202            alias: alias.into(),
203            peer_resolver,
204            http,
205            last_history_id: Arc::new(Mutex::new(0)),
206            tx: Arc::new(Mutex::new(None)),
207        }
208    }
209
210    /// Register a Gmail watch subscription via `POST /gmail/v1/users/me/watch`.
211    pub async fn register_watch(&self) -> Result<WatchResponse> {
212        let token = self.config.oauth_token.clone();
213        if token.is_empty() {
214            ::zeroclaw_log::record!(
215                ERROR,
216                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
217                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
218                "Gmail OAuth token is not configured"
219            );
220            anyhow::bail!("Gmail OAuth token is not configured");
221        }
222
223        let body = serde_json::json!({
224            "topicName": self.config.topic,
225            "labelIds": self.config.label_filter,
226        });
227
228        let resp = self
229            .http
230            .post("https://gmail.googleapis.com/gmail/v1/users/me/watch")
231            .bearer_auth(&token)
232            .json(&body)
233            .send()
234            .await?;
235
236        if !resp.status().is_success() {
237            let status = resp.status();
238            let text = resp.text().await.unwrap_or_default();
239            ::zeroclaw_log::record!(
240                ERROR,
241                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
242                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
243                    .with_attrs(::serde_json::json!({
244                        "phase": "watch_registration",
245                        "status": status.as_u16(),
246                        "body": text,
247                    })),
248                "gmail_push: watch registration failed"
249            );
250            return Err(anyhow::Error::msg(format!(
251                "Gmail watch registration failed ({}): {}",
252                status, text
253            )));
254        }
255
256        let watch: WatchResponse = resp.json().await?;
257        let mut last_id = self.last_history_id.lock().await;
258        if *last_id == 0 {
259            *last_id = watch.history_id;
260        }
261        ::zeroclaw_log::record!(
262            INFO,
263            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
264            &format!(
265                "Gmail watch registered — historyId={}, expiration={}",
266                watch.history_id, watch.expiration
267            )
268        );
269        Ok(watch)
270    }
271
272    /// Fetch new messages since the given `start_history_id` using the History API.
273    pub async fn fetch_history(&self, start_history_id: u64) -> Result<Vec<String>> {
274        let mut last_id = self.last_history_id.lock().await;
275        self.fetch_history_inner(start_history_id, &mut last_id)
276            .await
277    }
278
279    /// Inner history fetch that takes an already-locked history ID reference.
280    /// This allows callers that already hold the lock to avoid deadlock.
281    async fn fetch_history_inner(
282        &self,
283        start_history_id: u64,
284        last_id: &mut u64,
285    ) -> Result<Vec<String>> {
286        let token = self.config.oauth_token.clone();
287        if token.is_empty() {
288            ::zeroclaw_log::record!(
289                ERROR,
290                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
291                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
292                "Gmail OAuth token is not configured"
293            );
294            anyhow::bail!("Gmail OAuth token is not configured");
295        }
296
297        let mut message_ids = Vec::new();
298        let mut page_token: Option<String> = None;
299
300        loop {
301            let mut url = format!(
302                "https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId={}&historyTypes=messageAdded",
303                start_history_id
304            );
305            if let Some(ref pt) = page_token {
306                let _ = write!(url, "&pageToken={pt}");
307            }
308
309            let resp = self.http.get(&url).bearer_auth(&token).send().await?;
310
311            if !resp.status().is_success() {
312                let status = resp.status();
313                let text = resp.text().await.unwrap_or_default();
314                ::zeroclaw_log::record!(
315                    ERROR,
316                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
317                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
318                        .with_attrs(::serde_json::json!({
319                            "phase": "history_fetch",
320                            "status": status.as_u16(),
321                            "body": text,
322                        })),
323                    "gmail_push: history fetch failed"
324                );
325                return Err(anyhow::Error::msg(format!(
326                    "Gmail history fetch failed ({}): {}",
327                    status, text
328                )));
329            }
330
331            let history_resp: HistoryResponse = resp.json().await?;
332
333            if let Some(records) = history_resp.history {
334                for record in records {
335                    for added in record.messages_added {
336                        message_ids.push(added.message.id);
337                    }
338                }
339            }
340
341            // Update tracked history ID
342            if history_resp.history_id > 0 && history_resp.history_id > *last_id {
343                *last_id = history_resp.history_id;
344            }
345
346            match history_resp.next_page_token {
347                Some(token) => page_token = Some(token),
348                None => break,
349            }
350        }
351
352        Ok(message_ids)
353    }
354
355    /// Fetch a full message by ID from the Gmail API.
356    pub async fn fetch_message(&self, message_id: &str) -> Result<GmailMessage> {
357        let token = self.config.oauth_token.clone();
358        let url = format!(
359            "https://gmail.googleapis.com/gmail/v1/users/me/messages/{}?format=full",
360            message_id
361        );
362
363        let resp = self.http.get(&url).bearer_auth(&token).send().await?;
364
365        if !resp.status().is_success() {
366            let status = resp.status();
367            let text = resp.text().await.unwrap_or_default();
368            ::zeroclaw_log::record!(
369                ERROR,
370                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
371                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
372                    .with_attrs(::serde_json::json!({
373                        "phase": "message_fetch",
374                        "status": status.as_u16(),
375                        "body": text,
376                    })),
377                "gmail_push: message fetch failed"
378            );
379            return Err(anyhow::Error::msg(format!(
380                "Gmail message fetch failed ({}): {}",
381                status, text
382            )));
383        }
384
385        Ok(resp.json().await?)
386    }
387
388    /// Check if a sender email is in the allowlist.
389    ///
390    /// Email allowlist entries support three syntaxes — preserved from
391    /// the legacy `GmailPushConfig::allowed_senders` semantics:
392    /// - `*`                wildcard, allow anyone.
393    /// - `user@host`        full address, case-insensitive.
394    /// - `@host` / `host`   domain match, case-insensitive.
395    pub fn is_sender_allowed(&self, email: &str) -> bool {
396        let peers = (self.peer_resolver)();
397        Self::is_email_sender_allowed(&peers, email)
398    }
399
400    /// Pure, testable predicate that applies the email-allowlist match
401    /// semantics against an already-resolved peer list. Shares the in-tree
402    /// `crate::allowlist::is_user_allowed_by` matcher with `email_channel`;
403    /// domain-class matching is the per-entry comparison.
404    fn is_email_sender_allowed(peers: &[String], email: &str) -> bool {
405        crate::allowlist::is_user_allowed_by(peers, email, |allowed, email| {
406            let email_lower = email.to_lowercase();
407            if allowed.starts_with('@') {
408                email_lower.ends_with(&allowed.to_lowercase())
409            } else if allowed.contains('@') {
410                allowed.eq_ignore_ascii_case(email)
411            } else {
412                email_lower.ends_with(&format!("@{}", allowed.to_lowercase()))
413            }
414        })
415    }
416
417    /// Process a Pub/Sub push notification and dispatch new messages to the agent.
418    pub async fn handle_notification(&self, envelope: &PubSubEnvelope) -> Result<()> {
419        let notification = parse_notification(&envelope.message)?;
420        ::zeroclaw_log::record!(
421            DEBUG,
422            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
423            &format!(
424                "Gmail push notification: email={}, historyId={}",
425                notification.email_address, notification.history_id
426            )
427        );
428
429        // Hold the lock across read-fetch-update to prevent duplicate
430        // processing when concurrent webhook notifications arrive.
431        let mut last_id = self.last_history_id.lock().await;
432
433        if *last_id == 0 {
434            // First notification — just record the history ID.
435            *last_id = notification.history_id;
436            ::zeroclaw_log::record!(
437                INFO,
438                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
439                &format!(
440                    "Gmail push: first notification, seeding historyId={}",
441                    notification.history_id
442                )
443            );
444            return Ok(());
445        }
446
447        let start_id = *last_id;
448        let message_ids = self.fetch_history_inner(start_id, &mut last_id).await?;
449        // Explicitly drop the lock before doing network-heavy message fetching.
450        drop(last_id);
451
452        if message_ids.is_empty() {
453            ::zeroclaw_log::record!(
454                DEBUG,
455                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
456                "Gmail push: no new messages in history"
457            );
458            return Ok(());
459        }
460
461        ::zeroclaw_log::record!(
462            INFO,
463            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
464            &format!(
465                "Gmail push: {} new message(s) to process",
466                message_ids.len()
467            )
468        );
469
470        // Clone the sender and drop the mutex immediately to avoid holding it
471        // across network calls.
472        let tx = {
473            let tx_guard = self.tx.lock().await;
474            match tx_guard.clone() {
475                Some(tx) => tx,
476                None => {
477                    ::zeroclaw_log::record!(
478                        WARN,
479                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
480                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
481                        "Gmail push: no listener registered, dropping messages"
482                    );
483                    return Ok(());
484                }
485            }
486        };
487
488        for msg_id in message_ids {
489            match self.fetch_message(&msg_id).await {
490                Ok(gmail_msg) => {
491                    let sender = extract_header(&gmail_msg, "From").unwrap_or_default();
492                    let sender_email = extract_email_from_header(&sender);
493
494                    if !self.is_sender_allowed(&sender_email) {
495                        ::zeroclaw_log::record!(
496                            WARN,
497                            ::zeroclaw_log::Event::new(
498                                module_path!(),
499                                ::zeroclaw_log::Action::Note
500                            )
501                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
502                            &format!("Gmail push: blocked message from {}", sender_email)
503                        );
504                        continue;
505                    }
506
507                    let subject = extract_header(&gmail_msg, "Subject").unwrap_or_default();
508                    let body_text = extract_body_text(&gmail_msg);
509
510                    let content = format!("Subject: {subject}\n\n{body_text}");
511                    let timestamp = gmail_msg
512                        .internal_date
513                        .parse::<u64>()
514                        .map(|ms| ms / 1000)
515                        .unwrap_or_else(|_| {
516                            SystemTime::now()
517                                .duration_since(UNIX_EPOCH)
518                                .map(|d| d.as_secs())
519                                .unwrap_or(0)
520                        });
521
522                    let channel_msg = ChannelMessage {
523                        id: format!("gmail_{}", gmail_msg.id),
524                        reply_target: sender_email.clone(),
525                        sender: sender_email,
526                        content,
527                        channel: "gmail_push".to_string(),
528                        channel_alias: Some(self.alias.clone()),
529                        timestamp,
530                        thread_ts: Some(gmail_msg.thread_id),
531                        interruption_scope_id: None,
532                        attachments: Vec::new(),
533                        subject: None,
534                    };
535
536                    if tx.send(channel_msg).await.is_err() {
537                        ::zeroclaw_log::record!(
538                            DEBUG,
539                            ::zeroclaw_log::Event::new(
540                                module_path!(),
541                                ::zeroclaw_log::Action::Note
542                            ),
543                            "Gmail push: listener channel closed"
544                        );
545                        return Ok(());
546                    }
547                }
548                Err(e) => {
549                    ::zeroclaw_log::record!(
550                        ERROR,
551                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
552                            .with_outcome(::zeroclaw_log::EventOutcome::Failure),
553                        &format!("Gmail push: failed to fetch message {}: {}", msg_id, e)
554                    );
555                }
556            }
557        }
558
559        Ok(())
560    }
561}
562
563impl ::zeroclaw_api::attribution::Attributable for GmailPushChannel {
564    fn role(&self) -> ::zeroclaw_api::attribution::Role {
565        ::zeroclaw_api::attribution::Role::Channel(
566            ::zeroclaw_api::attribution::ChannelKind::GmailPush,
567        )
568    }
569    fn alias(&self) -> &str {
570        &self.alias
571    }
572}
573
574#[async_trait]
575impl Channel for GmailPushChannel {
576    fn name(&self) -> &str {
577        "gmail_push"
578    }
579
580    async fn send(&self, message: &SendMessage) -> Result<()> {
581        // Send via Gmail API (drafts.send or messages.send)
582        let token = self.config.oauth_token.clone();
583        if token.is_empty() {
584            ::zeroclaw_log::record!(
585                ERROR,
586                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
587                    .with_outcome(::zeroclaw_log::EventOutcome::Failure),
588                "Gmail OAuth token is not configured for sending"
589            );
590            anyhow::bail!("Gmail OAuth token is not configured for sending");
591        }
592
593        let subject = message.subject.as_deref().unwrap_or("ZeroClaw Message");
594        // Sanitize headers to prevent CRLF injection attacks.
595        let safe_recipient = sanitize_header_value(&message.recipient);
596        let safe_subject = sanitize_header_value(subject);
597        let rfc2822 = format!(
598            "To: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{}",
599            safe_recipient, safe_subject, message.content
600        );
601        let encoded = BASE64.encode(rfc2822.as_bytes());
602        // Gmail API uses URL-safe base64 with no padding
603        let url_safe = encoded.replace('+', "-").replace('/', "_").replace('=', "");
604
605        let body = serde_json::json!({
606            "raw": url_safe,
607        });
608
609        let resp = self
610            .http
611            .post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send")
612            .bearer_auth(&token)
613            .json(&body)
614            .send()
615            .await?;
616
617        if !resp.status().is_success() {
618            let status = resp.status();
619            let text = resp.text().await.unwrap_or_default();
620            ::zeroclaw_log::record!(
621                ERROR,
622                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
623                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
624                    .with_attrs(::serde_json::json!({
625                        "phase": "send",
626                        "status": status.as_u16(),
627                        "body": text,
628                    })),
629                "gmail_push: send failed"
630            );
631            return Err(anyhow::Error::msg(format!(
632                "Gmail send failed ({}): {}",
633                status, text
634            )));
635        }
636
637        ::zeroclaw_log::record!(
638            INFO,
639            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
640            &format!("Gmail message sent to {}", message.recipient)
641        );
642        Ok(())
643    }
644
645    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
646        // Store the sender for webhook-driven message dispatch
647        {
648            let mut tx_guard = self.tx.lock().await;
649            *tx_guard = Some(tx);
650        }
651
652        ::zeroclaw_log::record!(
653            INFO,
654            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
655            "Gmail push channel started — registering watch subscription"
656        );
657
658        // Register initial watch
659        if !self.config.webhook_url.is_empty()
660            && let Err(e) = self.register_watch().await
661        {
662            ::zeroclaw_log::record!(
663                ERROR,
664                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
665                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
666                    .with_attrs(::serde_json::json!({"e": e.to_string()})),
667                "Gmail watch registration failed"
668            );
669            // Non-fatal — external subscription management may be in use
670        }
671
672        // Renewal loop: Gmail watch subscriptions expire after 7 days.
673        // Re-register every 6 days to maintain continuous coverage.
674        let renewal_interval = Duration::from_secs(6 * 24 * 60 * 60); // 6 days
675        loop {
676            tokio::time::sleep(renewal_interval).await;
677            ::zeroclaw_log::record!(
678                INFO,
679                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
680                "Gmail push: renewing watch subscription"
681            );
682            if let Err(e) = self.register_watch().await {
683                ::zeroclaw_log::record!(
684                    ERROR,
685                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
686                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
687                        .with_attrs(::serde_json::json!({"e": e.to_string()})),
688                    "Gmail watch renewal failed"
689                );
690            }
691        }
692    }
693
694    async fn health_check(&self) -> bool {
695        let token = self.config.oauth_token.clone();
696        if token.is_empty() {
697            return false;
698        }
699
700        match self
701            .http
702            .get("https://gmail.googleapis.com/gmail/v1/users/me/profile")
703            .bearer_auth(&token)
704            .timeout(Duration::from_secs(10))
705            .send()
706            .await
707        {
708            Ok(resp) => resp.status().is_success(),
709            Err(_) => false,
710        }
711    }
712}
713
714// ── Helper functions ─────────────────────────────────────────────
715
716/// Parse and decode the Gmail notification from a Pub/Sub message.
717pub fn parse_notification(msg: &PubSubMessage) -> Result<GmailNotification> {
718    let decoded = BASE64.decode(&msg.data).map_err(|e| {
719        ::zeroclaw_log::record!(
720            WARN,
721            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
722                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
723                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
724            "Invalid base64 in Pub/Sub message"
725        );
726        anyhow::Error::msg(format!("Invalid base64 in Pub/Sub message: {e}"))
727    })?;
728    let notification: GmailNotification = serde_json::from_slice(&decoded).map_err(|e| {
729        ::zeroclaw_log::record!(
730            WARN,
731            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
732                .with_outcome(::zeroclaw_log::EventOutcome::Failure)
733                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
734            "Invalid JSON in Gmail notification"
735        );
736        anyhow::Error::msg(format!("Invalid JSON in Gmail notification: {e}"))
737    })?;
738    Ok(notification)
739}
740
741/// Extract a header value from a Gmail message by name.
742pub fn extract_header(msg: &GmailMessage, name: &str) -> Option<String> {
743    msg.payload.as_ref().and_then(|p| {
744        p.headers
745            .iter()
746            .find(|h| h.name.eq_ignore_ascii_case(name))
747            .map(|h| h.value.clone())
748    })
749}
750
/// Extract the plain email address from a `From` header value like
/// `"Name <email@example.com>"`.
///
/// The address is taken from the LAST `<...>` pair: the final `>` and the
/// nearest `<` before it. A display name that itself contains angle
/// brackets (`"Doe <Sales>" <doe@example.com>`) therefore still yields the
/// real address. Surrounding whitespace inside the brackets is removed.
///
/// When there is no usable bracket pair (none present, `>` before `<`, or
/// nothing between them), the whole trimmed input is returned.
pub fn extract_email_from_header(from: &str) -> String {
    let bracketed = from.rfind('>').and_then(|close| {
        // Search only to the left of the closing bracket, so a stray '<'
        // after it can never produce an inverted range.
        let open = from[..close].rfind('<')?;
        let address = from[open + 1..close].trim();
        if address.is_empty() {
            None
        } else {
            Some(address)
        }
    });

    bracketed.unwrap_or_else(|| from.trim()).to_string()
}
764
/// Make a string safe to place in an RFC 2822 header value by dropping
/// every CR and LF character, which prevents header injection. All other
/// characters are kept in order.
pub fn sanitize_header_value(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\r' | '\n' => {}
            other => cleaned.push(other),
        }
    }
    cleaned
}
770
771/// Extract the plain-text body from a Gmail message.
772///
773/// Walks MIME parts looking for `text/plain`; falls back to `text/html`
774/// with basic tag stripping; finally falls back to the `snippet`.
775pub fn extract_body_text(msg: &GmailMessage) -> String {
776    if let Some(ref payload) = msg.payload {
777        // Single-part message
778        if payload.mime_type == "text/plain"
779            && let Some(text) = decode_body(payload.body.as_ref())
780        {
781            return text;
782        }
783
784        // Multipart — walk parts
785        if let Some(text) = find_text_in_parts(&payload.parts, "text/plain") {
786            return text;
787        }
788        if let Some(html) = find_text_in_parts(&payload.parts, "text/html") {
789            return strip_html(&html);
790        }
791    }
792
793    // Fallback to snippet
794    msg.snippet.clone()
795}
796
797/// Recursively search MIME parts for a given content type.
798fn find_text_in_parts(parts: &[MessagePart], mime_type: &str) -> Option<String> {
799    for part in parts {
800        if part.mime_type == mime_type
801            && let Some(text) = decode_body(part.body.as_ref())
802        {
803            return Some(text);
804        }
805        // Recurse into nested parts
806        if let Some(text) = find_text_in_parts(&part.parts, mime_type) {
807            return Some(text);
808        }
809    }
810    None
811}
812
813/// Decode a base64url-encoded Gmail message body.
814fn decode_body(body: Option<&MessageBody>) -> Option<String> {
815    body.and_then(|b| {
816        b.data.as_ref().and_then(|data| {
817            // Gmail API uses URL-safe base64 without padding
818            let standard = data.replace('-', "+").replace('_', "/");
819            // Restore padding stripped by Gmail API
820            let padded = match standard.len() % 4 {
821                2 => format!("{standard}=="),
822                3 => format!("{standard}="),
823                _ => standard,
824            };
825            BASE64
826                .decode(&padded)
827                .ok()
828                .and_then(|bytes| String::from_utf8(bytes).ok())
829        })
830    })
831}
832
/// Basic HTML tag stripper (reuses the pattern from email_channel).
///
/// Everything from a `<` up to the next `>` is discarded, as are the bracket
/// characters themselves.  The remaining text is whitespace-normalized: runs
/// of whitespace collapse to a single space and leading/trailing whitespace
/// is removed.
fn strip_html(html: &str) -> String {
    let mut inside_tag = false;
    let visible: String = html
        .chars()
        .filter(|&ch| match ch {
            '<' => {
                inside_tag = true;
                false
            }
            '>' => {
                inside_tag = false;
                false
            }
            _ => !inside_tag,
        })
        .collect();

    visible.split_whitespace().collect::<Vec<_>>().join(" ")
}
854
855// ── Tests ────────────────────────────────────────────────────────
856
#[cfg(test)]
mod tests {
    use super::*;

    // ── Shared fixtures ──────────────────────────────────────────

    /// Alias handed to every channel constructed by these tests.
    const TEST_ALIAS: &str = "gmail_push_test_alias";

    /// Encode `raw` with the URL-safe alphabet and no padding, the form
    /// `decode_body` is written to accept.
    fn gmail_b64(raw: &[u8]) -> String {
        BASE64
            .encode(raw)
            .replace('+', "-")
            .replace('/', "_")
            .replace('=', "")
    }

    /// Standard-base64 JSON notification body, as carried in Pub/Sub `data`.
    fn encoded_notification(email: &str, history_id: u64) -> String {
        let body = serde_json::json!({
            "emailAddress": email,
            "historyId": history_id
        });
        BASE64.encode(serde_json::to_vec(&body).unwrap())
    }

    /// Build a Pub/Sub message around already-encoded `data`.
    fn pubsub_message(data: String, message_id: &str, publish_time: &str) -> PubSubMessage {
        PubSubMessage {
            data,
            message_id: message_id.into(),
            publish_time: publish_time.into(),
        }
    }

    /// Payload with no headers; the caller chooses MIME type, body and parts.
    fn bare_payload(
        mime_type: &str,
        body: Option<MessageBody>,
        parts: Vec<MessagePart>,
    ) -> MessagePayload {
        MessagePayload {
            headers: Vec::new(),
            body,
            parts,
            mime_type: mime_type.into(),
        }
    }

    /// Gmail message with an empty thread id and an internal date of "0".
    fn gmail_message(id: &str, snippet: &str, payload: Option<MessagePayload>) -> GmailMessage {
        GmailMessage {
            id: id.into(),
            thread_id: String::new(),
            snippet: snippet.into(),
            payload,
            internal_date: "0".into(),
        }
    }

    fn empty_resolver() -> Arc<dyn Fn() -> Vec<String> + Send + Sync> {
        Arc::new(Vec::new)
    }

    fn resolver_from(peers: Vec<String>) -> Arc<dyn Fn() -> Vec<String> + Send + Sync> {
        Arc::new(move || peers.clone())
    }

    /// Channel with default config whose allowlist comes from `resolver`.
    fn channel_with(resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>) -> GmailPushChannel {
        GmailPushChannel::new(GmailPushConfig::default(), TEST_ALIAS, resolver)
    }

    // ── Notification parsing ─────────────────────────────────────

    #[test]
    fn parse_notification_valid() {
        let msg = pubsub_message(
            encoded_notification("user@example.com", 12345),
            "msg-1",
            "2026-03-21T08:00:00Z",
        );

        let notification = parse_notification(&msg).unwrap();
        assert_eq!(notification.email_address, "user@example.com");
        assert_eq!(notification.history_id, 12345);
    }

    #[test]
    fn parse_notification_invalid_base64() {
        let msg = pubsub_message("!!!not-base64!!!".into(), "msg-2", "");
        assert!(parse_notification(&msg).is_err());
    }

    #[test]
    fn parse_notification_invalid_json() {
        let msg = pubsub_message(BASE64.encode(b"not json at all"), "msg-3", "");
        assert!(parse_notification(&msg).is_err());
    }

    // ── Envelope deserialization ─────────────────────────────────

    #[test]
    fn pubsub_envelope_deserialize() {
        let encoded = encoded_notification("test@gmail.com", 999);

        let json = serde_json::json!({
            "message": {
                "data": encoded,
                "messageId": "pubsub-1",
                "publishTime": "2026-03-21T10:00:00Z"
            },
            "subscription": "projects/my-project/subscriptions/gmail-push"
        });

        let envelope: PubSubEnvelope = serde_json::from_value(json).unwrap();
        assert_eq!(envelope.message.message_id, "pubsub-1");
        assert_eq!(
            envelope.subscription,
            "projects/my-project/subscriptions/gmail-push"
        );

        let notification = parse_notification(&envelope.message).unwrap();
        assert_eq!(notification.email_address, "test@gmail.com");
        assert_eq!(notification.history_id, 999);
    }

    // ── Email extraction from From header ────────────────────────

    #[test]
    fn extract_email_from_header_angle_brackets() {
        let extracted = extract_email_from_header("John Doe <john@example.com>");
        assert_eq!(extracted, "john@example.com");
    }

    #[test]
    fn extract_email_from_header_bare_email() {
        let extracted = extract_email_from_header("user@example.com");
        assert_eq!(extracted, "user@example.com");
    }

    #[test]
    fn extract_email_from_header_empty() {
        assert_eq!(extract_email_from_header(""), "");
    }

    #[test]
    fn extract_email_with_quotes() {
        let extracted = extract_email_from_header("\"Doe, John\" <john@example.com>");
        assert_eq!(extracted, "john@example.com");
    }

    #[test]
    fn extract_email_malformed_angle_brackets() {
        // '>' before '<' with no proper closing — falls back to full trimmed string
        assert_eq!(
            extract_email_from_header("attacker> <victim@example.com"),
            "attacker> <victim@example.com"
        );
        // Properly closed after the second '<'
        assert_eq!(
            extract_email_from_header("attacker> <victim@example.com>"),
            "victim@example.com"
        );
        // No closing '>' at all
        assert_eq!(extract_email_from_header("Name <broken"), "Name <broken");
    }

    #[test]
    fn sanitize_header_strips_crlf() {
        assert_eq!(
            sanitize_header_value("normal@example.com"),
            "normal@example.com"
        );
        assert_eq!(
            sanitize_header_value("evil@example.com\r\nBcc: spy@evil.com"),
            "evil@example.comBcc: spy@evil.com"
        );
        assert_eq!(
            sanitize_header_value("inject\nSubject: fake"),
            "injectSubject: fake"
        );
    }

    // ── Header extraction ────────────────────────────────────────

    #[test]
    fn extract_header_found() {
        let header = |name: &str, value: &str| MessageHeader {
            name: name.into(),
            value: value.into(),
        };
        let payload = MessagePayload {
            headers: vec![
                header("From", "sender@example.com"),
                header("Subject", "Test Subject"),
            ],
            body: None,
            parts: Vec::new(),
            mime_type: String::new(),
        };
        let mut msg = gmail_message("msg-1", "", Some(payload));
        msg.thread_id = "thread-1".into();

        assert_eq!(
            extract_header(&msg, "Subject"),
            Some("Test Subject".to_string())
        );
        assert_eq!(
            extract_header(&msg, "from"), // case-insensitive
            Some("sender@example.com".to_string())
        );
        assert_eq!(extract_header(&msg, "X-Missing"), None);
    }

    #[test]
    fn extract_header_no_payload() {
        let msg = gmail_message("msg-2", "", None);
        assert_eq!(extract_header(&msg, "Subject"), None);
    }

    // ── Body text extraction ─────────────────────────────────────

    #[test]
    fn extract_body_text_plain() {
        let body = MessageBody {
            data: Some(gmail_b64(b"Hello, world!")),
            size: 13,
        };
        let msg = gmail_message(
            "msg-3",
            "snippet",
            Some(bare_payload("text/plain", Some(body), Vec::new())),
        );

        assert_eq!(extract_body_text(&msg), "Hello, world!");
    }

    #[test]
    fn extract_body_text_multipart() {
        let html_part = MessagePart {
            mime_type: "text/html".into(),
            body: Some(MessageBody {
                data: Some(gmail_b64(b"<p>Hello</p>")),
                size: 12,
            }),
            parts: Vec::new(),
            filename: String::new(),
        };
        let msg = gmail_message(
            "msg-4",
            "snippet",
            Some(bare_payload("multipart/alternative", None, vec![html_part])),
        );

        assert_eq!(extract_body_text(&msg), "Hello");
    }

    #[test]
    fn extract_body_text_fallback_to_snippet() {
        let msg = gmail_message(
            "msg-5",
            "My snippet text",
            Some(bare_payload("multipart/mixed", None, Vec::new())),
        );

        assert_eq!(extract_body_text(&msg), "My snippet text");
    }

    // ── Sender allowlist ─────────────────────────────────────────

    #[test]
    fn sender_allowed_empty_denies() {
        let ch = channel_with(empty_resolver());
        assert!(!ch.is_sender_allowed("anyone@example.com"));
    }

    #[test]
    fn sender_allowed_wildcard() {
        let ch = channel_with(resolver_from(vec!["*".into()]));
        assert!(ch.is_sender_allowed("anyone@example.com"));
    }

    #[test]
    fn sender_allowed_specific_email() {
        let ch = channel_with(resolver_from(vec!["user@example.com".into()]));
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(!ch.is_sender_allowed("other@example.com"));
    }

    #[test]
    fn sender_allowed_domain_with_at() {
        let ch = channel_with(resolver_from(vec!["@example.com".into()]));
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(ch.is_sender_allowed("admin@example.com"));
        assert!(!ch.is_sender_allowed("user@other.com"));
    }

    #[test]
    fn sender_allowed_domain_without_at() {
        let ch = channel_with(resolver_from(vec!["example.com".into()]));
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(!ch.is_sender_allowed("user@other.com"));
    }

    // ── Strip HTML ───────────────────────────────────────────────

    #[test]
    fn strip_html_basic() {
        assert_eq!(strip_html("<p>Hello</p>"), "Hello");
    }

    #[test]
    fn strip_html_nested() {
        assert_eq!(
            strip_html("<div><p>Hello <b>World</b></p></div>"),
            "Hello World"
        );
    }

    // ── Config defaults ──────────────────────────────────────────

    #[test]
    fn config_default_values() {
        let config = GmailPushConfig::default();
        assert!(config.topic.is_empty());
        assert_eq!(config.label_filter, vec!["INBOX"]);
        assert!(config.oauth_token.is_empty());
        assert!(config.webhook_url.is_empty());
    }

    #[test]
    fn config_deserialize_with_defaults() {
        let json = r#"{"topic": "projects/my-proj/topics/gmail"}"#;
        let config: GmailPushConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.topic, "projects/my-proj/topics/gmail");
        assert_eq!(config.label_filter, vec!["INBOX"]);
    }

    #[test]
    fn config_serialize_roundtrip() {
        let config = GmailPushConfig {
            enabled: true,
            topic: "projects/test/topics/gmail".into(),
            label_filter: vec!["INBOX".into(), "IMPORTANT".into()],
            oauth_token: "test-token".into(),
            webhook_url: "https://example.com/webhook/gmail".into(),
            webhook_secret: "my-secret".into(),
            excluded_tools: vec![],
        };
        let json = serde_json::to_string(&config).unwrap();
        let deserialized: GmailPushConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.topic, config.topic);
        assert_eq!(deserialized.label_filter, config.label_filter);
        assert_eq!(deserialized.webhook_url, config.webhook_url);
    }

    // ── Channel name ─────────────────────────────────────────────

    #[test]
    fn channel_name() {
        let ch = channel_with(empty_resolver());
        assert_eq!(ch.name(), "gmail_push");
    }

    // ── Decode body ──────────────────────────────────────────────

    #[test]
    fn decode_body_none() {
        assert!(decode_body(None).is_none());
    }

    #[test]
    fn decode_body_empty_data() {
        let body = MessageBody {
            data: None,
            size: 0,
        };
        assert!(decode_body(Some(&body)).is_none());
    }

    #[test]
    fn decode_body_valid() {
        let body = MessageBody {
            data: Some(gmail_b64(b"test content")),
            size: 12,
        };
        assert_eq!(decode_body(Some(&body)), Some("test content".to_string()));
    }
}