Skip to main content

zeroclaw_channels/
twitter.rs

1use async_trait::async_trait;
2use serde_json::json;
3use std::collections::HashSet;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use uuid::Uuid;
7use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
8
/// Root of every X/Twitter API v2 endpoint this channel calls.
const TWITTER_API_BASE: &str = "https://api.x.com/2";
10
11/// X/Twitter channel — uses the Twitter API v2 with OAuth 2.0 Bearer Token
12/// for sending tweets/DMs and filtered stream for receiving mentions.
13pub struct TwitterChannel {
14    bearer_token: String,
15    /// The alias key under `[channels.twitter.<alias>]` this handle is
16    /// bound to. Used to scope peer-group writes and resolver lookups.
17    alias: String,
18    /// Resolves inbound external peers from canonical state at message-time.
19    /// No cache (see AGENTS.md "ABSOLUTE RULE — SINGLE SOURCE OF TRUTH").
20    peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
21    /// Message deduplication set.
22    dedup: Arc<RwLock<HashSet<String>>>,
23}
24
/// Maximum number of tweet IDs remembered for deduplication. Once this many
/// are stored, half of them are dropped before another is recorded.
const DEDUP_CAPACITY: usize = 10_000;
27
28impl TwitterChannel {
29    pub fn new(
30        bearer_token: String,
31        alias: impl Into<String>,
32        peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
33    ) -> Self {
34        Self {
35            bearer_token,
36            alias: alias.into(),
37            peer_resolver,
38            dedup: Arc::new(RwLock::new(HashSet::new())),
39        }
40    }
41
42    /// Return the alias under `[channels.twitter.<alias>]` that this
43    /// channel handle is bound to.
44    pub fn alias(&self) -> &str {
45        &self.alias
46    }
47
48    fn http_client(&self) -> reqwest::Client {
49        zeroclaw_config::schema::build_runtime_proxy_client("channel.twitter")
50    }
51
52    fn is_user_allowed(&self, user_id: &str) -> bool {
53        let peers = (self.peer_resolver)();
54        crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
55    }
56
57    /// Check and insert tweet ID for deduplication.
58    async fn is_duplicate(&self, tweet_id: &str) -> bool {
59        if tweet_id.is_empty() {
60            return false;
61        }
62
63        let mut dedup = self.dedup.write().await;
64
65        if dedup.contains(tweet_id) {
66            return true;
67        }
68
69        if dedup.len() >= DEDUP_CAPACITY {
70            let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
71            for key in to_remove {
72                dedup.remove(&key);
73            }
74        }
75
76        dedup.insert(tweet_id.to_string());
77        false
78    }
79
80    /// Get the authenticated user's ID for filtered stream rules.
81    async fn get_authenticated_user_id(&self) -> anyhow::Result<String> {
82        let resp = self
83            .http_client()
84            .get(format!("{TWITTER_API_BASE}/users/me"))
85            .bearer_auth(&self.bearer_token)
86            .send()
87            .await?;
88
89        if !resp.status().is_success() {
90            let status = resp.status();
91            let err = resp.text().await.unwrap_or_default();
92            anyhow::bail!("Twitter users/me failed ({status}): {err}");
93        }
94
95        let data: serde_json::Value = resp.json().await?;
96        let user_id = data
97            .get("data")
98            .and_then(|d| d.get("id"))
99            .and_then(|id| id.as_str())
100            .ok_or_else(|| {
101                ::zeroclaw_log::record!(
102                    WARN,
103                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
104                        .with_outcome(::zeroclaw_log::EventOutcome::Failure),
105                    "Missing user id in Twitter response"
106                );
107                anyhow::Error::msg("Missing user id in Twitter response")
108            })?
109            .to_string();
110
111        Ok(user_id)
112    }
113
114    /// Send a reply tweet.
115    async fn create_tweet(
116        &self,
117        text: &str,
118        reply_tweet_id: Option<&str>,
119    ) -> anyhow::Result<String> {
120        let mut body = json!({ "text": text });
121
122        if let Some(reply_id) = reply_tweet_id {
123            body["reply"] = json!({ "in_reply_to_tweet_id": reply_id });
124        }
125
126        let resp = self
127            .http_client()
128            .post(format!("{TWITTER_API_BASE}/tweets"))
129            .bearer_auth(&self.bearer_token)
130            .json(&body)
131            .send()
132            .await?;
133
134        if !resp.status().is_success() {
135            let status = resp.status();
136            let err = resp.text().await.unwrap_or_default();
137            anyhow::bail!("Twitter create tweet failed ({status}): {err}");
138        }
139
140        let data: serde_json::Value = resp.json().await?;
141        let tweet_id = data
142            .get("data")
143            .and_then(|d| d.get("id"))
144            .and_then(|id| id.as_str())
145            .unwrap_or("")
146            .to_string();
147
148        Ok(tweet_id)
149    }
150
151    /// Send a DM to a user.
152    async fn send_dm(&self, recipient_id: &str, text: &str) -> anyhow::Result<()> {
153        let body = json!({
154            "text": text,
155        });
156
157        let resp = self
158            .http_client()
159            .post(format!(
160                "{TWITTER_API_BASE}/dm_conversations/with/{recipient_id}/messages"
161            ))
162            .bearer_auth(&self.bearer_token)
163            .json(&body)
164            .send()
165            .await?;
166
167        if !resp.status().is_success() {
168            let status = resp.status();
169            let err = resp.text().await.unwrap_or_default();
170            anyhow::bail!("Twitter DM send failed ({status}): {err}");
171        }
172
173        Ok(())
174    }
175}
176
177impl ::zeroclaw_api::attribution::Attributable for TwitterChannel {
178    fn role(&self) -> ::zeroclaw_api::attribution::Role {
179        ::zeroclaw_api::attribution::Role::Channel(
180            ::zeroclaw_api::attribution::ChannelKind::Twitter,
181        )
182    }
183    fn alias(&self) -> &str {
184        &self.alias
185    }
186}
187
188#[async_trait]
189impl Channel for TwitterChannel {
190    fn name(&self) -> &str {
191        "twitter"
192    }
193
194    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
195        // recipient format: "dm:{user_id}" for DMs, "tweet:{tweet_id}" for replies
196        if let Some(user_id) = message.recipient.strip_prefix("dm:") {
197            // Twitter API enforces a 280 char limit on tweets but DMs can be up to 10000.
198            self.send_dm(user_id, &message.content).await
199        } else if let Some(tweet_id) = message.recipient.strip_prefix("tweet:") {
200            // Split long replies into tweet threads (280 char limit).
201            let chunks = split_tweet_text(&message.content, 280);
202            let mut reply_to = tweet_id.to_string();
203            for chunk in chunks {
204                reply_to = self.create_tweet(&chunk, Some(&reply_to)).await?;
205            }
206            Ok(())
207        } else {
208            // Default: treat as tweet reply
209            let chunks = split_tweet_text(&message.content, 280);
210            let mut reply_to = message.recipient.clone();
211            for chunk in chunks {
212                reply_to = self.create_tweet(&chunk, Some(&reply_to)).await?;
213            }
214            Ok(())
215        }
216    }
217
218    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
219        ::zeroclaw_log::record!(
220            INFO,
221            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
222            "authenticating..."
223        );
224        let bot_user_id = self.get_authenticated_user_id().await?;
225        ::zeroclaw_log::record!(
226            INFO,
227            ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
228                .with_attrs(::serde_json::json!({"bot_user_id": bot_user_id})),
229            "authenticated as user"
230        );
231
232        // Poll mentions timeline (filtered stream requires elevated access).
233        // Using mentions timeline polling as a more accessible approach.
234        let mut since_id: Option<String> = None;
235        let poll_interval = std::time::Duration::from_secs(15);
236
237        loop {
238            let mut url = format!(
239                "{TWITTER_API_BASE}/users/{bot_user_id}/mentions?tweet.fields=author_id,conversation_id,created_at&expansions=author_id&max_results=20"
240            );
241
242            if let Some(ref id) = since_id {
243                use std::fmt::Write;
244                let _ = write!(url, "&since_id={id}");
245            }
246
247            match self
248                .http_client()
249                .get(&url)
250                .bearer_auth(&self.bearer_token)
251                .send()
252                .await
253            {
254                Ok(resp) if resp.status().is_success() => {
255                    let data: serde_json::Value = match resp.json().await {
256                        Ok(d) => d,
257                        Err(e) => {
258                            ::zeroclaw_log::record!(
259                                WARN,
260                                ::zeroclaw_log::Event::new(
261                                    module_path!(),
262                                    ::zeroclaw_log::Action::Note
263                                )
264                                .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
265                                .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
266                                "failed to parse mentions response"
267                            );
268                            tokio::time::sleep(poll_interval).await;
269                            continue;
270                        }
271                    };
272
273                    if let Some(tweets) = data.get("data").and_then(|d| d.as_array()) {
274                        // Build user lookup map from includes
275                        let user_map: std::collections::HashMap<String, String> = data
276                            .get("includes")
277                            .and_then(|i| i.get("users"))
278                            .and_then(|u| u.as_array())
279                            .map(|users| {
280                                users
281                                    .iter()
282                                    .filter_map(|u| {
283                                        let id = u.get("id")?.as_str()?.to_string();
284                                        let username = u.get("username")?.as_str()?.to_string();
285                                        Some((id, username))
286                                    })
287                                    .collect()
288                            })
289                            .unwrap_or_default();
290
291                        // Process tweets in chronological order (oldest first)
292                        for tweet in tweets.iter().rev() {
293                            let tweet_id = tweet.get("id").and_then(|i| i.as_str()).unwrap_or("");
294                            let author_id = tweet
295                                .get("author_id")
296                                .and_then(|a| a.as_str())
297                                .unwrap_or("");
298                            let text = tweet.get("text").and_then(|t| t.as_str()).unwrap_or("");
299
300                            // Skip own tweets
301                            if author_id == bot_user_id {
302                                continue;
303                            }
304
305                            if self.is_duplicate(tweet_id).await {
306                                continue;
307                            }
308
309                            let username = user_map
310                                .get(author_id)
311                                .cloned()
312                                .unwrap_or_else(|| author_id.to_string());
313
314                            if !self.is_user_allowed(&username) && !self.is_user_allowed(author_id)
315                            {
316                                ::zeroclaw_log::record!(
317                                    DEBUG,
318                                    ::zeroclaw_log::Event::new(
319                                        module_path!(),
320                                        ::zeroclaw_log::Action::Note
321                                    )
322                                    .with_attrs(::serde_json::json!({"username": username})),
323                                    "ignoring mention from unauthorized user"
324                                );
325                                continue;
326                            }
327
328                            let trimmed_text = text.trim();
329                            if trimmed_text.is_empty() {
330                                continue;
331                            }
332
333                            let reply_target = format!("tweet:{tweet_id}");
334
335                            let channel_msg = ChannelMessage {
336                                id: Uuid::new_v4().to_string(),
337                                sender: username,
338                                reply_target,
339                                content: trimmed_text.to_string(),
340                                channel: "twitter".to_string(),
341                                channel_alias: Some(self.alias.clone()),
342                                timestamp: std::time::SystemTime::now()
343                                    .duration_since(std::time::UNIX_EPOCH)
344                                    .unwrap_or_default()
345                                    .as_secs(),
346                                thread_ts: tweet
347                                    .get("conversation_id")
348                                    .and_then(|c| c.as_str())
349                                    .map(|s| s.to_string()),
350                                interruption_scope_id: None,
351                                attachments: vec![],
352                                subject: None,
353                            };
354
355                            if tx.send(channel_msg).await.is_err() {
356                                ::zeroclaw_log::record!(
357                                    WARN,
358                                    ::zeroclaw_log::Event::new(
359                                        module_path!(),
360                                        ::zeroclaw_log::Action::Note
361                                    )
362                                    .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
363                                    "message channel closed"
364                                );
365                                return Ok(());
366                            }
367
368                            // Track newest ID for pagination
369                            if since_id.as_deref().is_none_or(|s| tweet_id > s) {
370                                since_id = Some(tweet_id.to_string());
371                            }
372                        }
373                    }
374
375                    // Update newest_id from meta
376                    if let Some(newest) = data
377                        .get("meta")
378                        .and_then(|m| m.get("newest_id"))
379                        .and_then(|n| n.as_str())
380                    {
381                        since_id = Some(newest.to_string());
382                    }
383                }
384                Ok(resp) => {
385                    let status = resp.status();
386                    if status.as_u16() == 429 {
387                        // Rate limited — back off
388                        ::zeroclaw_log::record!(
389                            WARN,
390                            ::zeroclaw_log::Event::new(
391                                module_path!(),
392                                ::zeroclaw_log::Action::Note
393                            )
394                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
395                            "rate limited, backing off 60s"
396                        );
397                        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
398                        continue;
399                    }
400                    let err = resp.text().await.unwrap_or_default();
401                    ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", err), "status": status.to_string()})), "mentions request failed");
402                }
403                Err(e) => {
404                    ::zeroclaw_log::record!(
405                        WARN,
406                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
407                            .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
408                            .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
409                        "mentions request error"
410                    );
411                }
412            }
413
414            tokio::time::sleep(poll_interval).await;
415        }
416    }
417
418    async fn health_check(&self) -> bool {
419        self.get_authenticated_user_id().await.is_ok()
420    }
421}
422
/// Split `text` into chunks of at most `max_len` bytes for posting as a tweet
/// thread.
///
/// Text that already fits (including an empty string) is returned unchanged
/// as a single chunk. Otherwise:
/// * leading whitespace is dropped;
/// * each chunk ends at the last whitespace character within the limit, or is
///   cut at the limit when there is none;
/// * whitespace at the start of the following chunk is removed;
/// * cuts always land on UTF-8 character boundaries, so multi-byte text never
///   panics.
///
/// The limit is measured in bytes, which is never less than the character
/// count, so chunks also satisfy a character-based limit.
///
/// Edge cases:
/// * If `max_len` is smaller than the next character, that character is
///   emitted on its own so the loop always progresses (including
///   `max_len == 0`).
/// * Over-long text consisting only of whitespace yields no chunks.
fn split_tweet_text(text: &str, max_len: usize) -> Vec<String> {
    if text.len() <= max_len {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    // Every iteration starts on a non-whitespace character, so a split found
    // at a whitespace position > 0 never produces a blank chunk.
    let mut remaining = text.trim_start();

    while !remaining.is_empty() {
        if remaining.len() <= max_len {
            chunks.push(remaining.to_string());
            break;
        }

        // Largest cut point <= max_len that does not land inside a character.
        // Terminates because 0 is always a char boundary.
        let mut limit = max_len;
        while !remaining.is_char_boundary(limit) {
            limit -= 1;
        }

        let window = &remaining[..limit];
        let split_at = match window.rfind(char::is_whitespace) {
            Some(i) if i > 0 => i,
            _ if limit > 0 => limit,
            // Not even one character fits: emit it alone to guarantee progress.
            _ => remaining.chars().next().map_or(remaining.len(), char::len_utf8),
        };

        chunks.push(remaining[..split_at].to_string());
        remaining = remaining[split_at..].trim_start();
    }

    chunks
}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a channel bound to the test alias whose allowlist resolver always
    /// yields `peers`.
    fn channel_with_peers(peers: &'static [&'static str]) -> TwitterChannel {
        TwitterChannel::new(
            "token".into(),
            "twitter_test_alias",
            Arc::new(move || peers.iter().map(|p| (*p).to_string()).collect::<Vec<String>>()),
        )
    }

    /// Deserialize a Twitter channel config table from TOML.
    fn parse_config(source: &str) -> zeroclaw_config::schema::TwitterConfig {
        toml::from_str(source).unwrap()
    }

    #[test]
    fn test_name() {
        assert_eq!(channel_with_peers(&[]).name(), "twitter");
    }

    #[test]
    fn test_user_allowed_wildcard() {
        assert!(channel_with_peers(&["*"]).is_user_allowed("anyone"));
    }

    #[test]
    fn test_user_allowed_specific() {
        let channel = channel_with_peers(&["user123"]);
        assert!(channel.is_user_allowed("user123"));
        assert!(!channel.is_user_allowed("other"));
    }

    #[test]
    fn test_user_denied_empty() {
        assert!(!channel_with_peers(&[]).is_user_allowed("anyone"));
    }

    #[tokio::test]
    async fn test_dedup() {
        let channel = channel_with_peers(&[]);
        let first_sighting = channel.is_duplicate("tweet1").await;
        let second_sighting = channel.is_duplicate("tweet1").await;
        let other_tweet = channel.is_duplicate("tweet2").await;
        assert!(!first_sighting);
        assert!(second_sighting);
        assert!(!other_tweet);
    }

    #[tokio::test]
    async fn test_dedup_empty_id() {
        let channel = channel_with_peers(&[]);
        for _ in 0..2 {
            assert!(!channel.is_duplicate("").await);
        }
    }

    #[test]
    fn test_split_tweet_text_short() {
        assert_eq!(split_tweet_text("hello", 280), vec!["hello"]);
    }

    #[test]
    fn test_split_tweet_text_long() {
        let words = "a ".repeat(200);
        let pieces = split_tweet_text(words.trim(), 280);
        assert!(pieces.len() > 1);
        assert!(pieces.iter().all(|piece| piece.len() <= 280));
    }

    #[test]
    fn test_split_tweet_text_no_spaces() {
        let unbroken = "a".repeat(300);
        let pieces = split_tweet_text(&unbroken, 280);
        assert_eq!(pieces.len(), 2);
        assert_eq!(pieces[0].len(), 280);
    }

    #[test]
    fn test_config_serde() {
        let config = parse_config(
            r#"
bearer_token = "AAAA"
"#,
        );
        assert_eq!(config.bearer_token, "AAAA");
    }

    #[test]
    fn test_config_serde_defaults() {
        let config = parse_config(
            r#"
bearer_token = "tok"
"#,
        );
        assert_eq!(config.bearer_token, "tok");
    }
}