1use super::AppState;
4use axum::{
5 extract::State,
6 http::{HeaderMap, StatusCode, header},
7 response::{IntoResponse, Json},
8};
9use chrono::{DateTime, Utc};
10use parking_lot::Mutex;
11use rusqlite::Connection;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::path::{Path, PathBuf};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct DeviceInfo {
19 pub id: String,
20 pub name: Option<String>,
21 pub device_type: Option<String>,
22 pub paired_at: DateTime<Utc>,
23 pub last_seen: DateTime<Utc>,
24 pub ip_address: Option<String>,
25 #[serde(default, skip_serializing_if = "Option::is_none")]
28 pub capabilities: Option<Vec<String>>,
29}
30
31#[derive(Debug)]
33pub struct DeviceRegistry {
34 cache: Mutex<HashMap<String, DeviceInfo>>,
35 db_path: PathBuf,
36}
37
38impl DeviceRegistry {
39 pub fn new(workspace_dir: &Path) -> Self {
40 let db_path = workspace_dir.join("devices.db");
41 let conn = Connection::open(&db_path).expect("Failed to open device registry database");
42 conn.execute_batch(
43 "PRAGMA journal_mode = WAL;
44 PRAGMA synchronous = NORMAL;
45 PRAGMA temp_store = MEMORY;
46 CREATE TABLE IF NOT EXISTS devices (
47 token_hash TEXT PRIMARY KEY,
48 id TEXT NOT NULL,
49 name TEXT,
50 device_type TEXT,
51 paired_at TEXT NOT NULL,
52 last_seen TEXT NOT NULL,
53 ip_address TEXT,
54 capabilities TEXT
55 )",
56 )
57 .expect("Failed to create devices table");
58
59 let _ = conn.execute("ALTER TABLE devices ADD COLUMN capabilities TEXT", []);
62
63 let mut cache = HashMap::new();
65 let mut stmt = conn
66 .prepare("SELECT token_hash, id, name, device_type, paired_at, last_seen, ip_address, capabilities FROM devices")
67 .expect("Failed to prepare device select");
68 let rows = stmt
69 .query_map([], |row| {
70 let token_hash: String = row.get(0)?;
71 let id: String = row.get(1)?;
72 let name: Option<String> = row.get(2)?;
73 let device_type: Option<String> = row.get(3)?;
74 let paired_at_str: String = row.get(4)?;
75 let last_seen_str: String = row.get(5)?;
76 let ip_address: Option<String> = row.get(6)?;
77 let capabilities_json: Option<String> = row.get(7)?;
78 let paired_at = DateTime::parse_from_rfc3339(&paired_at_str)
79 .map(|dt| dt.with_timezone(&Utc))
80 .unwrap_or_else(|_| Utc::now());
81 let last_seen = DateTime::parse_from_rfc3339(&last_seen_str)
82 .map(|dt| dt.with_timezone(&Utc))
83 .unwrap_or_else(|_| Utc::now());
84 let capabilities = capabilities_json
85 .as_deref()
86 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok());
87 Ok((
88 token_hash,
89 DeviceInfo {
90 id,
91 name,
92 device_type,
93 paired_at,
94 last_seen,
95 ip_address,
96 capabilities,
97 },
98 ))
99 })
100 .expect("Failed to query devices");
101 for (hash, info) in rows.flatten() {
102 cache.insert(hash, info);
103 }
104
105 Self {
106 cache: Mutex::new(cache),
107 db_path,
108 }
109 }
110
111 fn open_db(&self) -> Connection {
112 let conn =
113 Connection::open(&self.db_path).expect("Failed to open device registry database");
114 conn.execute_batch(
115 "PRAGMA journal_mode = WAL;
116 PRAGMA synchronous = NORMAL;
117 PRAGMA temp_store = MEMORY;",
118 )
119 .expect("Failed to set device registry pragmas");
120 conn
121 }
122
123 pub fn register(&self, token_hash: String, info: DeviceInfo) {
124 let capabilities_json = info
125 .capabilities
126 .as_ref()
127 .and_then(|c| serde_json::to_string(c).ok());
128 let conn = self.open_db();
129 conn.execute(
130 "INSERT OR REPLACE INTO devices (token_hash, id, name, device_type, paired_at, last_seen, ip_address, capabilities) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
131 rusqlite::params![
132 token_hash,
133 info.id,
134 info.name,
135 info.device_type,
136 info.paired_at.to_rfc3339(),
137 info.last_seen.to_rfc3339(),
138 info.ip_address,
139 capabilities_json,
140 ],
141 )
142 .expect("Failed to insert device");
143 self.cache.lock().insert(token_hash, info);
144 }
145
146 pub fn list(&self) -> Vec<DeviceInfo> {
147 let conn = self.open_db();
148 let mut stmt = conn
149 .prepare("SELECT token_hash, id, name, device_type, paired_at, last_seen, ip_address, capabilities FROM devices")
150 .expect("Failed to prepare device select");
151 let rows = stmt
152 .query_map([], |row| {
153 let id: String = row.get(1)?;
154 let name: Option<String> = row.get(2)?;
155 let device_type: Option<String> = row.get(3)?;
156 let paired_at_str: String = row.get(4)?;
157 let last_seen_str: String = row.get(5)?;
158 let ip_address: Option<String> = row.get(6)?;
159 let capabilities_json: Option<String> = row.get(7)?;
160 let paired_at = DateTime::parse_from_rfc3339(&paired_at_str)
161 .map(|dt| dt.with_timezone(&Utc))
162 .unwrap_or_else(|_| Utc::now());
163 let last_seen = DateTime::parse_from_rfc3339(&last_seen_str)
164 .map(|dt| dt.with_timezone(&Utc))
165 .unwrap_or_else(|_| Utc::now());
166 let capabilities = capabilities_json
167 .as_deref()
168 .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok());
169 Ok(DeviceInfo {
170 id,
171 name,
172 device_type,
173 paired_at,
174 last_seen,
175 ip_address,
176 capabilities,
177 })
178 })
179 .expect("Failed to query devices");
180 rows.filter_map(|r| r.ok()).collect()
181 }
182
183 pub fn revoke(&self, device_id: &str) -> Result<Option<String>, rusqlite::Error> {
192 let conn = self.open_db();
193 let deleted: Option<String> = conn
194 .query_row(
195 "DELETE FROM devices WHERE id = ?1 RETURNING token_hash",
196 rusqlite::params![device_id],
197 |row| row.get::<_, String>(0),
198 )
199 .map(Some)
200 .or_else(|e| match e {
201 rusqlite::Error::QueryReturnedNoRows => Ok(None),
202 other => Err(other),
203 })?;
204 if let Some(hash) = deleted.as_ref() {
205 self.cache.lock().remove(hash);
206 }
207 Ok(deleted)
208 }
209
210 pub fn clear(&self) -> Result<usize, rusqlite::Error> {
215 let conn = self.open_db();
216 let removed = conn.execute("DELETE FROM devices", [])?;
217 self.cache.lock().clear();
218 Ok(removed)
219 }
220
221 pub fn update_last_seen(&self, token_hash: &str) {
222 let now = Utc::now();
223 let conn = self.open_db();
224 conn.execute(
225 "UPDATE devices SET last_seen = ?1 WHERE token_hash = ?2",
226 rusqlite::params![now.to_rfc3339(), token_hash],
227 )
228 .ok();
229 if let Some(device) = self.cache.lock().get_mut(token_hash) {
230 device.last_seen = now;
231 }
232 }
233
234 pub fn update_capabilities(&self, token_hash: &str, capabilities: Vec<String>) -> bool {
237 let json = serde_json::to_string(&capabilities).unwrap_or_else(|_| "[]".into());
238 let conn = self.open_db();
239 let updated = conn
240 .execute(
241 "UPDATE devices SET capabilities = ?1, last_seen = ?2 WHERE token_hash = ?3",
242 rusqlite::params![json, Utc::now().to_rfc3339(), token_hash],
243 )
244 .unwrap_or(0);
245 if updated > 0
246 && let Some(device) = self.cache.lock().get_mut(token_hash)
247 {
248 device.capabilities = Some(capabilities);
249 device.last_seen = Utc::now();
250 }
251 updated > 0
252 }
253
254 pub fn device_count(&self) -> usize {
255 self.cache.lock().len()
256 }
257}
258
259#[derive(Debug, Default)]
261pub struct PairingStore {
262 pending: Mutex<Vec<PendingPairing>>,
263}
264
265#[derive(Debug, Clone, Serialize)]
266struct PendingPairing {
267 code: String,
268 created_at: DateTime<Utc>,
269 expires_at: DateTime<Utc>,
270 client_ip: Option<String>,
271 attempts: u32,
272}
273
274impl PairingStore {
275 pub fn new() -> Self {
276 Self::default()
277 }
278
279 pub fn pending_count(&self) -> usize {
280 let mut pending = self.pending.lock();
281 pending.retain(|p| p.expires_at > Utc::now());
282 pending.len()
283 }
284}
285
286fn extract_bearer(headers: &HeaderMap) -> Option<&str> {
287 headers
288 .get(header::AUTHORIZATION)
289 .and_then(|v| v.to_str().ok())
290 .and_then(|auth| auth.strip_prefix("Bearer "))
291}
292
293fn require_auth(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, &'static str)> {
294 if state.pairing.require_pairing() {
295 let token = extract_bearer(headers).unwrap_or("");
296 if !state.pairing.is_authenticated(token) {
297 return Err((StatusCode::UNAUTHORIZED, "Unauthorized"));
298 }
299 }
300 Ok(())
301}
302
303pub async fn initiate_pairing(
305 State(state): State<AppState>,
306 headers: HeaderMap,
307) -> impl IntoResponse {
308 if let Err(e) = require_auth(&state, &headers) {
309 return e.into_response();
310 }
311
312 match state.pairing.generate_new_pairing_code() {
313 Some(code) => Json(serde_json::json!({
314 "pairing_code": code,
315 "message": "New pairing code generated"
316 }))
317 .into_response(),
318 None => (
319 StatusCode::SERVICE_UNAVAILABLE,
320 "Pairing is disabled or not available",
321 )
322 .into_response(),
323 }
324}
325
326pub async fn submit_pairing_enhanced(
328 State(state): State<AppState>,
329 headers: HeaderMap,
330 Json(body): Json<serde_json::Value>,
331) -> impl IntoResponse {
332 let code = body["code"].as_str().unwrap_or("");
333 let device_name = body["device_name"].as_str().map(String::from);
334 let device_type = body["device_type"].as_str().map(String::from);
335
336 let client_id = headers
337 .get("X-Forwarded-For")
338 .and_then(|v| v.to_str().ok())
339 .unwrap_or("unknown")
340 .to_string();
341
342 match state.pairing.try_pair(code, &client_id).await {
343 Ok(Some(token)) => {
344 let token_hash = {
346 use sha2::{Digest, Sha256};
347 let hash = Sha256::digest(token.as_bytes());
348 hex::encode(hash)
349 };
350 if let Some(ref registry) = state.device_registry {
351 registry.register(
352 token_hash,
353 DeviceInfo {
354 id: uuid::Uuid::new_v4().to_string(),
355 name: device_name,
356 device_type,
357 paired_at: Utc::now(),
358 last_seen: Utc::now(),
359 ip_address: Some(client_id),
360 capabilities: None,
361 },
362 );
363 }
364 if let Err(e) =
365 super::persist_pairing_tokens(state.config.clone(), &state.pairing).await
366 {
367 ::zeroclaw_log::record!(
368 ERROR,
369 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
370 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
371 .with_attrs(::serde_json::json!({"error": format!("{e}")})),
372 "pairing succeeded but token persistence failed"
373 );
374 return Json(serde_json::json!({
375 "paired": true,
376 "persisted": false,
377 "token": token,
378 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
379 }))
380 .into_response();
381 }
382 Json(serde_json::json!({
383 "paired": true,
384 "persisted": true,
385 "token": token,
386 "message": "Pairing successful"
387 }))
388 .into_response()
389 }
390 Ok(None) => (StatusCode::BAD_REQUEST, "Invalid or expired pairing code").into_response(),
391 Err(lockout_secs) => (
392 StatusCode::TOO_MANY_REQUESTS,
393 format!("Too many attempts. Locked out for {lockout_secs}s"),
394 )
395 .into_response(),
396 }
397}
398
399pub async fn list_devices(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
401 if let Err(e) = require_auth(&state, &headers) {
402 return e.into_response();
403 }
404
405 let devices = state
406 .device_registry
407 .as_ref()
408 .map(|r| r.list())
409 .unwrap_or_default();
410
411 let count = devices.len();
412 Json(serde_json::json!({
413 "devices": devices,
414 "count": count
415 }))
416 .into_response()
417}
418
419pub async fn revoke_device(
421 State(state): State<AppState>,
422 headers: HeaderMap,
423 axum::extract::Path(device_id): axum::extract::Path<String>,
424) -> impl IntoResponse {
425 if let Err(e) = require_auth(&state, &headers) {
426 return e.into_response();
427 }
428
429 let Some(registry) = state.device_registry.as_ref() else {
430 return (
431 StatusCode::SERVICE_UNAVAILABLE,
432 "Device registry is disabled",
433 )
434 .into_response();
435 };
436
437 let token_hash = match registry.revoke(&device_id) {
438 Ok(Some(hash)) => hash,
439 Ok(None) => return (StatusCode::NOT_FOUND, "Device not found").into_response(),
440 Err(e) => {
441 return (
442 StatusCode::INTERNAL_SERVER_ERROR,
443 format!("Device registry error: {e}"),
444 )
445 .into_response();
446 }
447 };
448
449 state.pairing.revoke_token_hash(&token_hash);
450
451 if let Err(e) = super::persist_pairing_tokens(state.config.clone(), &state.pairing).await {
457 return (
458 StatusCode::INTERNAL_SERVER_ERROR,
459 format!("Token revoked in memory but config persist failed: {e}"),
460 )
461 .into_response();
462 }
463
464 Json(serde_json::json!({
465 "message": "Device revoked and bearer token invalidated",
466 "device_id": device_id,
467 }))
468 .into_response()
469}
470
471pub async fn update_my_capabilities(
476 State(state): State<AppState>,
477 headers: HeaderMap,
478 Json(body): Json<serde_json::Value>,
479) -> impl IntoResponse {
480 if let Err(e) = require_auth(&state, &headers) {
481 return e.into_response();
482 }
483
484 let token = match extract_bearer(&headers) {
485 Some(t) => t,
486 None => return (StatusCode::UNAUTHORIZED, "Missing bearer token").into_response(),
487 };
488 let token_hash = {
489 use sha2::{Digest, Sha256};
490 let hash = Sha256::digest(token.as_bytes());
491 hex::encode(hash)
492 };
493
494 let capabilities: Vec<String> = body
495 .get("capabilities")
496 .and_then(|v| v.as_array())
497 .map(|arr| {
498 arr.iter()
499 .filter_map(|v| v.as_str().map(String::from))
500 .collect()
501 })
502 .unwrap_or_default();
503
504 let registry = match state.device_registry.as_ref() {
505 Some(r) => r,
506 None => {
507 return (
508 StatusCode::SERVICE_UNAVAILABLE,
509 "Device registry is disabled",
510 )
511 .into_response();
512 }
513 };
514
515 if registry.update_capabilities(&token_hash, capabilities.clone()) {
516 Json(serde_json::json!({
517 "message": "Capabilities updated",
518 "capabilities": capabilities,
519 }))
520 .into_response()
521 } else {
522 (StatusCode::NOT_FOUND, "Device not found for this token").into_response()
523 }
524}
525
526pub async fn rotate_token(
543 State(state): State<AppState>,
544 headers: HeaderMap,
545 axum::extract::Path(device_id): axum::extract::Path<String>,
546) -> impl IntoResponse {
547 if let Err(e) = require_auth(&state, &headers) {
548 return e.into_response();
549 }
550
551 let Some(registry) = state.device_registry.as_ref() else {
552 return (
553 StatusCode::SERVICE_UNAVAILABLE,
554 "Device registry is disabled",
555 )
556 .into_response();
557 };
558
559 let token_hash = match registry.revoke(&device_id) {
560 Ok(Some(hash)) => hash,
561 Ok(None) => return (StatusCode::NOT_FOUND, "Device not found").into_response(),
562 Err(e) => {
563 return (
564 StatusCode::INTERNAL_SERVER_ERROR,
565 format!("Device registry error: {e}"),
566 )
567 .into_response();
568 }
569 };
570
571 state.pairing.revoke_token_hash(&token_hash);
572
573 if let Err(e) = super::persist_pairing_tokens(state.config.clone(), &state.pairing).await {
577 return (
578 StatusCode::INTERNAL_SERVER_ERROR,
579 format!("Token revoked in memory but config persist failed: {e}"),
580 )
581 .into_response();
582 }
583
584 match state.pairing.generate_pairing_code_if_vacant() {
589 Ok(code) => Json(serde_json::json!({
590 "device_id": device_id,
591 "pairing_code": code,
592 "message": "Old token revoked. Use this code to re-pair the device.",
593 }))
594 .into_response(),
595 Err(zeroclaw_config::pairing::GeneratePairingCodeError::Pending) => {
596 Json(serde_json::json!({
597 "device_id": device_id,
598 "pairing_code": null,
599 "message": "Old token revoked. A pairing code is already pending; use it or call again after it clears.",
600 }))
601 .into_response()
602 }
603 Err(zeroclaw_config::pairing::GeneratePairingCodeError::PairingDisabled) => {
604 Json(serde_json::json!({
605 "device_id": device_id,
606 "pairing_code": null,
607 "message": "Old token revoked. Pairing is disabled; cannot issue a new code.",
608 }))
609 .into_response()
610 }
611 }
612}