1use anyhow::Result;
21use async_trait::async_trait;
22use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use std::fmt::Write as _;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime, UNIX_EPOCH};
28use tokio::sync::{Mutex, mpsc};
29
30use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
31
32pub use zeroclaw_config::scattered_types::GmailPushConfig;
33
34#[derive(Debug, Deserialize, Serialize)]
38pub struct PubSubEnvelope {
39 pub message: PubSubMessage,
40 #[serde(default)]
42 pub subscription: String,
43}
44
45#[derive(Debug, Deserialize, Serialize)]
47pub struct PubSubMessage {
48 pub data: String,
50 #[serde(default, rename = "messageId")]
52 pub message_id: String,
53 #[serde(default, rename = "publishTime")]
55 pub publish_time: String,
56}
57
58#[derive(Debug, Deserialize, Serialize)]
60pub struct GmailNotification {
61 #[serde(rename = "emailAddress")]
63 pub email_address: String,
64 #[serde(rename = "historyId")]
66 pub history_id: u64,
67}
68
69#[derive(Debug, Deserialize)]
73pub struct HistoryResponse {
74 pub history: Option<Vec<HistoryRecord>>,
75 #[serde(default, rename = "historyId")]
76 pub history_id: u64,
77 #[serde(default, rename = "nextPageToken")]
78 pub next_page_token: Option<String>,
79}
80
81#[derive(Debug, Deserialize)]
83pub struct HistoryRecord {
84 #[serde(default, rename = "messagesAdded")]
85 pub messages_added: Vec<MessageAdded>,
86}
87
88#[derive(Debug, Deserialize)]
90pub struct MessageAdded {
91 pub message: MessageRef,
92}
93
94#[derive(Debug, Deserialize)]
96pub struct MessageRef {
97 pub id: String,
98 #[serde(default, rename = "threadId")]
99 pub thread_id: String,
100}
101
102#[derive(Debug, Deserialize)]
104pub struct GmailMessage {
105 pub id: String,
106 #[serde(default, rename = "threadId")]
107 pub thread_id: String,
108 #[serde(default)]
109 pub snippet: String,
110 pub payload: Option<MessagePayload>,
111 #[serde(default, rename = "internalDate")]
112 pub internal_date: String,
113}
114
115#[derive(Debug, Deserialize)]
117pub struct MessagePayload {
118 #[serde(default)]
119 pub headers: Vec<MessageHeader>,
120 pub body: Option<MessageBody>,
121 #[serde(default)]
122 pub parts: Vec<MessagePart>,
123 #[serde(default, rename = "mimeType")]
124 pub mime_type: String,
125}
126
127#[derive(Debug, Deserialize)]
129pub struct MessageHeader {
130 pub name: String,
131 pub value: String,
132}
133
134#[derive(Debug, Deserialize)]
136pub struct MessageBody {
137 #[serde(default)]
138 pub data: Option<String>,
139 #[serde(default)]
140 pub size: u64,
141}
142
143#[derive(Debug, Deserialize)]
145pub struct MessagePart {
146 #[serde(default, rename = "mimeType")]
147 pub mime_type: String,
148 pub body: Option<MessageBody>,
149 #[serde(default)]
150 pub parts: Vec<MessagePart>,
151 #[serde(default)]
152 pub filename: String,
153}
154
155#[derive(Debug, Deserialize)]
157pub struct WatchResponse {
158 #[serde(default, rename = "historyId")]
159 pub history_id: u64,
160 #[serde(default)]
161 pub expiration: String,
162}
163
164pub struct GmailPushChannel {
177 pub config: GmailPushConfig,
178 pub alias: String,
181 pub peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
184 http: Client,
185 last_history_id: Arc<Mutex<u64>>,
186 pub tx: Arc<Mutex<Option<mpsc::Sender<ChannelMessage>>>>,
188}
189
190impl GmailPushChannel {
191 pub fn new(
192 config: GmailPushConfig,
193 alias: impl Into<String>,
194 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
195 ) -> Self {
196 let http = Client::builder()
197 .timeout(Duration::from_secs(30))
198 .build()
199 .expect("failed to build HTTP client");
200 Self {
201 config,
202 alias: alias.into(),
203 peer_resolver,
204 http,
205 last_history_id: Arc::new(Mutex::new(0)),
206 tx: Arc::new(Mutex::new(None)),
207 }
208 }
209
210 pub async fn register_watch(&self) -> Result<WatchResponse> {
212 let token = self.config.oauth_token.clone();
213 if token.is_empty() {
214 ::zeroclaw_log::record!(
215 ERROR,
216 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
217 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
218 "Gmail OAuth token is not configured"
219 );
220 anyhow::bail!("Gmail OAuth token is not configured");
221 }
222
223 let body = serde_json::json!({
224 "topicName": self.config.topic,
225 "labelIds": self.config.label_filter,
226 });
227
228 let resp = self
229 .http
230 .post("https://gmail.googleapis.com/gmail/v1/users/me/watch")
231 .bearer_auth(&token)
232 .json(&body)
233 .send()
234 .await?;
235
236 if !resp.status().is_success() {
237 let status = resp.status();
238 let text = resp.text().await.unwrap_or_default();
239 ::zeroclaw_log::record!(
240 ERROR,
241 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
242 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
243 .with_attrs(::serde_json::json!({
244 "phase": "watch_registration",
245 "status": status.as_u16(),
246 "body": text,
247 })),
248 "gmail_push: watch registration failed"
249 );
250 return Err(anyhow::Error::msg(format!(
251 "Gmail watch registration failed ({}): {}",
252 status, text
253 )));
254 }
255
256 let watch: WatchResponse = resp.json().await?;
257 let mut last_id = self.last_history_id.lock().await;
258 if *last_id == 0 {
259 *last_id = watch.history_id;
260 }
261 ::zeroclaw_log::record!(
262 INFO,
263 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
264 &format!(
265 "Gmail watch registered — historyId={}, expiration={}",
266 watch.history_id, watch.expiration
267 )
268 );
269 Ok(watch)
270 }
271
272 pub async fn fetch_history(&self, start_history_id: u64) -> Result<Vec<String>> {
274 let mut last_id = self.last_history_id.lock().await;
275 self.fetch_history_inner(start_history_id, &mut last_id)
276 .await
277 }
278
279 async fn fetch_history_inner(
282 &self,
283 start_history_id: u64,
284 last_id: &mut u64,
285 ) -> Result<Vec<String>> {
286 let token = self.config.oauth_token.clone();
287 if token.is_empty() {
288 ::zeroclaw_log::record!(
289 ERROR,
290 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
291 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
292 "Gmail OAuth token is not configured"
293 );
294 anyhow::bail!("Gmail OAuth token is not configured");
295 }
296
297 let mut message_ids = Vec::new();
298 let mut page_token: Option<String> = None;
299
300 loop {
301 let mut url = format!(
302 "https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId={}&historyTypes=messageAdded",
303 start_history_id
304 );
305 if let Some(ref pt) = page_token {
306 let _ = write!(url, "&pageToken={pt}");
307 }
308
309 let resp = self.http.get(&url).bearer_auth(&token).send().await?;
310
311 if !resp.status().is_success() {
312 let status = resp.status();
313 let text = resp.text().await.unwrap_or_default();
314 ::zeroclaw_log::record!(
315 ERROR,
316 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
317 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
318 .with_attrs(::serde_json::json!({
319 "phase": "history_fetch",
320 "status": status.as_u16(),
321 "body": text,
322 })),
323 "gmail_push: history fetch failed"
324 );
325 return Err(anyhow::Error::msg(format!(
326 "Gmail history fetch failed ({}): {}",
327 status, text
328 )));
329 }
330
331 let history_resp: HistoryResponse = resp.json().await?;
332
333 if let Some(records) = history_resp.history {
334 for record in records {
335 for added in record.messages_added {
336 message_ids.push(added.message.id);
337 }
338 }
339 }
340
341 if history_resp.history_id > 0 && history_resp.history_id > *last_id {
343 *last_id = history_resp.history_id;
344 }
345
346 match history_resp.next_page_token {
347 Some(token) => page_token = Some(token),
348 None => break,
349 }
350 }
351
352 Ok(message_ids)
353 }
354
355 pub async fn fetch_message(&self, message_id: &str) -> Result<GmailMessage> {
357 let token = self.config.oauth_token.clone();
358 let url = format!(
359 "https://gmail.googleapis.com/gmail/v1/users/me/messages/{}?format=full",
360 message_id
361 );
362
363 let resp = self.http.get(&url).bearer_auth(&token).send().await?;
364
365 if !resp.status().is_success() {
366 let status = resp.status();
367 let text = resp.text().await.unwrap_or_default();
368 ::zeroclaw_log::record!(
369 ERROR,
370 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
371 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
372 .with_attrs(::serde_json::json!({
373 "phase": "message_fetch",
374 "status": status.as_u16(),
375 "body": text,
376 })),
377 "gmail_push: message fetch failed"
378 );
379 return Err(anyhow::Error::msg(format!(
380 "Gmail message fetch failed ({}): {}",
381 status, text
382 )));
383 }
384
385 Ok(resp.json().await?)
386 }
387
388 pub fn is_sender_allowed(&self, email: &str) -> bool {
396 let peers = (self.peer_resolver)();
397 Self::is_email_sender_allowed(&peers, email)
398 }
399
400 fn is_email_sender_allowed(peers: &[String], email: &str) -> bool {
405 crate::allowlist::is_user_allowed_by(peers, email, |allowed, email| {
406 let email_lower = email.to_lowercase();
407 if allowed.starts_with('@') {
408 email_lower.ends_with(&allowed.to_lowercase())
409 } else if allowed.contains('@') {
410 allowed.eq_ignore_ascii_case(email)
411 } else {
412 email_lower.ends_with(&format!("@{}", allowed.to_lowercase()))
413 }
414 })
415 }
416
417 pub async fn handle_notification(&self, envelope: &PubSubEnvelope) -> Result<()> {
419 let notification = parse_notification(&envelope.message)?;
420 ::zeroclaw_log::record!(
421 DEBUG,
422 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
423 &format!(
424 "Gmail push notification: email={}, historyId={}",
425 notification.email_address, notification.history_id
426 )
427 );
428
429 let mut last_id = self.last_history_id.lock().await;
432
433 if *last_id == 0 {
434 *last_id = notification.history_id;
436 ::zeroclaw_log::record!(
437 INFO,
438 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
439 &format!(
440 "Gmail push: first notification, seeding historyId={}",
441 notification.history_id
442 )
443 );
444 return Ok(());
445 }
446
447 let start_id = *last_id;
448 let message_ids = self.fetch_history_inner(start_id, &mut last_id).await?;
449 drop(last_id);
451
452 if message_ids.is_empty() {
453 ::zeroclaw_log::record!(
454 DEBUG,
455 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
456 "Gmail push: no new messages in history"
457 );
458 return Ok(());
459 }
460
461 ::zeroclaw_log::record!(
462 INFO,
463 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
464 &format!(
465 "Gmail push: {} new message(s) to process",
466 message_ids.len()
467 )
468 );
469
470 let tx = {
473 let tx_guard = self.tx.lock().await;
474 match tx_guard.clone() {
475 Some(tx) => tx,
476 None => {
477 ::zeroclaw_log::record!(
478 WARN,
479 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
480 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
481 "Gmail push: no listener registered, dropping messages"
482 );
483 return Ok(());
484 }
485 }
486 };
487
488 for msg_id in message_ids {
489 match self.fetch_message(&msg_id).await {
490 Ok(gmail_msg) => {
491 let sender = extract_header(&gmail_msg, "From").unwrap_or_default();
492 let sender_email = extract_email_from_header(&sender);
493
494 if !self.is_sender_allowed(&sender_email) {
495 ::zeroclaw_log::record!(
496 WARN,
497 ::zeroclaw_log::Event::new(
498 module_path!(),
499 ::zeroclaw_log::Action::Note
500 )
501 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
502 &format!("Gmail push: blocked message from {}", sender_email)
503 );
504 continue;
505 }
506
507 let subject = extract_header(&gmail_msg, "Subject").unwrap_or_default();
508 let body_text = extract_body_text(&gmail_msg);
509
510 let content = format!("Subject: {subject}\n\n{body_text}");
511 let timestamp = gmail_msg
512 .internal_date
513 .parse::<u64>()
514 .map(|ms| ms / 1000)
515 .unwrap_or_else(|_| {
516 SystemTime::now()
517 .duration_since(UNIX_EPOCH)
518 .map(|d| d.as_secs())
519 .unwrap_or(0)
520 });
521
522 let channel_msg = ChannelMessage {
523 id: format!("gmail_{}", gmail_msg.id),
524 reply_target: sender_email.clone(),
525 sender: sender_email,
526 content,
527 channel: "gmail_push".to_string(),
528 channel_alias: Some(self.alias.clone()),
529 timestamp,
530 thread_ts: Some(gmail_msg.thread_id),
531 interruption_scope_id: None,
532 attachments: Vec::new(),
533 subject: None,
534 };
535
536 if tx.send(channel_msg).await.is_err() {
537 ::zeroclaw_log::record!(
538 DEBUG,
539 ::zeroclaw_log::Event::new(
540 module_path!(),
541 ::zeroclaw_log::Action::Note
542 ),
543 "Gmail push: listener channel closed"
544 );
545 return Ok(());
546 }
547 }
548 Err(e) => {
549 ::zeroclaw_log::record!(
550 ERROR,
551 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
552 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
553 &format!("Gmail push: failed to fetch message {}: {}", msg_id, e)
554 );
555 }
556 }
557 }
558
559 Ok(())
560 }
561}
562
563impl ::zeroclaw_api::attribution::Attributable for GmailPushChannel {
564 fn role(&self) -> ::zeroclaw_api::attribution::Role {
565 ::zeroclaw_api::attribution::Role::Channel(
566 ::zeroclaw_api::attribution::ChannelKind::GmailPush,
567 )
568 }
569 fn alias(&self) -> &str {
570 &self.alias
571 }
572}
573
574#[async_trait]
575impl Channel for GmailPushChannel {
576 fn name(&self) -> &str {
577 "gmail_push"
578 }
579
580 async fn send(&self, message: &SendMessage) -> Result<()> {
581 let token = self.config.oauth_token.clone();
583 if token.is_empty() {
584 ::zeroclaw_log::record!(
585 ERROR,
586 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
587 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
588 "Gmail OAuth token is not configured for sending"
589 );
590 anyhow::bail!("Gmail OAuth token is not configured for sending");
591 }
592
593 let subject = message.subject.as_deref().unwrap_or("ZeroClaw Message");
594 let safe_recipient = sanitize_header_value(&message.recipient);
596 let safe_subject = sanitize_header_value(subject);
597 let rfc2822 = format!(
598 "To: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{}",
599 safe_recipient, safe_subject, message.content
600 );
601 let encoded = BASE64.encode(rfc2822.as_bytes());
602 let url_safe = encoded.replace('+', "-").replace('/', "_").replace('=', "");
604
605 let body = serde_json::json!({
606 "raw": url_safe,
607 });
608
609 let resp = self
610 .http
611 .post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send")
612 .bearer_auth(&token)
613 .json(&body)
614 .send()
615 .await?;
616
617 if !resp.status().is_success() {
618 let status = resp.status();
619 let text = resp.text().await.unwrap_or_default();
620 ::zeroclaw_log::record!(
621 ERROR,
622 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
623 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
624 .with_attrs(::serde_json::json!({
625 "phase": "send",
626 "status": status.as_u16(),
627 "body": text,
628 })),
629 "gmail_push: send failed"
630 );
631 return Err(anyhow::Error::msg(format!(
632 "Gmail send failed ({}): {}",
633 status, text
634 )));
635 }
636
637 ::zeroclaw_log::record!(
638 INFO,
639 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
640 &format!("Gmail message sent to {}", message.recipient)
641 );
642 Ok(())
643 }
644
645 async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
646 {
648 let mut tx_guard = self.tx.lock().await;
649 *tx_guard = Some(tx);
650 }
651
652 ::zeroclaw_log::record!(
653 INFO,
654 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
655 "Gmail push channel started — registering watch subscription"
656 );
657
658 if !self.config.webhook_url.is_empty()
660 && let Err(e) = self.register_watch().await
661 {
662 ::zeroclaw_log::record!(
663 ERROR,
664 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
665 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
666 .with_attrs(::serde_json::json!({"e": e.to_string()})),
667 "Gmail watch registration failed"
668 );
669 }
671
672 let renewal_interval = Duration::from_secs(6 * 24 * 60 * 60); loop {
676 tokio::time::sleep(renewal_interval).await;
677 ::zeroclaw_log::record!(
678 INFO,
679 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
680 "Gmail push: renewing watch subscription"
681 );
682 if let Err(e) = self.register_watch().await {
683 ::zeroclaw_log::record!(
684 ERROR,
685 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
686 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
687 .with_attrs(::serde_json::json!({"e": e.to_string()})),
688 "Gmail watch renewal failed"
689 );
690 }
691 }
692 }
693
694 async fn health_check(&self) -> bool {
695 let token = self.config.oauth_token.clone();
696 if token.is_empty() {
697 return false;
698 }
699
700 match self
701 .http
702 .get("https://gmail.googleapis.com/gmail/v1/users/me/profile")
703 .bearer_auth(&token)
704 .timeout(Duration::from_secs(10))
705 .send()
706 .await
707 {
708 Ok(resp) => resp.status().is_success(),
709 Err(_) => false,
710 }
711 }
712}
713
714pub fn parse_notification(msg: &PubSubMessage) -> Result<GmailNotification> {
718 let decoded = BASE64.decode(&msg.data).map_err(|e| {
719 ::zeroclaw_log::record!(
720 WARN,
721 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
722 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
723 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
724 "Invalid base64 in Pub/Sub message"
725 );
726 anyhow::Error::msg(format!("Invalid base64 in Pub/Sub message: {e}"))
727 })?;
728 let notification: GmailNotification = serde_json::from_slice(&decoded).map_err(|e| {
729 ::zeroclaw_log::record!(
730 WARN,
731 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
732 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
733 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
734 "Invalid JSON in Gmail notification"
735 );
736 anyhow::Error::msg(format!("Invalid JSON in Gmail notification: {e}"))
737 })?;
738 Ok(notification)
739}
740
741pub fn extract_header(msg: &GmailMessage, name: &str) -> Option<String> {
743 msg.payload.as_ref().and_then(|p| {
744 p.headers
745 .iter()
746 .find(|h| h.name.eq_ignore_ascii_case(name))
747 .map(|h| h.value.clone())
748 })
749}
750
/// Extracts the bare e-mail address from a `From`-style header value.
///
/// For `Display Name <addr>` the address between the **last** `>` and the
/// closest `<` before it is returned, trimmed. Anchoring on the final
/// bracket pair keeps angle brackets inside a (quoted) display name from
/// corrupting the result. When no well-formed, non-empty bracket pair
/// exists, the whole input is returned trimmed.
pub fn extract_email_from_header(from: &str) -> String {
    if let Some(end) = from.rfind('>') {
        // Both delimiters are ASCII, so these byte offsets are char boundaries.
        if let Some(start) = from[..end].rfind('<') {
            let address = from[start + 1..end].trim();
            if !address.is_empty() {
                return address.to_string();
            }
        }
    }
    from.trim().to_string()
}
764
/// Removes every carriage-return and line-feed character from `value` so it
/// cannot terminate a header line and inject additional headers.
pub fn sanitize_header_value(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        if !matches!(ch, '\r' | '\n') {
            cleaned.push(ch);
        }
    }
    cleaned
}
770
771pub fn extract_body_text(msg: &GmailMessage) -> String {
776 if let Some(ref payload) = msg.payload {
777 if payload.mime_type == "text/plain"
779 && let Some(text) = decode_body(payload.body.as_ref())
780 {
781 return text;
782 }
783
784 if let Some(text) = find_text_in_parts(&payload.parts, "text/plain") {
786 return text;
787 }
788 if let Some(html) = find_text_in_parts(&payload.parts, "text/html") {
789 return strip_html(&html);
790 }
791 }
792
793 msg.snippet.clone()
795}
796
797fn find_text_in_parts(parts: &[MessagePart], mime_type: &str) -> Option<String> {
799 for part in parts {
800 if part.mime_type == mime_type
801 && let Some(text) = decode_body(part.body.as_ref())
802 {
803 return Some(text);
804 }
805 if let Some(text) = find_text_in_parts(&part.parts, mime_type) {
807 return Some(text);
808 }
809 }
810 None
811}
812
813fn decode_body(body: Option<&MessageBody>) -> Option<String> {
815 body.and_then(|b| {
816 b.data.as_ref().and_then(|data| {
817 let standard = data.replace('-', "+").replace('_', "/");
819 let padded = match standard.len() % 4 {
821 2 => format!("{standard}=="),
822 3 => format!("{standard}="),
823 _ => standard,
824 };
825 BASE64
826 .decode(&padded)
827 .ok()
828 .and_then(|bytes| String::from_utf8(bytes).ok())
829 })
830 })
831}
832
/// Converts an HTML fragment into single-line plain text.
///
/// * Everything between `<` and `>` is dropped.
/// * Block-level tags (`<p>`, `<br>`, `<div>`, list/table/heading tags, …)
///   leave a space behind so neighbouring blocks do not run together;
///   inline tags leave nothing, so `<b>Hi</b>, there` keeps its punctuation.
/// * A small set of common entities is decoded after tag removal, with
///   `&amp;` handled last so that `&amp;lt;` becomes the literal `&lt;`.
/// * Runs of whitespace collapse to one space; the result is trimmed.
///
/// This is a lightweight text extractor, not an HTML parser: script/style
/// contents and other entities are left as they are.
fn strip_html(html: &str) -> String {
    const BLOCK_TAGS: &[&str] = &[
        "p", "br", "div", "li", "ul", "ol", "tr", "td", "th", "table", "h1", "h2", "h3", "h4",
        "h5", "h6", "hr", "pre", "blockquote", "section", "article", "header", "footer",
    ];
    // Only the tag name matters; cap what is buffered so large attribute
    // values are not copied.
    const TAG_NAME_CAP: usize = 16;

    let mut text = String::with_capacity(html.len());
    let mut tag = String::new();
    let mut in_tag = false;

    for ch in html.chars() {
        match ch {
            '<' => {
                in_tag = true;
                tag.clear();
            }
            '>' => {
                if in_tag {
                    let name: String = tag
                        .trim_start_matches('/')
                        .chars()
                        .take_while(|c| c.is_ascii_alphanumeric())
                        .map(|c| c.to_ascii_lowercase())
                        .collect();
                    if BLOCK_TAGS.contains(&name.as_str()) {
                        text.push(' ');
                    }
                }
                in_tag = false;
            }
            _ if in_tag => {
                if tag.len() < TAG_NAME_CAP {
                    tag.push(ch);
                }
            }
            _ => text.push(ch),
        }
    }

    let decoded = text
        .replace("&nbsp;", " ")
        .replace("&lt;", "<")
        .replace("&gt;", ">")
        .replace("&quot;", "\"")
        .replace("&#39;", "'")
        .replace("&apos;", "'")
        .replace("&amp;", "&");

    decoded.split_whitespace().collect::<Vec<_>>().join(" ")
}
854
#[cfg(test)]
mod tests {
    use super::*;

