1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use parking_lot::Mutex;
4use serde::{Deserialize, Serialize};
5use std::time::{Duration, Instant};
6use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
7
8pub struct BlueskyChannel {
10 alias: String,
11 handle: String,
12 app_password: String,
13 auth: Mutex<BlueskyAuth>,
14}
15
/// Session state for the authenticated account.
struct BlueskyAuth {
    /// Bearer token attached to API requests; empty until a session exists.
    access_jwt: String,
    /// Token presented to `refreshSession` to obtain a new access token.
    refresh_jwt: String,
    /// DID of the account, as returned by `createSession`.
    did: String,
    /// Local deadline after which `access_jwt` is no longer reused as-is.
    expires_at: Instant,
}
22
/// Base URL that every XRPC method name in this file is appended to.
const BSKY_API_BASE: &str = "https://bsky.social/xrpc";

/// Pause between two consecutive notification polls.
const POLL_INTERVAL: Duration = Duration::from_secs(5);
25
26#[derive(Deserialize)]
27struct CreateSessionResponse {
28 #[serde(rename = "accessJwt")]
29 access_jwt: String,
30 #[serde(rename = "refreshJwt")]
31 refresh_jwt: String,
32 did: String,
33}
34
35#[derive(Deserialize)]
36struct RefreshSessionResponse {
37 #[serde(rename = "accessJwt")]
38 access_jwt: String,
39 #[serde(rename = "refreshJwt")]
40 refresh_jwt: String,
41}
42
43#[derive(Deserialize)]
44struct NotificationListResponse {
45 notifications: Vec<Notification>,
46 cursor: Option<String>,
47}
48
49#[allow(dead_code)]
50#[derive(Deserialize)]
51struct Notification {
52 uri: String,
53 cid: String,
54 author: NotificationAuthor,
55 reason: String,
56 record: Option<serde_json::Value>,
57 #[serde(rename = "isRead")]
58 is_read: bool,
59 #[serde(rename = "indexedAt")]
60 indexed_at: String,
61}
62
63#[allow(dead_code)]
64#[derive(Deserialize)]
65struct NotificationAuthor {
66 did: String,
67 handle: String,
68 #[serde(rename = "displayName")]
69 display_name: Option<String>,
70}
71
72#[derive(Serialize)]
74struct CreateRecordRequest {
75 repo: String,
76 collection: String,
77 record: PostRecord,
78}
79
80#[derive(Serialize)]
81struct PostRecord {
82 #[serde(rename = "$type")]
83 record_type: String,
84 text: String,
85 #[serde(rename = "createdAt")]
86 created_at: String,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 reply: Option<ReplyRef>,
89}
90
91#[derive(Serialize)]
92struct ReplyRef {
93 root: PostRef,
94 parent: PostRef,
95}
96
97#[derive(Serialize)]
98struct PostRef {
99 uri: String,
100 cid: String,
101}
102
103impl BlueskyChannel {
104 pub fn new(alias: String, handle: String, app_password: String) -> Self {
105 Self {
106 alias,
107 handle,
108 app_password,
109 auth: Mutex::new(BlueskyAuth {
110 access_jwt: String::new(),
111 refresh_jwt: String::new(),
112 did: String::new(),
113 expires_at: Instant::now(),
114 }),
115 }
116 }
117
118 fn http_client(&self) -> reqwest::Client {
119 zeroclaw_config::schema::build_runtime_proxy_client("channel.bluesky")
120 }
121
122 async fn create_session(&self) -> Result<()> {
124 let client = self.http_client();
125 let resp = client
126 .post(format!("{BSKY_API_BASE}/com.atproto.server.createSession"))
127 .json(&serde_json::json!({
128 "identifier": self.handle,
129 "password": self.app_password,
130 }))
131 .send()
132 .await?;
133
134 let status = resp.status();
135 if !status.is_success() {
136 let body = resp
137 .text()
138 .await
139 .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
140 bail!("createSession failed ({status}): {body}");
141 }
142
143 let session: CreateSessionResponse = resp.json().await?;
144 let mut auth = self.auth.lock();
145 auth.access_jwt = session.access_jwt;
146 auth.refresh_jwt = session.refresh_jwt;
147 auth.did = session.did;
148 auth.expires_at = Instant::now() + Duration::from_secs(90 * 60);
150 Ok(())
151 }
152
153 async fn refresh_session(&self) -> Result<()> {
155 let refresh_jwt = {
156 let auth = self.auth.lock();
157 auth.refresh_jwt.clone()
158 };
159
160 if refresh_jwt.is_empty() {
161 return self.create_session().await;
162 }
163
164 let client = self.http_client();
165 let resp = client
166 .post(format!("{BSKY_API_BASE}/com.atproto.server.refreshSession"))
167 .bearer_auth(&refresh_jwt)
168 .send()
169 .await?;
170
171 if !resp.status().is_success() {
172 ::zeroclaw_log::record!(
174 WARN,
175 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
176 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
177 "session refresh failed, re-authenticating"
178 );
179 return self.create_session().await;
180 }
181
182 let refreshed: RefreshSessionResponse = resp.json().await?;
183 let mut auth = self.auth.lock();
184 auth.access_jwt = refreshed.access_jwt;
185 auth.refresh_jwt = refreshed.refresh_jwt;
186 auth.expires_at = Instant::now() + Duration::from_secs(90 * 60);
187 Ok(())
188 }
189
190 async fn get_access_jwt(&self) -> Result<String> {
192 {
193 let auth = self.auth.lock();
194 if !auth.access_jwt.is_empty() && Instant::now() < auth.expires_at {
195 return Ok(auth.access_jwt.clone());
196 }
197 }
198 self.refresh_session().await?;
199 let auth = self.auth.lock();
200 Ok(auth.access_jwt.clone())
201 }
202
203 fn get_did(&self) -> String {
205 self.auth.lock().did.clone()
206 }
207
208 fn parse_notification(&self, notif: &Notification) -> Option<ChannelMessage> {
210 if notif.reason != "mention" && notif.reason != "reply" {
212 return None;
213 }
214
215 if notif.is_read {
217 return None;
218 }
219
220 if notif.author.did == self.get_did() {
222 return None;
223 }
224
225 let text = notif
227 .record
228 .as_ref()
229 .and_then(|r| r.get("text"))
230 .and_then(|t| t.as_str())
231 .unwrap_or("");
232
233 if text.is_empty() {
234 return None;
235 }
236
237 let timestamp = chrono::DateTime::parse_from_rfc3339(¬if.indexed_at)
239 .map(|dt| dt.timestamp().cast_unsigned())
240 .unwrap_or(0);
241
242 let cid = notif
244 .record
245 .as_ref()
246 .and_then(|r| r.get("cid"))
247 .and_then(|c| c.as_str())
248 .unwrap_or(¬if.cid);
249
250 let reply_target = format!("{}|{}", notif.uri, cid);
252
253 Some(ChannelMessage {
254 id: format!("bluesky_{}", notif.cid),
255 sender: notif.author.handle.clone(),
256 reply_target,
257 content: text.to_string(),
258 channel: "bluesky".to_string(),
259 channel_alias: None,
260 timestamp,
261 thread_ts: Some(notif.uri.clone()),
262 interruption_scope_id: None,
263 attachments: vec![],
264 subject: None,
265 })
266 }
267
268 async fn update_seen(&self, seen_at: &str) -> Result<()> {
270 let token = self.get_access_jwt().await?;
271 let client = self.http_client();
272
273 let resp = client
274 .post(format!("{BSKY_API_BASE}/app.bsky.notification.updateSeen"))
275 .bearer_auth(&token)
276 .json(&serde_json::json!({ "seenAt": seen_at }))
277 .send()
278 .await?;
279
280 if !resp.status().is_success() {
281 ::zeroclaw_log::record!(
282 WARN,
283 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
284 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
285 &format!("updateSeen failed: {}", resp.status())
286 );
287 }
288 Ok(())
289 }
290}
291
292impl ::zeroclaw_api::attribution::Attributable for BlueskyChannel {
293 fn role(&self) -> ::zeroclaw_api::attribution::Role {
294 ::zeroclaw_api::attribution::Role::Channel(
295 ::zeroclaw_api::attribution::ChannelKind::Bluesky,
296 )
297 }
298 fn alias(&self) -> &str {
299 &self.alias
300 }
301}
302
303#[async_trait]
304impl Channel for BlueskyChannel {
305 fn name(&self) -> &str {
306 "bluesky"
307 }
308
309 async fn send(&self, message: &SendMessage) -> Result<()> {
310 let token = self.get_access_jwt().await?;
311 let did = self.get_did();
312 let client = self.http_client();
313
314 let now = chrono::Utc::now().to_rfc3339();
315
316 let reply = if message.recipient.contains('|') {
318 let parts: Vec<&str> = message.recipient.splitn(2, '|').collect();
319 if parts.len() == 2 {
320 let uri = parts[0];
321 let cid = parts[1];
322 Some(ReplyRef {
323 root: PostRef {
324 uri: uri.to_string(),
325 cid: cid.to_string(),
326 },
327 parent: PostRef {
328 uri: uri.to_string(),
329 cid: cid.to_string(),
330 },
331 })
332 } else {
333 None
334 }
335 } else {
336 None
337 };
338
339 let text = if message.content.len() > 300 {
342 format!("{}...", &message.content[..297])
343 } else {
344 message.content.clone()
345 };
346
347 let request = CreateRecordRequest {
348 repo: did,
349 collection: "app.bsky.feed.post".to_string(),
350 record: PostRecord {
351 record_type: "app.bsky.feed.post".to_string(),
352 text,
353 created_at: now,
354 reply,
355 },
356 };
357
358 let resp = client
359 .post(format!("{BSKY_API_BASE}/com.atproto.repo.createRecord"))
360 .bearer_auth(&token)
361 .json(&request)
362 .send()
363 .await?;
364
365 let status = resp.status();
366 if !status.is_success() {
367 let body = resp
368 .text()
369 .await
370 .unwrap_or_else(|e| format!("<failed to read response: {e}>"));
371 bail!("post failed ({status}): {body}");
372 }
373
374 Ok(())
375 }
376
377 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> Result<()> {
378 self.create_session().await?;
380
381 ::zeroclaw_log::record!(
382 INFO,
383 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
384 &format!("channel listening as @{}...", self.handle)
385 );
386
387 loop {
388 tokio::time::sleep(POLL_INTERVAL).await;
389
390 let token = match self.get_access_jwt().await {
391 Ok(t) => t,
392 Err(e) => {
393 ::zeroclaw_log::record!(
394 WARN,
395 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
396 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
397 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
398 "auth error"
399 );
400 continue;
401 }
402 };
403
404 let client = self.http_client();
405 let resp = match client
406 .get(format!(
407 "{BSKY_API_BASE}/app.bsky.notification.listNotifications"
408 ))
409 .bearer_auth(&token)
410 .query(&[("limit", "25")])
411 .send()
412 .await
413 {
414 Ok(r) => r,
415 Err(e) => {
416 ::zeroclaw_log::record!(
417 WARN,
418 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
419 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
420 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
421 "poll error"
422 );
423 continue;
424 }
425 };
426
427 if !resp.status().is_success() {
428 ::zeroclaw_log::record!(
429 WARN,
430 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
431 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
432 &format!("notifications failed: {}", resp.status())
433 );
434 continue;
435 }
436
437 let listing: NotificationListResponse = match resp.json().await {
438 Ok(l) => l,
439 Err(e) => {
440 ::zeroclaw_log::record!(
441 WARN,
442 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
443 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
444 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
445 "parse error"
446 );
447 continue;
448 }
449 };
450
451 let mut latest_indexed_at: Option<String> = None;
452 for notif in &listing.notifications {
453 if let Some(msg) = self.parse_notification(notif) {
454 latest_indexed_at = Some(notif.indexed_at.clone());
455 if tx.send(msg).await.is_err() {
456 return Ok(());
457 }
458 }
459 }
460
461 if let Some(ref seen_at) = latest_indexed_at
463 && let Err(e) = self.update_seen(seen_at).await
464 {
465 ::zeroclaw_log::record!(
466 WARN,
467 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
468 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
469 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
470 "updateSeen error"
471 );
472 }
473
474 let _ = &listing.cursor; }
476 }
477
478 async fn health_check(&self) -> bool {
479 self.get_access_jwt().await.is_ok()
480 }
481}
482
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a channel whose stored DID is `did:plc:test123`, so that
    /// self-authored notifications can be recognised without a network login.
    fn make_channel() -> BlueskyChannel {
        let ch = BlueskyChannel::new(
            "testbot".into(),
            "testbot.bsky.social".into(),
            "app-password".into(),
        );
        // Scope the guard so it is released before the channel is returned.
        {
            let mut auth = ch.auth.lock();
            auth.did = "did:plc:test123".into();
        }
        ch
    }

    /// Builds a notification with fixed URI suffix, CID and timestamp; only the
    /// given fields vary between tests.
    fn make_notification(
        reason: &str,
        handle: &str,
        did: &str,
        text: &str,
        is_read: bool,
    ) -> Notification {
        Notification {
            uri: format!("at://{did}/app.bsky.feed.post/abc123"),
            cid: "bafyreitest123".into(),
            author: NotificationAuthor {
                did: did.into(),
                handle: handle.into(),
                display_name: None,
            },
            reason: reason.into(),
            record: Some(serde_json::json!({ "text": text })),
            is_read,
            indexed_at: "2026-01-15T10:00:00.000Z".into(),
        }
    }

    #[test]
    fn parse_mention_notification() {
        let ch = make_channel();
        let notif = make_notification(
            "mention",
            "user1.bsky.social",
            "did:plc:user1",
            "@testbot hello",
            false,
        );

        let msg = ch.parse_notification(&notif).unwrap();
        assert_eq!(msg.sender, "user1.bsky.social");
        assert_eq!(msg.content, "@testbot hello");
        assert_eq!(msg.channel, "bluesky");
        assert_eq!(msg.id, "bluesky_bafyreitest123");
    }

    #[test]
    fn parse_reply_notification() {
        let ch = make_channel();
        let notif = make_notification(
            "reply",
            "user2.bsky.social",
            "did:plc:user2",
            "thanks for the info!",
            false,
        );

        let msg = ch.parse_notification(&notif).unwrap();
        assert_eq!(msg.sender, "user2.bsky.social");
        assert_eq!(msg.content, "thanks for the info!");
    }

    #[test]
    fn skip_read_notifications() {
        let ch = make_channel();
        let notif = make_notification(
            "mention",
            "user1.bsky.social",
            "did:plc:user1",
            "old message",
            true,
        );

        assert!(ch.parse_notification(&notif).is_none());
    }

    #[test]
    fn skip_own_notifications() {
        let ch = make_channel();
        // Same DID as the one stored by `make_channel`.
        let notif = make_notification(
            "mention",
            "testbot.bsky.social",
            "did:plc:test123",
            "self message",
            false,
        );

        assert!(ch.parse_notification(&notif).is_none());
    }

    #[test]
    fn skip_like_notifications() {
        let ch = make_channel();
        let notif = make_notification(
            "like",
            "user1.bsky.social",
            "did:plc:user1",
            "liked post",
            false,
        );

        assert!(ch.parse_notification(&notif).is_none());
    }

    #[test]
    fn skip_empty_text() {
        let ch = make_channel();
        let notif = make_notification("mention", "user1.bsky.social", "did:plc:user1", "", false);

        assert!(ch.parse_notification(&notif).is_none());
    }

    #[test]
    fn reply_target_encoding() {
        let ch = make_channel();
        let notif = make_notification(
            "mention",
            "user1.bsky.social",
            "did:plc:user1",
            "hello",
            false,
        );

        let msg = ch.parse_notification(&notif).unwrap();
        let (uri, cid) = msg
            .reply_target
            .split_once('|')
            .expect("reply target must contain a '|' separator");
        assert_eq!(uri, "at://did:plc:user1/app.bsky.feed.post/abc123");
        assert_eq!(cid, "bafyreitest123");
    }

    #[test]
    fn send_message_formatting() {
        // Mirrors how a `<uri>|<cid>` recipient is taken apart when sending.
        let reply_target = "at://did:plc:user1/app.bsky.feed.post/abc|bafyreitest";
        let parts: Vec<&str> = reply_target.splitn(2, '|').collect();
        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0], "at://did:plc:user1/app.bsky.feed.post/abc");
        assert_eq!(parts[1], "bafyreitest");
    }
}