Skip to main content

zeroclaw_channels/
bluesky.rs

1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use parking_lot::Mutex;
4use serde::{Deserialize, Serialize};
5use std::time::{Duration, Instant};
6use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
7
8/// Bluesky channel — polls for mentions via AT Protocol and replies as posts.
9pub struct BlueskyChannel {
10    alias: String,
11    handle: String,
12    app_password: String,
13    auth: Mutex<BlueskyAuth>,
14}
15
16struct BlueskyAuth {
17    access_jwt: String,
18    refresh_jwt: String,
19    did: String,
20    expires_at: Instant,
21}
22
23const BSKY_API_BASE: &str = "https://bsky.social/xrpc";
24const POLL_INTERVAL: Duration = Duration::from_secs(5);
25
26#[derive(Deserialize)]
27struct CreateSessionResponse {
28    #[serde(rename = "accessJwt")]
29    access_jwt: String,
30    #[serde(rename = "refreshJwt")]
31    refresh_jwt: String,
32    did: String,
33}
34
35#[derive(Deserialize)]
36struct RefreshSessionResponse {
37    #[serde(rename = "accessJwt")]
38    access_jwt: String,
39    #[serde(rename = "refreshJwt")]
40    refresh_jwt: String,
41}
42
43#[derive(Deserialize)]
44struct NotificationListResponse {
45    notifications: Vec<Notification>,
46    cursor: Option<String>,
47}
48
49#[allow(dead_code)]
50#[derive(Deserialize)]
51struct Notification {
52    uri: String,
53    cid: String,
54    author: NotificationAuthor,
55    reason: String,
56    record: Option<serde_json::Value>,
57    #[serde(rename = "isRead")]
58    is_read: bool,
59    #[serde(rename = "indexedAt")]
60    indexed_at: String,
61}
62
63#[allow(dead_code)]
64#[derive(Deserialize)]
65struct NotificationAuthor {
66    did: String,
67    handle: String,
68    #[serde(rename = "displayName")]
69    display_name: Option<String>,
70}
71
72/// AT Protocol record for creating a post.
73#[derive(Serialize)]
74struct CreateRecordRequest {
75    repo: String,
76    collection: String,
77    record: PostRecord,
78}
79
80#[derive(Serialize)]
81struct PostRecord {
82    #[serde(rename = "$type")]
83    record_type: String,
84    text: String,
85    #[serde(rename = "createdAt")]
86    created_at: String,
87    #[serde(skip_serializing_if = "Option::is_none")]
88    reply: Option<ReplyRef>,
89}
90
91#[derive(Serialize)]
92struct ReplyRef {
93    root: PostRef,
94    parent: PostRef,
95}
96
97#[derive(Serialize)]
98struct PostRef {
99    uri: String,
100    cid: String,
101}
102
103impl BlueskyChannel {
104    pub fn new(alias: String, handle: String, app_password: String) -> Self {
105        Self {
106            alias,
107            handle,
108            app_password,
109            auth: Mutex::new(BlueskyAuth {
110                access_jwt: String::new(),
111                refresh_jwt: String::new(),
112                did: String::new(),
113                expires_at: Instant::now(),
114            }),
115        }
116    }
117
118    fn http_client(&self) -> reqwest::Client {
119        zeroclaw_config::schema::build_runtime_proxy_client("channel.bluesky")
120    }
121
122    /// Create a new session with handle + app password.
123    async fn create_session(&self) -> Result<()> {
124        let client = self.http_client();
125        let resp = client
126            .post(format!("{BSKY_API_BASE}/com.atproto.server.createSession"))
127            .json(&serde_json::json!({
128                "identifier": self.handle,
129                "password": self.app_password,
130            }))
131            .send()
132            .await?;
133
134        let status = resp.status();
135        if !status.is_success() {
136            let body = resp
137                .text()
138                .await
139                .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
140            bail!("createSession failed ({status}): {body}");
141        }
142
143        let session: CreateSessionResponse = resp.json().await?;
144        let mut auth = self.auth.lock();
145        auth.access_jwt = session.access_jwt;
146        auth.refresh_jwt = session.refresh_jwt;
147        auth.did = session.did;
148        // AT Protocol JWTs typically last ~2 hours; refresh well before that.
149        auth.expires_at = Instant::now() + Duration::from_secs(90 * 60);
150        Ok(())
151    }
152
153    /// Refresh an existing session.
154    async fn refresh_session(&self) -> Result<()> {
155        let refresh_jwt = {
156            let auth = self.auth.lock();
157            auth.refresh_jwt.clone()
158        };
159
160        if refresh_jwt.is_empty() {
161            return self.create_session().await;
162        }
163
164        let client = self.http_client();
165        let resp = client
166            .post(format!("{BSKY_API_BASE}/com.atproto.server.refreshSession"))
167            .bearer_auth(&refresh_jwt)
168            .send()
169            .await?;
170
171        if !resp.status().is_success() {
172            // Refresh failed — fall back to full re-auth
173            ::zeroclaw_log::record!(
174                WARN,
175                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
176                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
177                "session refresh failed, re-authenticating"
178            );
179            return self.create_session().await;
180        }
181
182        let refreshed: RefreshSessionResponse = resp.json().await?;
183        let mut auth = self.auth.lock();
184        auth.access_jwt = refreshed.access_jwt;
185        auth.refresh_jwt = refreshed.refresh_jwt;
186        auth.expires_at = Instant::now() + Duration::from_secs(90 * 60);
187        Ok(())
188    }
189
190    /// Get a valid access JWT, refreshing if expired.
191    async fn get_access_jwt(&self) -> Result<String> {
192        {
193            let auth = self.auth.lock();
194            if !auth.access_jwt.is_empty() && Instant::now() < auth.expires_at {
195                return Ok(auth.access_jwt.clone());
196            }
197        }
198        self.refresh_session().await?;
199        let auth = self.auth.lock();
200        Ok(auth.access_jwt.clone())
201    }
202
203    /// Get the DID for the authenticated account.
204    fn get_did(&self) -> String {
205        self.auth.lock().did.clone()
206    }
207
208    /// Parse a notification into a ChannelMessage (only processes mentions).
209    fn parse_notification(&self, notif: &Notification) -> Option<ChannelMessage> {
210        // Only process mentions
211        if notif.reason != "mention" && notif.reason != "reply" {
212            return None;
213        }
214
215        // Skip already-read notifications
216        if notif.is_read {
217            return None;
218        }
219
220        // Skip own posts
221        if notif.author.did == self.get_did() {
222            return None;
223        }
224
225        // Extract text from the record
226        let text = notif
227            .record
228            .as_ref()
229            .and_then(|r| r.get("text"))
230            .and_then(|t| t.as_str())
231            .unwrap_or("");
232
233        if text.is_empty() {
234            return None;
235        }
236
237        // Parse timestamp from indexedAt (ISO 8601)
238        let timestamp = chrono::DateTime::parse_from_rfc3339(&notif.indexed_at)
239            .map(|dt| dt.timestamp().cast_unsigned())
240            .unwrap_or(0);
241
242        // Extract CID from the record for reply references
243        let cid = notif
244            .record
245            .as_ref()
246            .and_then(|r| r.get("cid"))
247            .and_then(|c| c.as_str())
248            .unwrap_or(&notif.cid);
249
250        // The reply target encodes the URI and CID needed for threading
251        let reply_target = format!("{}|{}", notif.uri, cid);
252
253        Some(ChannelMessage {
254            id: format!("bluesky_{}", notif.cid),
255            sender: notif.author.handle.clone(),
256            reply_target,
257            content: text.to_string(),
258            channel: "bluesky".to_string(),
259            channel_alias: None,
260            timestamp,
261            thread_ts: Some(notif.uri.clone()),
262            interruption_scope_id: None,
263            attachments: vec![],
264            subject: None,
265        })
266    }
267
268    /// Mark notifications as read up to a given timestamp.
269    async fn update_seen(&self, seen_at: &str) -> Result<()> {
270        let token = self.get_access_jwt().await?;
271        let client = self.http_client();
272
273        let resp = client
274            .post(format!("{BSKY_API_BASE}/app.bsky.notification.updateSeen"))
275            .bearer_auth(&token)
276            .json(&serde_json::json!({ "seenAt": seen_at }))
277            .send()
278            .await?;
279
280        if !resp.status().is_success() {
281            ::zeroclaw_log::record!(
282                WARN,
283                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
284                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
285                &format!("updateSeen failed: {}", resp.status())
286            );
287        }
288        Ok(())
289    }
290}
291
292impl ::zeroclaw_api::attribution::Attributable for BlueskyChannel {
293    fn role(&self) -> ::zeroclaw_api::attribution::Role {
294        ::zeroclaw_api::attribution::Role::Channel(
295            ::zeroclaw_api::attribution::ChannelKind::Bluesky,
296        )
297    }
298    fn alias(&self) -> &str {
299        &self.alias
300    }
301}
302
303#[async_trait]
304impl Channel for BlueskyChannel {
305    fn name(&self) -> &str {
306        "bluesky"
307    }
308
309    async fn send(&self, message: &SendMessage) -> Result<()> {
310        let token = self.get_access_jwt().await?;
311        let did = self.get_did();
312        let client = self.http_client();
313
314        let now = chrono::Utc::now().to_rfc3339();
315
316        // Parse reply reference from recipient if present (format: "uri|cid")
317        let reply = if message.recipient.contains('|') {
318            let parts: Vec<&str> = message.recipient.splitn(2, '|').collect();
319            if parts.len() == 2 {
320                let uri = parts[0];
321                let cid = parts[1];
322                Some(ReplyRef {
323                    root: PostRef {
324                        uri: uri.to_string(),
325                        cid: cid.to_string(),
326                    },
327                    parent: PostRef {
328                        uri: uri.to_string(),
329                        cid: cid.to_string(),
330                    },
331                })
332            } else {
333                None
334            }
335        } else {
336            None
337        };
338
339        // Bluesky posts have a 300-character limit (grapheme clusters).
340        // For longer content, truncate with an indicator.
341        let text = if message.content.len() > 300 {
342            format!("{}...", &message.content[..297])
343        } else {
344            message.content.clone()
345        };
346
347        let request = CreateRecordRequest {
348            repo: did,
349            collection: "app.bsky.feed.post".to_string(),
350            record: PostRecord {
351                record_type: "app.bsky.feed.post".to_string(),
352                text,
353                created_at: now,
354                reply,
355            },
356        };
357
358        let resp = client
359            .post(format!("{BSKY_API_BASE}/com.atproto.repo.createRecord"))
360            .bearer_auth(&token)
361            .json(&request)
362            .send()
363            .await?;
364
365        let status = resp.status();
366        if !status.is_success() {
367            let body = resp
368                .text()
369                .await
370                .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
371            bail!("post failed ({status}): {body}");
372        }
373
374        Ok(())
375    }
376
377    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
378        // Initial auth
379        self.create_session().await?;
380
381        ::zeroclaw_log::record!(
382            INFO,
383            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
384            &format!("channel listening as @{}...", self.handle)
385        );
386
387        loop {
388            tokio::time::sleep(POLL_INTERVAL).await;
389
390            let token = match self.get_access_jwt().await {
391                Ok(t) => t,
392                Err(e) => {
393                    ::zeroclaw_log::record!(
394                        WARN,
395                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
396                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
397                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
398                        "auth error"
399                    );
400                    continue;
401                }
402            };
403
404            let client = self.http_client();
405            let resp = match client
406                .get(format!(
407                    "{BSKY_API_BASE}/app.bsky.notification.listNotifications"
408                ))
409                .bearer_auth(&token)
410                .query(&[("limit", "25")])
411                .send()
412                .await
413            {
414                Ok(r) => r,
415                Err(e) => {
416                    ::zeroclaw_log::record!(
417                        WARN,
418                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
419                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
420                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
421                        "poll error"
422                    );
423                    continue;
424                }
425            };
426
427            if !resp.status().is_success() {
428                ::zeroclaw_log::record!(
429                    WARN,
430                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
431                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
432                    &format!("notifications failed: {}", resp.status())
433                );
434                continue;
435            }
436
437            let listing: NotificationListResponse = match resp.json().await {
438                Ok(l) => l,
439                Err(e) => {
440                    ::zeroclaw_log::record!(
441                        WARN,
442                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
443                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
444                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
445                        "parse error"
446                    );
447                    continue;
448                }
449            };
450
451            let mut latest_indexed_at: Option<String> = None;
452            for notif in &listing.notifications {
453                if let Some(msg) = self.parse_notification(notif) {
454                    latest_indexed_at = Some(notif.indexed_at.clone());
455                    if tx.send(msg).await.is_err() {
456                        return Ok(());
457                    }
458                }
459            }
460
461            // Mark as seen
462            if let Some(ref seen_at) = latest_indexed_at
463                && let Err(e) = self.update_seen(seen_at).await
464            {
465                ::zeroclaw_log::record!(
466                    WARN,
467                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
468                        .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
469                        .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
470                    "updateSeen error"
471                );
472            }
473
474            let _ = &listing.cursor; // cursor available for pagination if needed
475        }
476    }
477
478    async fn health_check(&self) -> bool {
479        self.get_access_jwt().await.is_ok()
480    }
481}
482
483#[cfg(test)]
484mod tests {
485    use super::*;
486
487    fn make_channel() -> BlueskyChannel {
488        let ch = BlueskyChannel::new(
489            "testbot".into(),
490            "testbot.bsky.social".into(),
491            "app-password".into(),
492        );
493        // Seed auth with a DID for tests
494        {
495            let mut auth = ch.auth.lock();
496            auth.did = "did:plc:test123".into();
497        }
498        ch
499    }
500
501    fn make_notification(
502        reason: &str,
503        handle: &str,
504        did: &str,
505        text: &str,
506        is_read: bool,
507    ) -> Notification {
508        Notification {
509            uri: format!("at://{did}/app.bsky.feed.post/abc123"),
510            cid: "bafyreitest123".into(),
511            author: NotificationAuthor {
512                did: did.into(),
513                handle: handle.into(),
514                display_name: None,
515            },
516            reason: reason.into(),
517            record: Some(serde_json::json!({ "text": text })),
518            is_read,
519            indexed_at: "2026-01-15T10:00:00.000Z".into(),
520        }
521    }
522
523    #[test]
524    fn parse_mention_notification() {
525        let ch = make_channel();
526        let notif = make_notification(
527            "mention",
528            "user1.bsky.social",
529            "did:plc:user1",
530            "@testbot hello",
531            false,
532        );
533
534        let msg = ch.parse_notification(&notif).unwrap();
535        assert_eq!(msg.sender, "user1.bsky.social");
536        assert_eq!(msg.content, "@testbot hello");
537        assert_eq!(msg.channel, "bluesky");
538        assert!(msg.id.starts_with("bluesky_"));
539    }
540
541    #[test]
542    fn parse_reply_notification() {
543        let ch = make_channel();
544        let notif = make_notification(
545            "reply",
546            "user2.bsky.social",
547            "did:plc:user2",
548            "thanks for the info!",
549            false,
550        );
551
552        let msg = ch.parse_notification(&notif).unwrap();
553        assert_eq!(msg.sender, "user2.bsky.social");
554        assert_eq!(msg.content, "thanks for the info!");
555    }
556
557    #[test]
558    fn skip_read_notifications() {
559        let ch = make_channel();
560        let notif = make_notification(
561            "mention",
562            "user1.bsky.social",
563            "did:plc:user1",
564            "old message",
565            true,
566        );
567
568        assert!(ch.parse_notification(&notif).is_none());
569    }
570
571    #[test]
572    fn skip_own_notifications() {
573        let ch = make_channel();
574        let notif = make_notification(
575            "mention",
576            "testbot.bsky.social",
577            "did:plc:test123", // same as seeded DID
578            "self message",
579            false,
580        );
581
582        assert!(ch.parse_notification(&notif).is_none());
583    }
584
585    #[test]
586    fn skip_like_notifications() {
587        let ch = make_channel();
588        let notif = make_notification(
589            "like",
590            "user1.bsky.social",
591            "did:plc:user1",
592            "liked post",
593            false,
594        );
595
596        assert!(ch.parse_notification(&notif).is_none());
597    }
598
599    #[test]
600    fn skip_empty_text() {
601        let ch = make_channel();
602        let notif = make_notification("mention", "user1.bsky.social", "did:plc:user1", "", false);
603
604        assert!(ch.parse_notification(&notif).is_none());
605    }
606
607    #[test]
608    fn reply_target_encoding() {
609        let ch = make_channel();
610        let notif = make_notification(
611            "mention",
612            "user1.bsky.social",
613            "did:plc:user1",
614            "hello",
615            false,
616        );
617
618        let msg = ch.parse_notification(&notif).unwrap();
619        // reply_target should contain URI|CID
620        assert!(msg.reply_target.contains('|'));
621        let parts: Vec<&str> = msg.reply_target.splitn(2, '|').collect();
622        assert_eq!(parts.len(), 2);
623        assert!(parts[0].starts_with("at://"));
624    }
625
626    #[test]
627    fn send_message_formatting() {
628        // Verify reply target parsing
629        let reply_target = "at://did:plc:user1/app.bsky.feed.post/abc|bafyreitest";
630        let parts: Vec<&str> = reply_target.splitn(2, '|').collect();
631        assert_eq!(parts.len(), 2);
632        assert_eq!(parts[0], "at://did:plc:user1/app.bsky.feed.post/abc");
633        assert_eq!(parts[1], "bafyreitest");
634    }
635}