    // ── Shared fixtures ──────────────────────────────────────────────

    /// Encodes `bytes` as unpadded base64url.
    fn url_safe_b64(bytes: &[u8]) -> String {
        BASE64
            .encode(bytes)
            .replace('+', "-")
            .replace('/', "_")
            .replace('=', "")
    }

    /// Standard-base64 encoding of a notification JSON document.
    fn encoded_notification(email: &str, history_id: u64) -> String {
        let doc = serde_json::json!({
            "emailAddress": email,
            "historyId": history_id
        });
        BASE64.encode(serde_json::to_vec(&doc).unwrap())
    }

    /// Builds a Pub/Sub message around already-encoded `data`.
    fn pubsub_message(data: String, id: &str, publish_time: &str) -> PubSubMessage {
        PubSubMessage {
            data,
            message_id: id.into(),
            publish_time: publish_time.into(),
        }
    }

    /// Builds a message with an empty thread id and `internalDate` of `"0"`.
    fn gmail_message(id: &str, snippet: &str, payload: Option<MessagePayload>) -> GmailMessage {
        GmailMessage {
            id: id.into(),
            thread_id: String::new(),
            snippet: snippet.into(),
            payload,
            internal_date: "0".into(),
        }
    }

    /// Channel with default config whose resolver yields exactly `peers`.
    fn channel_with_peers(peers: &[&str]) -> GmailPushChannel {
        let owned: Vec<String> = peers.iter().map(|p| (*p).to_string()).collect();
        GmailPushChannel::new(
            GmailPushConfig::default(),
            "gmail_push_test_alias",
            Arc::new(move || owned.clone()),
        )
    }

