1use anyhow::Result;
21use async_trait::async_trait;
22use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
23use reqwest::Client;
24use serde::{Deserialize, Serialize};
25use std::fmt::Write as _;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime, UNIX_EPOCH};
28use tokio::sync::{Mutex, mpsc};
29
30use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
31
32pub use zeroclaw_config::scattered_types::GmailPushConfig;
33
34#[derive(Debug, Deserialize, Serialize)]
38pub struct PubSubEnvelope {
39 pub message: PubSubMessage,
40 #[serde(default)]
42 pub subscription: String,
43}
44
45#[derive(Debug, Deserialize, Serialize)]
47pub struct PubSubMessage {
48 pub data: String,
50 #[serde(default, rename = "messageId")]
52 pub message_id: String,
53 #[serde(default, rename = "publishTime")]
55 pub publish_time: String,
56}
57
58#[derive(Debug, Deserialize, Serialize)]
60pub struct GmailNotification {
61 #[serde(rename = "emailAddress")]
63 pub email_address: String,
64 #[serde(rename = "historyId")]
66 pub history_id: u64,
67}
68
69#[derive(Debug, Deserialize)]
73pub struct HistoryResponse {
74 pub history: Option<Vec<HistoryRecord>>,
75 #[serde(default, rename = "historyId")]
76 pub history_id: u64,
77 #[serde(default, rename = "nextPageToken")]
78 pub next_page_token: Option<String>,
79}
80
81#[derive(Debug, Deserialize)]
83pub struct HistoryRecord {
84 #[serde(default, rename = "messagesAdded")]
85 pub messages_added: Vec<MessageAdded>,
86}
87
88#[derive(Debug, Deserialize)]
90pub struct MessageAdded {
91 pub message: MessageRef,
92}
93
94#[derive(Debug, Deserialize)]
96pub struct MessageRef {
97 pub id: String,
98 #[serde(default, rename = "threadId")]
99 pub thread_id: String,
100}
101
102#[derive(Debug, Deserialize)]
104pub struct GmailMessage {
105 pub id: String,
106 #[serde(default, rename = "threadId")]
107 pub thread_id: String,
108 #[serde(default)]
109 pub snippet: String,
110 pub payload: Option<MessagePayload>,
111 #[serde(default, rename = "internalDate")]
112 pub internal_date: String,
113}
114
115#[derive(Debug, Deserialize)]
117pub struct MessagePayload {
118 #[serde(default)]
119 pub headers: Vec<MessageHeader>,
120 pub body: Option<MessageBody>,
121 #[serde(default)]
122 pub parts: Vec<MessagePart>,
123 #[serde(default, rename = "mimeType")]
124 pub mime_type: String,
125}
126
127#[derive(Debug, Deserialize)]
129pub struct MessageHeader {
130 pub name: String,
131 pub value: String,
132}
133
134#[derive(Debug, Deserialize)]
136pub struct MessageBody {
137 #[serde(default)]
138 pub data: Option<String>,
139 #[serde(default)]
140 pub size: u64,
141}
142
143#[derive(Debug, Deserialize)]
145pub struct MessagePart {
146 #[serde(default, rename = "mimeType")]
147 pub mime_type: String,
148 pub body: Option<MessageBody>,
149 #[serde(default)]
150 pub parts: Vec<MessagePart>,
151 #[serde(default)]
152 pub filename: String,
153}
154
155#[derive(Debug, Deserialize)]
157pub struct WatchResponse {
158 #[serde(default, rename = "historyId")]
159 pub history_id: u64,
160 #[serde(default)]
161 pub expiration: String,
162}
163
164pub struct GmailPushChannel {
177 pub config: GmailPushConfig,
178 pub alias: String,
181 pub peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
184 http: Client,
185 last_history_id: Arc<Mutex<u64>>,
186 pub tx: Arc<Mutex<Option<mpsc::Sender<ChannelMessage>>>>,
188}
189
190impl GmailPushChannel {
191 pub fn new(
192 config: GmailPushConfig,
193 alias: impl Into<String>,
194 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
195 ) -> Self {
196 let http = Client::builder()
197 .timeout(Duration::from_secs(30))
198 .build()
199 .expect("failed to build HTTP client");
200 Self {
201 config,
202 alias: alias.into(),
203 peer_resolver,
204 http,
205 last_history_id: Arc::new(Mutex::new(0)),
206 tx: Arc::new(Mutex::new(None)),
207 }
208 }
209
210 pub async fn register_watch(&self) -> Result<WatchResponse> {
212 let token = self.config.oauth_token.clone();
213 if token.is_empty() {
214 ::zeroclaw_log::record!(
215 ERROR,
216 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
217 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
218 "Gmail OAuth token is not configured"
219 );
220 anyhow::bail!("Gmail OAuth token is not configured");
221 }
222
223 let body = serde_json::json!({
224 "topicName": self.config.topic,
225 "labelIds": self.config.label_filter,
226 });
227
228 let resp = self
229 .http
230 .post("https://gmail.googleapis.com/gmail/v1/users/me/watch")
231 .bearer_auth(&token)
232 .json(&body)
233 .send()
234 .await?;
235
236 if !resp.status().is_success() {
237 let status = resp.status();
238 let text = resp.text().await.unwrap_or_default();
239 ::zeroclaw_log::record!(
240 ERROR,
241 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
242 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
243 .with_attrs(::serde_json::json!({
244 "phase": "watch_registration",
245 "status": status.as_u16(),
246 "body": text,
247 })),
248 "gmail_push: watch registration failed"
249 );
250 return Err(anyhow::Error::msg(format!(
251 "Gmail watch registration failed ({}): {}",
252 status, text
253 )));
254 }
255
256 let watch: WatchResponse = resp.json().await?;
257 let mut last_id = self.last_history_id.lock().await;
258 if *last_id == 0 {
259 *last_id = watch.history_id;
260 }
261 ::zeroclaw_log::record!(
262 INFO,
263 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
264 &format!(
265 "Gmail watch registered — historyId={}, expiration={}",
266 watch.history_id, watch.expiration
267 )
268 );
269 Ok(watch)
270 }
271
272 pub async fn fetch_history(&self, start_history_id: u64) -> Result<Vec<String>> {
274 let mut last_id = self.last_history_id.lock().await;
275 self.fetch_history_inner(start_history_id, &mut last_id)
276 .await
277 }
278
279 async fn fetch_history_inner(
282 &self,
283 start_history_id: u64,
284 last_id: &mut u64,
285 ) -> Result<Vec<String>> {
286 let token = self.config.oauth_token.clone();
287 if token.is_empty() {
288 ::zeroclaw_log::record!(
289 ERROR,
290 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
291 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
292 "Gmail OAuth token is not configured"
293 );
294 anyhow::bail!("Gmail OAuth token is not configured");
295 }
296
297 let mut message_ids = Vec::new();
298 let mut page_token: Option<String> = None;
299
300 loop {
301 let mut url = format!(
302 "https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId={}&historyTypes=messageAdded",
303 start_history_id
304 );
305 if let Some(ref pt) = page_token {
306 let _ = write!(url, "&pageToken={pt}");
307 }
308
309 let resp = self.http.get(&url).bearer_auth(&token).send().await?;
310
311 if !resp.status().is_success() {
312 let status = resp.status();
313 let text = resp.text().await.unwrap_or_default();
314 ::zeroclaw_log::record!(
315 ERROR,
316 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
317 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
318 .with_attrs(::serde_json::json!({
319 "phase": "history_fetch",
320 "status": status.as_u16(),
321 "body": text,
322 })),
323 "gmail_push: history fetch failed"
324 );
325 return Err(anyhow::Error::msg(format!(
326 "Gmail history fetch failed ({}): {}",
327 status, text
328 )));
329 }
330
331 let history_resp: HistoryResponse = resp.json().await?;
332
333 if let Some(records) = history_resp.history {
334 for record in records {
335 for added in record.messages_added {
336 message_ids.push(added.message.id);
337 }
338 }
339 }
340
341 if history_resp.history_id > 0 && history_resp.history_id > *last_id {
343 *last_id = history_resp.history_id;
344 }
345
346 match history_resp.next_page_token {
347 Some(token) => page_token = Some(token),
348 None => break,
349 }
350 }
351
352 Ok(message_ids)
353 }
354
355 pub async fn fetch_message(&self, message_id: &str) -> Result<GmailMessage> {
357 let token = self.config.oauth_token.clone();
358 let url = format!(
359 "https://gmail.googleapis.com/gmail/v1/users/me/messages/{}?format=full",
360 message_id
361 );
362
363 let resp = self.http.get(&url).bearer_auth(&token).send().await?;
364
365 if !resp.status().is_success() {
366 let status = resp.status();
367 let text = resp.text().await.unwrap_or_default();
368 ::zeroclaw_log::record!(
369 ERROR,
370 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
371 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
372 .with_attrs(::serde_json::json!({
373 "phase": "message_fetch",
374 "status": status.as_u16(),
375 "body": text,
376 })),
377 "gmail_push: message fetch failed"
378 );
379 return Err(anyhow::Error::msg(format!(
380 "Gmail message fetch failed ({}): {}",
381 status, text
382 )));
383 }
384
385 Ok(resp.json().await?)
386 }
387
388 pub fn is_sender_allowed(&self, email: &str) -> bool {
396 let peers = (self.peer_resolver)();
397 Self::is_email_sender_allowed(&peers, email)
398 }
399
400 fn is_email_sender_allowed(peers: &[String], email: &str) -> bool {
403 if peers.is_empty() {
404 return false;
405 }
406 if peers.iter().any(|a| a == "*") {
407 return true;
408 }
409 let email_lower = email.to_lowercase();
410 peers.iter().any(|allowed| {
411 if allowed.starts_with('@') {
412 email_lower.ends_with(&allowed.to_lowercase())
413 } else if allowed.contains('@') {
414 allowed.eq_ignore_ascii_case(email)
415 } else {
416 email_lower.ends_with(&format!("@{}", allowed.to_lowercase()))
417 }
418 })
419 }
420
421 pub async fn handle_notification(&self, envelope: &PubSubEnvelope) -> Result<()> {
423 let notification = parse_notification(&envelope.message)?;
424 ::zeroclaw_log::record!(
425 DEBUG,
426 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
427 &format!(
428 "Gmail push notification: email={}, historyId={}",
429 notification.email_address, notification.history_id
430 )
431 );
432
433 let mut last_id = self.last_history_id.lock().await;
436
437 if *last_id == 0 {
438 *last_id = notification.history_id;
440 ::zeroclaw_log::record!(
441 INFO,
442 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
443 &format!(
444 "Gmail push: first notification, seeding historyId={}",
445 notification.history_id
446 )
447 );
448 return Ok(());
449 }
450
451 let start_id = *last_id;
452 let message_ids = self.fetch_history_inner(start_id, &mut last_id).await?;
453 drop(last_id);
455
456 if message_ids.is_empty() {
457 ::zeroclaw_log::record!(
458 DEBUG,
459 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
460 "Gmail push: no new messages in history"
461 );
462 return Ok(());
463 }
464
465 ::zeroclaw_log::record!(
466 INFO,
467 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
468 &format!(
469 "Gmail push: {} new message(s) to process",
470 message_ids.len()
471 )
472 );
473
474 let tx = {
477 let tx_guard = self.tx.lock().await;
478 match tx_guard.clone() {
479 Some(tx) => tx,
480 None => {
481 ::zeroclaw_log::record!(
482 WARN,
483 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
484 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
485 "Gmail push: no listener registered, dropping messages"
486 );
487 return Ok(());
488 }
489 }
490 };
491
492 for msg_id in message_ids {
493 match self.fetch_message(&msg_id).await {
494 Ok(gmail_msg) => {
495 let sender = extract_header(&gmail_msg, "From").unwrap_or_default();
496 let sender_email = extract_email_from_header(&sender);
497
498 if !self.is_sender_allowed(&sender_email) {
499 ::zeroclaw_log::record!(
500 WARN,
501 ::zeroclaw_log::Event::new(
502 module_path!(),
503 ::zeroclaw_log::Action::Note
504 )
505 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
506 &format!("Gmail push: blocked message from {}", sender_email)
507 );
508 continue;
509 }
510
511 let subject = extract_header(&gmail_msg, "Subject").unwrap_or_default();
512 let body_text = extract_body_text(&gmail_msg);
513
514 let content = format!("Subject: {subject}\n\n{body_text}");
515 let timestamp = gmail_msg
516 .internal_date
517 .parse::<u64>()
518 .map(|ms| ms / 1000)
519 .unwrap_or_else(|_| {
520 SystemTime::now()
521 .duration_since(UNIX_EPOCH)
522 .map(|d| d.as_secs())
523 .unwrap_or(0)
524 });
525
526 let channel_msg = ChannelMessage {
527 id: format!("gmail_{}", gmail_msg.id),
528 reply_target: sender_email.clone(),
529 sender: sender_email,
530 content,
531 channel: "gmail_push".to_string(),
532 channel_alias: Some(self.alias.clone()),
533 timestamp,
534 thread_ts: Some(gmail_msg.thread_id),
535 interruption_scope_id: None,
536 attachments: Vec::new(),
537 subject: None,
538 };
539
540 if tx.send(channel_msg).await.is_err() {
541 ::zeroclaw_log::record!(
542 DEBUG,
543 ::zeroclaw_log::Event::new(
544 module_path!(),
545 ::zeroclaw_log::Action::Note
546 ),
547 "Gmail push: listener channel closed"
548 );
549 return Ok(());
550 }
551 }
552 Err(e) => {
553 ::zeroclaw_log::record!(
554 ERROR,
555 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
556 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
557 &format!("Gmail push: failed to fetch message {}: {}", msg_id, e)
558 );
559 }
560 }
561 }
562
563 Ok(())
564 }
565}
566
567impl ::zeroclaw_api::attribution::Attributable for GmailPushChannel {
568 fn role(&self) -> ::zeroclaw_api::attribution::Role {
569 ::zeroclaw_api::attribution::Role::Channel(
570 ::zeroclaw_api::attribution::ChannelKind::GmailPush,
571 )
572 }
573 fn alias(&self) -> &str {
574 &self.alias
575 }
576}
577
578#[async_trait]
579impl Channel for GmailPushChannel {
580 fn name(&self) -> &str {
581 "gmail_push"
582 }
583
584 async fn send(&self, message: &SendMessage) -> Result<()> {
585 let token = self.config.oauth_token.clone();
587 if token.is_empty() {
588 ::zeroclaw_log::record!(
589 ERROR,
590 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
591 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
592 "Gmail OAuth token is not configured for sending"
593 );
594 anyhow::bail!("Gmail OAuth token is not configured for sending");
595 }
596
597 let subject = message.subject.as_deref().unwrap_or("ZeroClaw Message");
598 let safe_recipient = sanitize_header_value(&message.recipient);
600 let safe_subject = sanitize_header_value(subject);
601 let rfc2822 = format!(
602 "To: {}\r\nSubject: {}\r\nContent-Type: text/plain; charset=utf-8\r\n\r\n{}",
603 safe_recipient, safe_subject, message.content
604 );
605 let encoded = BASE64.encode(rfc2822.as_bytes());
606 let url_safe = encoded.replace('+', "-").replace('/', "_").replace('=', "");
608
609 let body = serde_json::json!({
610 "raw": url_safe,
611 });
612
613 let resp = self
614 .http
615 .post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send")
616 .bearer_auth(&token)
617 .json(&body)
618 .send()
619 .await?;
620
621 if !resp.status().is_success() {
622 let status = resp.status();
623 let text = resp.text().await.unwrap_or_default();
624 ::zeroclaw_log::record!(
625 ERROR,
626 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
627 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
628 .with_attrs(::serde_json::json!({
629 "phase": "send",
630 "status": status.as_u16(),
631 "body": text,
632 })),
633 "gmail_push: send failed"
634 );
635 return Err(anyhow::Error::msg(format!(
636 "Gmail send failed ({}): {}",
637 status, text
638 )));
639 }
640
641 ::zeroclaw_log::record!(
642 INFO,
643 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
644 &format!("Gmail message sent to {}", message.recipient)
645 );
646 Ok(())
647 }
648
649 async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
650 {
652 let mut tx_guard = self.tx.lock().await;
653 *tx_guard = Some(tx);
654 }
655
656 ::zeroclaw_log::record!(
657 INFO,
658 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
659 "Gmail push channel started — registering watch subscription"
660 );
661
662 if !self.config.webhook_url.is_empty()
664 && let Err(e) = self.register_watch().await
665 {
666 ::zeroclaw_log::record!(
667 ERROR,
668 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
669 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
670 .with_attrs(::serde_json::json!({"e": e.to_string()})),
671 "Gmail watch registration failed"
672 );
673 }
675
676 let renewal_interval = Duration::from_secs(6 * 24 * 60 * 60); loop {
680 tokio::time::sleep(renewal_interval).await;
681 ::zeroclaw_log::record!(
682 INFO,
683 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
684 "Gmail push: renewing watch subscription"
685 );
686 if let Err(e) = self.register_watch().await {
687 ::zeroclaw_log::record!(
688 ERROR,
689 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
690 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
691 .with_attrs(::serde_json::json!({"e": e.to_string()})),
692 "Gmail watch renewal failed"
693 );
694 }
695 }
696 }
697
698 async fn health_check(&self) -> bool {
699 let token = self.config.oauth_token.clone();
700 if token.is_empty() {
701 return false;
702 }
703
704 match self
705 .http
706 .get("https://gmail.googleapis.com/gmail/v1/users/me/profile")
707 .bearer_auth(&token)
708 .timeout(Duration::from_secs(10))
709 .send()
710 .await
711 {
712 Ok(resp) => resp.status().is_success(),
713 Err(_) => false,
714 }
715 }
716}
717
718pub fn parse_notification(msg: &PubSubMessage) -> Result<GmailNotification> {
722 let decoded = BASE64.decode(&msg.data).map_err(|e| {
723 ::zeroclaw_log::record!(
724 WARN,
725 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
726 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
727 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
728 "Invalid base64 in Pub/Sub message"
729 );
730 anyhow::Error::msg(format!("Invalid base64 in Pub/Sub message: {e}"))
731 })?;
732 let notification: GmailNotification = serde_json::from_slice(&decoded).map_err(|e| {
733 ::zeroclaw_log::record!(
734 WARN,
735 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
736 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
737 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
738 "Invalid JSON in Gmail notification"
739 );
740 anyhow::Error::msg(format!("Invalid JSON in Gmail notification: {e}"))
741 })?;
742 Ok(notification)
743}
744
745pub fn extract_header(msg: &GmailMessage, name: &str) -> Option<String> {
747 msg.payload.as_ref().and_then(|p| {
748 p.headers
749 .iter()
750 .find(|h| h.name.eq_ignore_ascii_case(name))
751 .map(|h| h.value.clone())
752 })
753}
754
755pub fn extract_email_from_header(from: &str) -> String {
757 if let Some(start) = from.find('<') {
758 if let Some(end) = from.rfind('>')
761 && end > start + 1
762 {
763 return from[start + 1..end].to_string();
764 }
765 }
766 from.trim().to_string()
767}
768
769pub fn sanitize_header_value(value: &str) -> String {
772 value.chars().filter(|c| *c != '\r' && *c != '\n').collect()
773}
774
775pub fn extract_body_text(msg: &GmailMessage) -> String {
780 if let Some(ref payload) = msg.payload {
781 if payload.mime_type == "text/plain"
783 && let Some(text) = decode_body(payload.body.as_ref())
784 {
785 return text;
786 }
787
788 if let Some(text) = find_text_in_parts(&payload.parts, "text/plain") {
790 return text;
791 }
792 if let Some(html) = find_text_in_parts(&payload.parts, "text/html") {
793 return strip_html(&html);
794 }
795 }
796
797 msg.snippet.clone()
799}
800
801fn find_text_in_parts(parts: &[MessagePart], mime_type: &str) -> Option<String> {
803 for part in parts {
804 if part.mime_type == mime_type
805 && let Some(text) = decode_body(part.body.as_ref())
806 {
807 return Some(text);
808 }
809 if let Some(text) = find_text_in_parts(&part.parts, mime_type) {
811 return Some(text);
812 }
813 }
814 None
815}
816
817fn decode_body(body: Option<&MessageBody>) -> Option<String> {
819 body.and_then(|b| {
820 b.data.as_ref().and_then(|data| {
821 let standard = data.replace('-', "+").replace('_', "/");
823 let padded = match standard.len() % 4 {
825 2 => format!("{standard}=="),
826 3 => format!("{standard}="),
827 _ => standard,
828 };
829 BASE64
830 .decode(&padded)
831 .ok()
832 .and_then(|bytes| String::from_utf8(bytes).ok())
833 })
834 })
835}
836
837fn strip_html(html: &str) -> String {
839 let mut result = String::new();
840 let mut in_tag = false;
841 for ch in html.chars() {
842 match ch {
843 '<' => in_tag = true,
844 '>' => in_tag = false,
845 _ if !in_tag => result.push(ch),
846 _ => {}
847 }
848 }
849 let mut normalized = String::with_capacity(result.len());
850 for word in result.split_whitespace() {
851 if !normalized.is_empty() {
852 normalized.push(' ');
853 }
854 normalized.push_str(word);
855 }
856 normalized
857}
858
859#[cfg(test)]
862mod tests {
863 use super::*;
864
865 #[test]
868 fn parse_notification_valid() {
869 let payload = serde_json::json!({
870 "emailAddress": "user@example.com",
871 "historyId": 12345
872 });
873 let encoded = BASE64.encode(serde_json::to_vec(&payload).unwrap());
874
875 let msg = PubSubMessage {
876 data: encoded,
877 message_id: "msg-1".into(),
878 publish_time: "2026-03-21T08:00:00Z".into(),
879 };
880
881 let notification = parse_notification(&msg).unwrap();
882 assert_eq!(notification.email_address, "user@example.com");
883 assert_eq!(notification.history_id, 12345);
884 }
885
886 #[test]
887 fn parse_notification_invalid_base64() {
888 let msg = PubSubMessage {
889 data: "!!!not-base64!!!".into(),
890 message_id: "msg-2".into(),
891 publish_time: String::new(),
892 };
893 assert!(parse_notification(&msg).is_err());
894 }
895
896 #[test]
897 fn parse_notification_invalid_json() {
898 let encoded = BASE64.encode(b"not json at all");
899 let msg = PubSubMessage {
900 data: encoded,
901 message_id: "msg-3".into(),
902 publish_time: String::new(),
903 };
904 assert!(parse_notification(&msg).is_err());
905 }
906
907 #[test]
910 fn pubsub_envelope_deserialize() {
911 let payload = serde_json::json!({
912 "emailAddress": "test@gmail.com",
913 "historyId": 999
914 });
915 let encoded = BASE64.encode(serde_json::to_vec(&payload).unwrap());
916
917 let json = serde_json::json!({
918 "message": {
919 "data": encoded,
920 "messageId": "pubsub-1",
921 "publishTime": "2026-03-21T10:00:00Z"
922 },
923 "subscription": "projects/my-project/subscriptions/gmail-push"
924 });
925
926 let envelope: PubSubEnvelope = serde_json::from_value(json).unwrap();
927 assert_eq!(envelope.message.message_id, "pubsub-1");
928 assert_eq!(
929 envelope.subscription,
930 "projects/my-project/subscriptions/gmail-push"
931 );
932
933 let notification = parse_notification(&envelope.message).unwrap();
934 assert_eq!(notification.email_address, "test@gmail.com");
935 assert_eq!(notification.history_id, 999);
936 }
937
938 #[test]
941 fn extract_email_from_header_angle_brackets() {
942 assert_eq!(
943 extract_email_from_header("John Doe <john@example.com>"),
944 "john@example.com"
945 );
946 }
947
948 #[test]
949 fn extract_email_from_header_bare_email() {
950 assert_eq!(
951 extract_email_from_header("user@example.com"),
952 "user@example.com"
953 );
954 }
955
956 #[test]
957 fn extract_email_from_header_empty() {
958 assert_eq!(extract_email_from_header(""), "");
959 }
960
961 #[test]
962 fn extract_email_with_quotes() {
963 assert_eq!(
964 extract_email_from_header("\"Doe, John\" <john@example.com>"),
965 "john@example.com"
966 );
967 }
968
969 #[test]
970 fn extract_email_malformed_angle_brackets() {
971 assert_eq!(
973 extract_email_from_header("attacker> <victim@example.com"),
974 "attacker> <victim@example.com"
975 );
976 assert_eq!(
978 extract_email_from_header("attacker> <victim@example.com>"),
979 "victim@example.com"
980 );
981 assert_eq!(extract_email_from_header("Name <broken"), "Name <broken");
983 }
984
985 #[test]
986 fn sanitize_header_strips_crlf() {
987 assert_eq!(
988 sanitize_header_value("normal@example.com"),
989 "normal@example.com"
990 );
991 assert_eq!(
992 sanitize_header_value("evil@example.com\r\nBcc: spy@evil.com"),
993 "evil@example.comBcc: spy@evil.com"
994 );
995 assert_eq!(
996 sanitize_header_value("inject\nSubject: fake"),
997 "injectSubject: fake"
998 );
999 }
1000
1001 #[test]
1004 fn extract_header_found() {
1005 let msg = GmailMessage {
1006 id: "msg-1".into(),
1007 thread_id: "thread-1".into(),
1008 snippet: String::new(),
1009 payload: Some(MessagePayload {
1010 headers: vec![
1011 MessageHeader {
1012 name: "From".into(),
1013 value: "sender@example.com".into(),
1014 },
1015 MessageHeader {
1016 name: "Subject".into(),
1017 value: "Test Subject".into(),
1018 },
1019 ],
1020 body: None,
1021 parts: Vec::new(),
1022 mime_type: String::new(),
1023 }),
1024 internal_date: "0".into(),
1025 };
1026
1027 assert_eq!(
1028 extract_header(&msg, "Subject"),
1029 Some("Test Subject".to_string())
1030 );
1031 assert_eq!(
1032 extract_header(&msg, "from"), Some("sender@example.com".to_string())
1034 );
1035 assert_eq!(extract_header(&msg, "X-Missing"), None);
1036 }
1037
1038 #[test]
1039 fn extract_header_no_payload() {
1040 let msg = GmailMessage {
1041 id: "msg-2".into(),
1042 thread_id: String::new(),
1043 snippet: String::new(),
1044 payload: None,
1045 internal_date: "0".into(),
1046 };
1047 assert_eq!(extract_header(&msg, "Subject"), None);
1048 }
1049
1050 #[test]
1053 fn extract_body_text_plain() {
1054 let plain_b64 = BASE64
1055 .encode(b"Hello, world!")
1056 .replace('+', "-")
1057 .replace('/', "_")
1058 .replace('=', "");
1059
1060 let msg = GmailMessage {
1061 id: "msg-3".into(),
1062 thread_id: String::new(),
1063 snippet: "snippet".into(),
1064 payload: Some(MessagePayload {
1065 headers: Vec::new(),
1066 body: Some(MessageBody {
1067 data: Some(plain_b64),
1068 size: 13,
1069 }),
1070 parts: Vec::new(),
1071 mime_type: "text/plain".into(),
1072 }),
1073 internal_date: "0".into(),
1074 };
1075
1076 assert_eq!(extract_body_text(&msg), "Hello, world!");
1077 }
1078
1079 #[test]
1080 fn extract_body_text_multipart() {
1081 let html_b64 = BASE64
1082 .encode(b"<p>Hello</p>")
1083 .replace('+', "-")
1084 .replace('/', "_")
1085 .replace('=', "");
1086
1087 let msg = GmailMessage {
1088 id: "msg-4".into(),
1089 thread_id: String::new(),
1090 snippet: "snippet".into(),
1091 payload: Some(MessagePayload {
1092 headers: Vec::new(),
1093 body: None,
1094 parts: vec![MessagePart {
1095 mime_type: "text/html".into(),
1096 body: Some(MessageBody {
1097 data: Some(html_b64),
1098 size: 12,
1099 }),
1100 parts: Vec::new(),
1101 filename: String::new(),
1102 }],
1103 mime_type: "multipart/alternative".into(),
1104 }),
1105 internal_date: "0".into(),
1106 };
1107
1108 assert_eq!(extract_body_text(&msg), "Hello");
1109 }
1110
1111 #[test]
1112 fn extract_body_text_fallback_to_snippet() {
1113 let msg = GmailMessage {
1114 id: "msg-5".into(),
1115 thread_id: String::new(),
1116 snippet: "My snippet text".into(),
1117 payload: Some(MessagePayload {
1118 headers: Vec::new(),
1119 body: None,
1120 parts: Vec::new(),
1121 mime_type: "multipart/mixed".into(),
1122 }),
1123 internal_date: "0".into(),
1124 };
1125
1126 assert_eq!(extract_body_text(&msg), "My snippet text");
1127 }
1128
1129 fn empty_resolver() -> Arc<dyn Fn() -> Vec<String> + Send + Sync> {
1132 Arc::new(Vec::new)
1133 }
1134
1135 fn resolver_from(peers: Vec<String>) -> Arc<dyn Fn() -> Vec<String> + Send + Sync> {
1136 Arc::new(move || peers.clone())
1137 }
1138
1139 #[test]
1140 fn sender_allowed_empty_denies() {
1141 let ch = GmailPushChannel::new(
1142 GmailPushConfig::default(),
1143 "gmail_push_test_alias",
1144 empty_resolver(),
1145 );
1146 assert!(!ch.is_sender_allowed("anyone@example.com"));
1147 }
1148
1149 #[test]
1150 fn sender_allowed_wildcard() {
1151 let ch = GmailPushChannel::new(
1152 GmailPushConfig::default(),
1153 "gmail_push_test_alias",
1154 resolver_from(vec!["*".into()]),
1155 );
1156 assert!(ch.is_sender_allowed("anyone@example.com"));
1157 }
1158
1159 #[test]
1160 fn sender_allowed_specific_email() {
1161 let ch = GmailPushChannel::new(
1162 GmailPushConfig::default(),
1163 "gmail_push_test_alias",
1164 resolver_from(vec!["user@example.com".into()]),
1165 );
1166 assert!(ch.is_sender_allowed("user@example.com"));
1167 assert!(!ch.is_sender_allowed("other@example.com"));
1168 }
1169
1170 #[test]
1171 fn sender_allowed_domain_with_at() {
1172 let ch = GmailPushChannel::new(
1173 GmailPushConfig::default(),
1174 "gmail_push_test_alias",
1175 resolver_from(vec!["@example.com".into()]),
1176 );
1177 assert!(ch.is_sender_allowed("user@example.com"));
1178 assert!(ch.is_sender_allowed("admin@example.com"));
1179 assert!(!ch.is_sender_allowed("user@other.com"));
1180 }
1181
1182 #[test]
1183 fn sender_allowed_domain_without_at() {
1184 let ch = GmailPushChannel::new(
1185 GmailPushConfig::default(),
1186 "gmail_push_test_alias",
1187 resolver_from(vec!["example.com".into()]),
1188 );
1189 assert!(ch.is_sender_allowed("user@example.com"));
1190 assert!(!ch.is_sender_allowed("user@other.com"));
1191 }
1192
1193 #[test]
1196 fn strip_html_basic() {
1197 assert_eq!(strip_html("<p>Hello</p>"), "Hello");
1198 }
1199
1200 #[test]
1201 fn strip_html_nested() {
1202 assert_eq!(
1203 strip_html("<div><p>Hello <b>World</b></p></div>"),
1204 "Hello World"
1205 );
1206 }
1207
1208 #[test]
1211 fn config_default_values() {
1212 let config = GmailPushConfig::default();
1213 assert!(config.topic.is_empty());
1214 assert_eq!(config.label_filter, vec!["INBOX"]);
1215 assert!(config.oauth_token.is_empty());
1216 assert!(config.webhook_url.is_empty());
1217 }
1218
1219 #[test]
1220 fn config_deserialize_with_defaults() {
1221 let json = r#"{"topic": "projects/my-proj/topics/gmail"}"#;
1222 let config: GmailPushConfig = serde_json::from_str(json).unwrap();
1223 assert_eq!(config.topic, "projects/my-proj/topics/gmail");
1224 assert_eq!(config.label_filter, vec!["INBOX"]);
1225 }
1226
1227 #[test]
1228 fn config_serialize_roundtrip() {
1229 let config = GmailPushConfig {
1230 enabled: true,
1231 topic: "projects/test/topics/gmail".into(),
1232 label_filter: vec!["INBOX".into(), "IMPORTANT".into()],
1233 oauth_token: "test-token".into(),
1234 webhook_url: "https://example.com/webhook/gmail".into(),
1235 webhook_secret: "my-secret".into(),
1236 excluded_tools: vec![],
1237 };
1238 let json = serde_json::to_string(&config).unwrap();
1239 let deserialized: GmailPushConfig = serde_json::from_str(&json).unwrap();
1240 assert_eq!(deserialized.topic, config.topic);
1241 assert_eq!(deserialized.label_filter, config.label_filter);
1242 assert_eq!(deserialized.webhook_url, config.webhook_url);
1243 }
1244
1245 #[test]
1248 fn channel_name() {
1249 let ch = GmailPushChannel::new(
1250 GmailPushConfig::default(),
1251 "gmail_push_test_alias",
1252 empty_resolver(),
1253 );
1254 assert_eq!(ch.name(), "gmail_push");
1255 }
1256
1257 #[test]
1260 fn decode_body_none() {
1261 assert!(decode_body(None).is_none());
1262 }
1263
1264 #[test]
1265 fn decode_body_empty_data() {
1266 let body = MessageBody {
1267 data: None,
1268 size: 0,
1269 };
1270 assert!(decode_body(Some(&body)).is_none());
1271 }
1272
1273 #[test]
1274 fn decode_body_valid() {
1275 let b64 = BASE64
1276 .encode(b"test content")
1277 .replace('+', "-")
1278 .replace('/', "_")
1279 .replace('=', "");
1280 let body = MessageBody {
1281 data: Some(b64),
1282 size: 12,
1283 };
1284 assert_eq!(decode_body(Some(&body)), Some("test content".to_string()));
1285 }
1286}