1use async_trait::async_trait;
2use serde_json::json;
3use std::collections::HashSet;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use uuid::Uuid;
7use zeroclaw_api::channel::{Channel, ChannelMessage, SendMessage};
8
9const TWITTER_API_BASE: &str = "https://api.x.com/2";
10
11pub struct TwitterChannel {
14 bearer_token: String,
15 alias: String,
18 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
21 dedup: Arc<RwLock<HashSet<String>>>,
23}
24
25const DEDUP_CAPACITY: usize = 10_000;
27
28impl TwitterChannel {
29 pub fn new(
30 bearer_token: String,
31 alias: impl Into<String>,
32 peer_resolver: Arc<dyn Fn() -> Vec<String> + Send + Sync>,
33 ) -> Self {
34 Self {
35 bearer_token,
36 alias: alias.into(),
37 peer_resolver,
38 dedup: Arc::new(RwLock::new(HashSet::new())),
39 }
40 }
41
42 pub fn alias(&self) -> &str {
45 &self.alias
46 }
47
48 fn http_client(&self) -> reqwest::Client {
49 zeroclaw_config::schema::build_runtime_proxy_client("channel.twitter")
50 }
51
52 fn is_user_allowed(&self, user_id: &str) -> bool {
53 let peers = (self.peer_resolver)();
54 crate::allowlist::is_user_allowed(&peers, user_id, crate::allowlist::Match::Sensitive)
55 }
56
57 async fn is_duplicate(&self, tweet_id: &str) -> bool {
59 if tweet_id.is_empty() {
60 return false;
61 }
62
63 let mut dedup = self.dedup.write().await;
64
65 if dedup.contains(tweet_id) {
66 return true;
67 }
68
69 if dedup.len() >= DEDUP_CAPACITY {
70 let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
71 for key in to_remove {
72 dedup.remove(&key);
73 }
74 }
75
76 dedup.insert(tweet_id.to_string());
77 false
78 }
79
80 async fn get_authenticated_user_id(&self) -> anyhow::Result<String> {
82 let resp = self
83 .http_client()
84 .get(format!("{TWITTER_API_BASE}/users/me"))
85 .bearer_auth(&self.bearer_token)
86 .send()
87 .await?;
88
89 if !resp.status().is_success() {
90 let status = resp.status();
91 let err = resp.text().await.unwrap_or_default();
92 anyhow::bail!("Twitter users/me failed ({status}): {err}");
93 }
94
95 let data: serde_json::Value = resp.json().await?;
96 let user_id = data
97 .get("data")
98 .and_then(|d| d.get("id"))
99 .and_then(|id| id.as_str())
100 .ok_or_else(|| {
101 ::zeroclaw_log::record!(
102 WARN,
103 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Reject)
104 .with_outcome(::zeroclaw_log::EventOutcome::Failure),
105 "Missing user id in Twitter response"
106 );
107 anyhow::Error::msg("Missing user id in Twitter response")
108 })?
109 .to_string();
110
111 Ok(user_id)
112 }
113
114 async fn create_tweet(
116 &self,
117 text: &str,
118 reply_tweet_id: Option<&str>,
119 ) -> anyhow::Result<String> {
120 let mut body = json!({ "text": text });
121
122 if let Some(reply_id) = reply_tweet_id {
123 body["reply"] = json!({ "in_reply_to_tweet_id": reply_id });
124 }
125
126 let resp = self
127 .http_client()
128 .post(format!("{TWITTER_API_BASE}/tweets"))
129 .bearer_auth(&self.bearer_token)
130 .json(&body)
131 .send()
132 .await?;
133
134 if !resp.status().is_success() {
135 let status = resp.status();
136 let err = resp.text().await.unwrap_or_default();
137 anyhow::bail!("Twitter create tweet failed ({status}): {err}");
138 }
139
140 let data: serde_json::Value = resp.json().await?;
141 let tweet_id = data
142 .get("data")
143 .and_then(|d| d.get("id"))
144 .and_then(|id| id.as_str())
145 .unwrap_or("")
146 .to_string();
147
148 Ok(tweet_id)
149 }
150
151 async fn send_dm(&self, recipient_id: &str, text: &str) -> anyhow::Result<()> {
153 let body = json!({
154 "text": text,
155 });
156
157 let resp = self
158 .http_client()
159 .post(format!(
160 "{TWITTER_API_BASE}/dm_conversations/with/{recipient_id}/messages"
161 ))
162 .bearer_auth(&self.bearer_token)
163 .json(&body)
164 .send()
165 .await?;
166
167 if !resp.status().is_success() {
168 let status = resp.status();
169 let err = resp.text().await.unwrap_or_default();
170 anyhow::bail!("Twitter DM send failed ({status}): {err}");
171 }
172
173 Ok(())
174 }
175}
176
177impl ::zeroclaw_api::attribution::Attributable for TwitterChannel {
178 fn role(&self) -> ::zeroclaw_api::attribution::Role {
179 ::zeroclaw_api::attribution::Role::Channel(
180 ::zeroclaw_api::attribution::ChannelKind::Twitter,
181 )
182 }
183 fn alias(&self) -> &str {
184 &self.alias
185 }
186}
187
188#[async_trait]
189impl Channel for TwitterChannel {
190 fn name(&self) -> &str {
191 "twitter"
192 }
193
194 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
195 if let Some(user_id) = message.recipient.strip_prefix("dm:") {
197 self.send_dm(user_id, &message.content).await
199 } else if let Some(tweet_id) = message.recipient.strip_prefix("tweet:") {
200 let chunks = split_tweet_text(&message.content, 280);
202 let mut reply_to = tweet_id.to_string();
203 for chunk in chunks {
204 reply_to = self.create_tweet(&chunk, Some(&reply_to)).await?;
205 }
206 Ok(())
207 } else {
208 let chunks = split_tweet_text(&message.content, 280);
210 let mut reply_to = message.recipient.clone();
211 for chunk in chunks {
212 reply_to = self.create_tweet(&chunk, Some(&reply_to)).await?;
213 }
214 Ok(())
215 }
216 }
217
218 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
219 ::zeroclaw_log::record!(
220 INFO,
221 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note),
222 "authenticating..."
223 );
224 let bot_user_id = self.get_authenticated_user_id().await?;
225 ::zeroclaw_log::record!(
226 INFO,
227 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
228 .with_attrs(::serde_json::json!({"bot_user_id": bot_user_id})),
229 "authenticated as user"
230 );
231
232 let mut since_id: Option<String> = None;
235 let poll_interval = std::time::Duration::from_secs(15);
236
237 loop {
238 let mut url = format!(
239 "{TWITTER_API_BASE}/users/{bot_user_id}/mentions?tweet.fields=author_id,conversation_id,created_at&expansions=author_id&max_results=20"
240 );
241
242 if let Some(ref id) = since_id {
243 use std::fmt::Write;
244 let _ = write!(url, "&since_id={id}");
245 }
246
247 match self
248 .http_client()
249 .get(&url)
250 .bearer_auth(&self.bearer_token)
251 .send()
252 .await
253 {
254 Ok(resp) if resp.status().is_success() => {
255 let data: serde_json::Value = match resp.json().await {
256 Ok(d) => d,
257 Err(e) => {
258 ::zeroclaw_log::record!(
259 WARN,
260 ::zeroclaw_log::Event::new(
261 module_path!(),
262 ::zeroclaw_log::Action::Note
263 )
264 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
265 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
266 "failed to parse mentions response"
267 );
268 tokio::time::sleep(poll_interval).await;
269 continue;
270 }
271 };
272
273 if let Some(tweets) = data.get("data").and_then(|d| d.as_array()) {
274 let user_map: std::collections::HashMap<String, String> = data
276 .get("includes")
277 .and_then(|i| i.get("users"))
278 .and_then(|u| u.as_array())
279 .map(|users| {
280 users
281 .iter()
282 .filter_map(|u| {
283 let id = u.get("id")?.as_str()?.to_string();
284 let username = u.get("username")?.as_str()?.to_string();
285 Some((id, username))
286 })
287 .collect()
288 })
289 .unwrap_or_default();
290
291 for tweet in tweets.iter().rev() {
293 let tweet_id = tweet.get("id").and_then(|i| i.as_str()).unwrap_or("");
294 let author_id = tweet
295 .get("author_id")
296 .and_then(|a| a.as_str())
297 .unwrap_or("");
298 let text = tweet.get("text").and_then(|t| t.as_str()).unwrap_or("");
299
300 if author_id == bot_user_id {
302 continue;
303 }
304
305 if self.is_duplicate(tweet_id).await {
306 continue;
307 }
308
309 let username = user_map
310 .get(author_id)
311 .cloned()
312 .unwrap_or_else(|| author_id.to_string());
313
314 if !self.is_user_allowed(&username) && !self.is_user_allowed(author_id)
315 {
316 ::zeroclaw_log::record!(
317 DEBUG,
318 ::zeroclaw_log::Event::new(
319 module_path!(),
320 ::zeroclaw_log::Action::Note
321 )
322 .with_attrs(::serde_json::json!({"username": username})),
323 "ignoring mention from unauthorized user"
324 );
325 continue;
326 }
327
328 let trimmed_text = text.trim();
329 if trimmed_text.is_empty() {
330 continue;
331 }
332
333 let reply_target = format!("tweet:{tweet_id}");
334
335 let channel_msg = ChannelMessage {
336 id: Uuid::new_v4().to_string(),
337 sender: username,
338 reply_target,
339 content: trimmed_text.to_string(),
340 channel: "twitter".to_string(),
341 channel_alias: Some(self.alias.clone()),
342 timestamp: std::time::SystemTime::now()
343 .duration_since(std::time::UNIX_EPOCH)
344 .unwrap_or_default()
345 .as_secs(),
346 thread_ts: tweet
347 .get("conversation_id")
348 .and_then(|c| c.as_str())
349 .map(|s| s.to_string()),
350 interruption_scope_id: None,
351 attachments: vec![],
352 subject: None,
353 };
354
355 if tx.send(channel_msg).await.is_err() {
356 ::zeroclaw_log::record!(
357 WARN,
358 ::zeroclaw_log::Event::new(
359 module_path!(),
360 ::zeroclaw_log::Action::Note
361 )
362 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
363 "message channel closed"
364 );
365 return Ok(());
366 }
367
368 if since_id.as_deref().is_none_or(|s| tweet_id > s) {
370 since_id = Some(tweet_id.to_string());
371 }
372 }
373 }
374
375 if let Some(newest) = data
377 .get("meta")
378 .and_then(|m| m.get("newest_id"))
379 .and_then(|n| n.as_str())
380 {
381 since_id = Some(newest.to_string());
382 }
383 }
384 Ok(resp) => {
385 let status = resp.status();
386 if status.as_u16() == 429 {
387 ::zeroclaw_log::record!(
389 WARN,
390 ::zeroclaw_log::Event::new(
391 module_path!(),
392 ::zeroclaw_log::Action::Note
393 )
394 .with_outcome(::zeroclaw_log::EventOutcome::Unknown),
395 "rate limited, backing off 60s"
396 );
397 tokio::time::sleep(std::time::Duration::from_secs(60)).await;
398 continue;
399 }
400 let err = resp.text().await.unwrap_or_default();
401 ::zeroclaw_log::record!(WARN, ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note).with_outcome(::zeroclaw_log::EventOutcome::Unknown).with_attrs(::serde_json::json!({"error": format!("{}", err), "status": status.to_string()})), "mentions request failed");
402 }
403 Err(e) => {
404 ::zeroclaw_log::record!(
405 WARN,
406 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
407 .with_outcome(::zeroclaw_log::EventOutcome::Unknown)
408 .with_attrs(::serde_json::json!({"error": format!("{}", e)})),
409 "mentions request error"
410 );
411 }
412 }
413
414 tokio::time::sleep(poll_interval).await;
415 }
416 }
417
418 async fn health_check(&self) -> bool {
419 self.get_authenticated_user_id().await.is_ok()
420 }
421}
422
423fn split_tweet_text(text: &str, max_len: usize) -> Vec<String> {
425 if text.len() <= max_len {
426 return vec![text.to_string()];
427 }
428
429 let mut chunks = Vec::new();
430 let mut remaining = text;
431
432 while !remaining.is_empty() {
433 if remaining.len() <= max_len {
434 chunks.push(remaining.to_string());
435 break;
436 }
437
438 let split_at = remaining[..max_len].rfind(' ').unwrap_or(max_len);
440
441 chunks.push(remaining[..split_at].to_string());
442 remaining = remaining[split_at..].trim_start();
443 }
444
445 chunks
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451
452 #[test]
453 fn test_name() {
454 let ch = TwitterChannel::new("token".into(), "twitter_test_alias", Arc::new(Vec::new));
455 assert_eq!(ch.name(), "twitter");
456 }
457
458 #[test]
459 fn test_user_allowed_wildcard() {
460 let ch = TwitterChannel::new(
461 "token".into(),
462 "twitter_test_alias",
463 Arc::new(|| vec!["*".into()]),
464 );
465 assert!(ch.is_user_allowed("anyone"));
466 }
467
468 #[test]
469 fn test_user_allowed_specific() {
470 let ch = TwitterChannel::new(
471 "token".into(),
472 "twitter_test_alias",
473 Arc::new(|| vec!["user123".into()]),
474 );
475 assert!(ch.is_user_allowed("user123"));
476 assert!(!ch.is_user_allowed("other"));
477 }
478
479 #[test]
480 fn test_user_denied_empty() {
481 let ch = TwitterChannel::new("token".into(), "twitter_test_alias", Arc::new(Vec::new));
482 assert!(!ch.is_user_allowed("anyone"));
483 }
484
485 #[tokio::test]
486 async fn test_dedup() {
487 let ch = TwitterChannel::new("token".into(), "twitter_test_alias", Arc::new(Vec::new));
488 assert!(!ch.is_duplicate("tweet1").await);
489 assert!(ch.is_duplicate("tweet1").await);
490 assert!(!ch.is_duplicate("tweet2").await);
491 }
492
493 #[tokio::test]
494 async fn test_dedup_empty_id() {
495 let ch = TwitterChannel::new("token".into(), "twitter_test_alias", Arc::new(Vec::new));
496 assert!(!ch.is_duplicate("").await);
497 assert!(!ch.is_duplicate("").await);
498 }
499
500 #[test]
501 fn test_split_tweet_text_short() {
502 let chunks = split_tweet_text("hello", 280);
503 assert_eq!(chunks, vec!["hello"]);
504 }
505
506 #[test]
507 fn test_split_tweet_text_long() {
508 let text = "a ".repeat(200);
509 let chunks = split_tweet_text(text.trim(), 280);
510 assert!(chunks.len() > 1);
511 for chunk in &chunks {
512 assert!(chunk.len() <= 280);
513 }
514 }
515
516 #[test]
517 fn test_split_tweet_text_no_spaces() {
518 let text = "a".repeat(300);
519 let chunks = split_tweet_text(&text, 280);
520 assert_eq!(chunks.len(), 2);
521 assert_eq!(chunks[0].len(), 280);
522 }
523
524 #[test]
525 fn test_config_serde() {
526 let toml_str = r#"
527bearer_token = "AAAA"
528"#;
529 let config: zeroclaw_config::schema::TwitterConfig = toml::from_str(toml_str).unwrap();
530 assert_eq!(config.bearer_token, "AAAA");
531 }
532
533 #[test]
534 fn test_config_serde_defaults() {
535 let toml_str = r#"
536bearer_token = "tok"
537"#;
538 let config: zeroclaw_config::schema::TwitterConfig = toml::from_str(toml_str).unwrap();
539 assert_eq!(config.bearer_token, "tok");
540 }
541}