    // ── Notification parsing ─────────────────────────────────────────

    #[test]
    fn parse_notification_valid() {
        let msg = pubsub_message(
            encoded_notification("user@example.com", 12345),
            "msg-1",
            "2026-03-21T08:00:00Z",
        );

        let parsed = parse_notification(&msg).unwrap();
        assert_eq!(parsed.email_address, "user@example.com");
        assert_eq!(parsed.history_id, 12345);
    }

    #[test]
    fn parse_notification_invalid_base64() {
        let msg = pubsub_message("!!!not-base64!!!".into(), "msg-2", "");
        assert!(parse_notification(&msg).is_err());
    }

    #[test]
    fn parse_notification_invalid_json() {
        let msg = pubsub_message(BASE64.encode(b"not json at all"), "msg-3", "");
        assert!(parse_notification(&msg).is_err());
    }

    // ── Envelope deserialization ─────────────────────────────────────

    #[test]
    fn pubsub_envelope_deserialize() {
        let json = serde_json::json!({
            "message": {
                "data": encoded_notification("test@gmail.com", 999),
                "messageId": "pubsub-1",
                "publishTime": "2026-03-21T10:00:00Z"
            },
            "subscription": "projects/my-project/subscriptions/gmail-push"
        });

        let envelope: PubSubEnvelope = serde_json::from_value(json).unwrap();
        assert_eq!(envelope.message.message_id, "pubsub-1");
        assert_eq!(
            envelope.subscription,
            "projects/my-project/subscriptions/gmail-push"
        );

        let parsed = parse_notification(&envelope.message).unwrap();
        assert_eq!(parsed.email_address, "test@gmail.com");
        assert_eq!(parsed.history_id, 999);
    }

    // ── Address extraction ───────────────────────────────────────────

    #[test]
    fn extract_email_from_header_angle_brackets() {
        let got = extract_email_from_header("John Doe <john@example.com>");
        assert_eq!(got, "john@example.com");
    }

    #[test]
    fn extract_email_from_header_bare_email() {
        let got = extract_email_from_header("user@example.com");
        assert_eq!(got, "user@example.com");
    }

    #[test]
    fn extract_email_from_header_empty() {
        assert_eq!(extract_email_from_header(""), "");
    }

    #[test]
    fn extract_email_with_quotes() {
        let got = extract_email_from_header("\"Doe, John\" <john@example.com>");
        assert_eq!(got, "john@example.com");
    }

    #[test]
    fn extract_email_malformed_angle_brackets() {
        // `>` before `<` with no closing bracket: input is returned as-is.
        assert_eq!(
            extract_email_from_header("attacker> <victim@example.com"),
            "attacker> <victim@example.com"
        );
        // A stray leading `>` does not hide the real bracket pair.
        assert_eq!(
            extract_email_from_header("attacker> <victim@example.com>"),
            "victim@example.com"
        );
        // Unterminated bracket: input is returned as-is.
        assert_eq!(extract_email_from_header("Name <broken"), "Name <broken");
    }

    // ── Header sanitisation ──────────────────────────────────────────

    #[test]
    fn sanitize_header_strips_crlf() {
        let cases = [
            ("normal@example.com", "normal@example.com"),
            (
                "evil@example.com\r\nBcc: spy@evil.com",
                "evil@example.comBcc: spy@evil.com",
            ),
            ("inject\nSubject: fake", "injectSubject: fake"),
        ];
        for (input, expected) in cases {
            assert_eq!(sanitize_header_value(input), expected);
        }
    }

    // ── Header lookup ────────────────────────────────────────────────

    #[test]
    fn extract_header_found() {
        let header = |name: &str, value: &str| MessageHeader {
            name: name.into(),
            value: value.into(),
        };
        let msg = GmailMessage {
            thread_id: "thread-1".into(),
            ..gmail_message(
                "msg-1",
                "",
                Some(MessagePayload {
                    headers: vec![
                        header("From", "sender@example.com"),
                        header("Subject", "Test Subject"),
                    ],
                    body: None,
                    parts: Vec::new(),
                    mime_type: String::new(),
                }),
            )
        };

        assert_eq!(
            extract_header(&msg, "Subject"),
            Some("Test Subject".to_string())
        );
        // Lookup is case-insensitive.
        assert_eq!(
            extract_header(&msg, "from"),
            Some("sender@example.com".to_string())
        );
        assert_eq!(extract_header(&msg, "X-Missing"), None);
    }

    #[test]
    fn extract_header_no_payload() {
        let msg = gmail_message("msg-2", "", None);
        assert_eq!(extract_header(&msg, "Subject"), None);
    }

    // ── Body extraction ──────────────────────────────────────────────

    #[test]
    fn extract_body_text_plain() {
        let msg = gmail_message(
            "msg-3",
            "snippet",
            Some(MessagePayload {
                headers: Vec::new(),
                body: Some(MessageBody {
                    data: Some(url_safe_b64(b"Hello, world!")),
                    size: 13,
                }),
                parts: Vec::new(),
                mime_type: "text/plain".into(),
            }),
        );

        assert_eq!(extract_body_text(&msg), "Hello, world!");
    }

    #[test]
    fn extract_body_text_multipart() {
        let html_part = MessagePart {
            mime_type: "text/html".into(),
            body: Some(MessageBody {
                data: Some(url_safe_b64(b"<p>Hello</p>")),
                size: 12,
            }),
            parts: Vec::new(),
            filename: String::new(),
        };
        let msg = gmail_message(
            "msg-4",
            "snippet",
            Some(MessagePayload {
                headers: Vec::new(),
                body: None,
                parts: vec![html_part],
                mime_type: "multipart/alternative".into(),
            }),
        );

        assert_eq!(extract_body_text(&msg), "Hello");
    }

    #[test]
    fn extract_body_text_fallback_to_snippet() {
        let msg = gmail_message(
            "msg-5",
            "My snippet text",
            Some(MessagePayload {
                headers: Vec::new(),
                body: None,
                parts: Vec::new(),
                mime_type: "multipart/mixed".into(),
            }),
        );

        assert_eq!(extract_body_text(&msg), "My snippet text");
    }

    // ── Sender allowlist ─────────────────────────────────────────────

    #[test]
    fn sender_allowed_empty_denies() {
        let ch = channel_with_peers(&[]);
        assert!(!ch.is_sender_allowed("anyone@example.com"));
    }

    #[test]
    fn sender_allowed_wildcard() {
        let ch = channel_with_peers(&["*"]);
        assert!(ch.is_sender_allowed("anyone@example.com"));
    }

    #[test]
    fn sender_allowed_specific_email() {
        let ch = channel_with_peers(&["user@example.com"]);
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(!ch.is_sender_allowed("other@example.com"));
    }

    #[test]
    fn sender_allowed_domain_with_at() {
        let ch = channel_with_peers(&["@example.com"]);
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(ch.is_sender_allowed("admin@example.com"));
        assert!(!ch.is_sender_allowed("user@other.com"));
    }

    #[test]
    fn sender_allowed_domain_without_at() {
        let ch = channel_with_peers(&["example.com"]);
        assert!(ch.is_sender_allowed("user@example.com"));
        assert!(!ch.is_sender_allowed("user@other.com"));
    }

    // ── HTML stripping ───────────────────────────────────────────────

    #[test]
    fn strip_html_basic() {
        assert_eq!(strip_html("<p>Hello</p>"), "Hello");
    }

    #[test]
    fn strip_html_nested() {
        let got = strip_html("<div><p>Hello <b>World</b></p></div>");
        assert_eq!(got, "Hello World");
    }

    // ── Configuration ────────────────────────────────────────────────

    #[test]
    fn config_default_values() {
        let cfg = GmailPushConfig::default();
        assert!(cfg.topic.is_empty());
        assert_eq!(cfg.label_filter, vec!["INBOX"]);
        assert!(cfg.oauth_token.is_empty());
        assert!(cfg.webhook_url.is_empty());
    }

    #[test]
    fn config_deserialize_with_defaults() {
        let cfg: GmailPushConfig =
            serde_json::from_str(r#"{"topic": "projects/my-proj/topics/gmail"}"#).unwrap();
        assert_eq!(cfg.topic, "projects/my-proj/topics/gmail");
        assert_eq!(cfg.label_filter, vec!["INBOX"]);
    }

    #[test]
    fn config_serialize_roundtrip() {
        let original = GmailPushConfig {
            enabled: true,
            topic: "projects/test/topics/gmail".into(),
            label_filter: vec!["INBOX".into(), "IMPORTANT".into()],
            oauth_token: "test-token".into(),
            webhook_url: "https://example.com/webhook/gmail".into(),
            webhook_secret: "my-secret".into(),
            excluded_tools: vec![],
        };

        let json = serde_json::to_string(&original).unwrap();
        let restored: GmailPushConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.topic, original.topic);
        assert_eq!(restored.label_filter, original.label_filter);
        assert_eq!(restored.webhook_url, original.webhook_url);
    }

    // ── Channel basics ───────────────────────────────────────────────

    #[test]
    fn channel_name() {
        let ch = channel_with_peers(&[]);
        assert_eq!(ch.name(), "gmail_push");
    }

    // ── Body decoding ────────────────────────────────────────────────

    #[test]
    fn decode_body_none() {
        assert!(decode_body(None).is_none());
    }

    #[test]
    fn decode_body_empty_data() {
        let body = MessageBody {
            data: None,
            size: 0,
        };
        assert!(decode_body(Some(&body)).is_none());
    }

    #[test]
    fn decode_body_valid() {
        let body = MessageBody {
            data: Some(url_safe_b64(b"test content")),
            size: 12,
        };
        assert_eq!(decode_body(Some(&body)), Some("test content".to_string()));
    }
}