Skip to main content

zeroclaw_gateway/
api_pairing.rs

1//! Device management and pairing API handlers.
2
3use super::AppState;
4use axum::{
5    extract::State,
6    http::{HeaderMap, StatusCode, header},
7    response::{IntoResponse, Json},
8};
9use chrono::{DateTime, Utc};
10use parking_lot::Mutex;
11use rusqlite::Connection;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::path::{Path, PathBuf};
15
16/// Metadata about a paired device.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct DeviceInfo {
19    pub id: String,
20    pub name: Option<String>,
21    pub device_type: Option<String>,
22    pub paired_at: DateTime<Utc>,
23    pub last_seen: DateTime<Utc>,
24    pub ip_address: Option<String>,
25    /// macOS TCC permissions (and equivalent on other OSes) the device reports as granted.
26    /// Pushed by the desktop app via POST /api/devices/me/capabilities.
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub capabilities: Option<Vec<String>>,
29}
30
31/// Registry of paired devices backed by SQLite.
32#[derive(Debug)]
33pub struct DeviceRegistry {
34    cache: Mutex<HashMap<String, DeviceInfo>>,
35    db_path: PathBuf,
36}
37
38impl DeviceRegistry {
39    pub fn new(workspace_dir: &Path) -> Self {
40        let db_path = workspace_dir.join("devices.db");
41        let conn = Connection::open(&db_path).expect("Failed to open device registry database");
42        conn.execute_batch(
43            "PRAGMA journal_mode = WAL;
44             PRAGMA synchronous = NORMAL;
45             PRAGMA temp_store = MEMORY;
46             CREATE TABLE IF NOT EXISTS devices (
47                token_hash TEXT PRIMARY KEY,
48                id TEXT NOT NULL,
49                name TEXT,
50                device_type TEXT,
51                paired_at TEXT NOT NULL,
52                last_seen TEXT NOT NULL,
53                ip_address TEXT,
54                capabilities TEXT
55            )",
56        )
57        .expect("Failed to create devices table");
58
59        // Additive migration for DBs created before the capabilities column existed.
60        // SQLite has no IF NOT EXISTS for columns; the duplicate-column error here is benign.
61        let _ = conn.execute("ALTER TABLE devices ADD COLUMN capabilities TEXT", []);
62
63        // Warm the in-memory cache from DB
64        let mut cache = HashMap::new();
65        let mut stmt = conn
66            .prepare("SELECT token_hash, id, name, device_type, paired_at, last_seen, ip_address, capabilities FROM devices")
67            .expect("Failed to prepare device select");
68        let rows = stmt
69            .query_map([], |row| {
70                let token_hash: String = row.get(0)?;
71                let id: String = row.get(1)?;
72                let name: Option<String> = row.get(2)?;
73                let device_type: Option<String> = row.get(3)?;
74                let paired_at_str: String = row.get(4)?;
75                let last_seen_str: String = row.get(5)?;
76                let ip_address: Option<String> = row.get(6)?;
77                let capabilities_json: Option<String> = row.get(7)?;
78                let paired_at = DateTime::parse_from_rfc3339(&paired_at_str)
79                    .map(|dt| dt.with_timezone(&Utc))
80                    .unwrap_or_else(|_| Utc::now());
81                let last_seen = DateTime::parse_from_rfc3339(&last_seen_str)
82                    .map(|dt| dt.with_timezone(&Utc))
83                    .unwrap_or_else(|_| Utc::now());
84                let capabilities = capabilities_json
85                    .as_deref()
86                    .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok());
87                Ok((
88                    token_hash,
89                    DeviceInfo {
90                        id,
91                        name,
92                        device_type,
93                        paired_at,
94                        last_seen,
95                        ip_address,
96                        capabilities,
97                    },
98                ))
99            })
100            .expect("Failed to query devices");
101        for (hash, info) in rows.flatten() {
102            cache.insert(hash, info);
103        }
104
105        Self {
106            cache: Mutex::new(cache),
107            db_path,
108        }
109    }
110
111    fn open_db(&self) -> Connection {
112        let conn =
113            Connection::open(&self.db_path).expect("Failed to open device registry database");
114        conn.execute_batch(
115            "PRAGMA journal_mode = WAL;
116             PRAGMA synchronous = NORMAL;
117             PRAGMA temp_store = MEMORY;",
118        )
119        .expect("Failed to set device registry pragmas");
120        conn
121    }
122
123    pub fn register(&self, token_hash: String, info: DeviceInfo) {
124        let capabilities_json = info
125            .capabilities
126            .as_ref()
127            .and_then(|c| serde_json::to_string(c).ok());
128        let conn = self.open_db();
129        conn.execute(
130            "INSERT OR REPLACE INTO devices (token_hash, id, name, device_type, paired_at, last_seen, ip_address, capabilities) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
131            rusqlite::params![
132                token_hash,
133                info.id,
134                info.name,
135                info.device_type,
136                info.paired_at.to_rfc3339(),
137                info.last_seen.to_rfc3339(),
138                info.ip_address,
139                capabilities_json,
140            ],
141        )
142        .expect("Failed to insert device");
143        self.cache.lock().insert(token_hash, info);
144    }
145
146    pub fn list(&self) -> Vec<DeviceInfo> {
147        let conn = self.open_db();
148        let mut stmt = conn
149            .prepare("SELECT token_hash, id, name, device_type, paired_at, last_seen, ip_address, capabilities FROM devices")
150            .expect("Failed to prepare device select");
151        let rows = stmt
152            .query_map([], |row| {
153                let id: String = row.get(1)?;
154                let name: Option<String> = row.get(2)?;
155                let device_type: Option<String> = row.get(3)?;
156                let paired_at_str: String = row.get(4)?;
157                let last_seen_str: String = row.get(5)?;
158                let ip_address: Option<String> = row.get(6)?;
159                let capabilities_json: Option<String> = row.get(7)?;
160                let paired_at = DateTime::parse_from_rfc3339(&paired_at_str)
161                    .map(|dt| dt.with_timezone(&Utc))
162                    .unwrap_or_else(|_| Utc::now());
163                let last_seen = DateTime::parse_from_rfc3339(&last_seen_str)
164                    .map(|dt| dt.with_timezone(&Utc))
165                    .unwrap_or_else(|_| Utc::now());
166                let capabilities = capabilities_json
167                    .as_deref()
168                    .and_then(|s| serde_json::from_str::<Vec<String>>(s).ok());
169                Ok(DeviceInfo {
170                    id,
171                    name,
172                    device_type,
173                    paired_at,
174                    last_seen,
175                    ip_address,
176                    capabilities,
177                })
178            })
179            .expect("Failed to query devices");
180        rows.filter_map(|r| r.ok()).collect()
181    }
182
183    /// Delete a device by id and return its SHA-256 token hash so the caller
184    /// can revoke the matching bearer token.
185    ///
186    /// `Ok(None)` means the device did not exist; real SQLite errors are
187    /// propagated so handlers can distinguish "nothing to do" from "DB is
188    /// broken" — confusing the two during incident response is dangerous.
189    /// Uses `DELETE … RETURNING` (SQLite ≥ 3.35) so the read and delete are
190    /// atomic under concurrent revoke calls.
191    pub fn revoke(&self, device_id: &str) -> Result<Option<String>, rusqlite::Error> {
192        let conn = self.open_db();
193        let deleted: Option<String> = conn
194            .query_row(
195                "DELETE FROM devices WHERE id = ?1 RETURNING token_hash",
196                rusqlite::params![device_id],
197                |row| row.get::<_, String>(0),
198            )
199            .map(Some)
200            .or_else(|e| match e {
201                rusqlite::Error::QueryReturnedNoRows => Ok(None),
202                other => Err(other),
203            })?;
204        if let Some(hash) = deleted.as_ref() {
205            self.cache.lock().remove(hash);
206        }
207        Ok(deleted)
208    }
209
210    /// Delete every device row and clear the in-memory cache. Returns the
211    /// number of rows removed. Pairs with `PairingGuard::revoke_all_tokens`
212    /// for the "rotate after compromise — nuke everything" path so the device
213    /// registry does not silently coexist with the now-revoked token set.
214    pub fn clear(&self) -> Result<usize, rusqlite::Error> {
215        let conn = self.open_db();
216        let removed = conn.execute("DELETE FROM devices", [])?;
217        self.cache.lock().clear();
218        Ok(removed)
219    }
220
221    pub fn update_last_seen(&self, token_hash: &str) {
222        let now = Utc::now();
223        let conn = self.open_db();
224        conn.execute(
225            "UPDATE devices SET last_seen = ?1 WHERE token_hash = ?2",
226            rusqlite::params![now.to_rfc3339(), token_hash],
227        )
228        .ok();
229        if let Some(device) = self.cache.lock().get_mut(token_hash) {
230            device.last_seen = now;
231        }
232    }
233
234    /// Replace the capability list for the device identified by `token_hash`.
235    /// Returns true if a row was updated.
236    pub fn update_capabilities(&self, token_hash: &str, capabilities: Vec<String>) -> bool {
237        let json = serde_json::to_string(&capabilities).unwrap_or_else(|_| "[]".into());
238        let conn = self.open_db();
239        let updated = conn
240            .execute(
241                "UPDATE devices SET capabilities = ?1, last_seen = ?2 WHERE token_hash = ?3",
242                rusqlite::params![json, Utc::now().to_rfc3339(), token_hash],
243            )
244            .unwrap_or(0);
245        if updated > 0
246            && let Some(device) = self.cache.lock().get_mut(token_hash)
247        {
248            device.capabilities = Some(capabilities);
249            device.last_seen = Utc::now();
250        }
251        updated > 0
252    }
253
254    pub fn device_count(&self) -> usize {
255        self.cache.lock().len()
256    }
257}
258
259/// Store for pending pairing requests.
260#[derive(Debug, Default)]
261pub struct PairingStore {
262    pending: Mutex<Vec<PendingPairing>>,
263}
264
265#[derive(Debug, Clone, Serialize)]
266struct PendingPairing {
267    code: String,
268    created_at: DateTime<Utc>,
269    expires_at: DateTime<Utc>,
270    client_ip: Option<String>,
271    attempts: u32,
272}
273
274impl PairingStore {
275    pub fn new() -> Self {
276        Self::default()
277    }
278
279    pub fn pending_count(&self) -> usize {
280        let mut pending = self.pending.lock();
281        pending.retain(|p| p.expires_at > Utc::now());
282        pending.len()
283    }
284}
285
286fn extract_bearer(headers: &HeaderMap) -> Option<&str> {
287    headers
288        .get(header::AUTHORIZATION)
289        .and_then(|v| v.to_str().ok())
290        .and_then(|auth| auth.strip_prefix("Bearer "))
291}
292
293fn require_auth(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, &'static str)> {
294    if state.pairing.require_pairing() {
295        let token = extract_bearer(headers).unwrap_or("");
296        if !state.pairing.is_authenticated(token) {
297            return Err((StatusCode::UNAUTHORIZED, "Unauthorized"));
298        }
299    }
300    Ok(())
301}
302
303/// POST /api/pairing/initiate — initiate a new pairing session
304pub async fn initiate_pairing(
305    State(state): State<AppState>,
306    headers: HeaderMap,
307) -> impl IntoResponse {
308    if let Err(e) = require_auth(&state, &headers) {
309        return e.into_response();
310    }
311
312    match state.pairing.generate_new_pairing_code() {
313        Some(code) => Json(serde_json::json!({
314            "pairing_code": code,
315            "message": "New pairing code generated"
316        }))
317        .into_response(),
318        None => (
319            StatusCode::SERVICE_UNAVAILABLE,
320            "Pairing is disabled or not available",
321        )
322            .into_response(),
323    }
324}
325
326/// POST /api/pair — submit pairing code (for new device pairing)
327pub async fn submit_pairing_enhanced(
328    State(state): State<AppState>,
329    headers: HeaderMap,
330    Json(body): Json<serde_json::Value>,
331) -> impl IntoResponse {
332    let code = body["code"].as_str().unwrap_or("");
333    let device_name = body["device_name"].as_str().map(String::from);
334    let device_type = body["device_type"].as_str().map(String::from);
335
336    let client_id = headers
337        .get("X-Forwarded-For")
338        .and_then(|v| v.to_str().ok())
339        .unwrap_or("unknown")
340        .to_string();
341
342    match state.pairing.try_pair(code, &client_id).await {
343        Ok(Some(token)) => {
344            // Register the new device
345            let token_hash = {
346                use sha2::{Digest, Sha256};
347                let hash = Sha256::digest(token.as_bytes());
348                hex::encode(hash)
349            };
350            if let Some(ref registry) = state.device_registry {
351                registry.register(
352                    token_hash,
353                    DeviceInfo {
354                        id: uuid::Uuid::new_v4().to_string(),
355                        name: device_name,
356                        device_type,
357                        paired_at: Utc::now(),
358                        last_seen: Utc::now(),
359                        ip_address: Some(client_id),
360                        capabilities: None,
361                    },
362                );
363            }
364            if let Err(e) =
365                super::persist_pairing_tokens(state.config.clone(), &state.pairing).await
366            {
367                ::zeroclaw_log::record!(
368                    ERROR,
369                    ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
370                        .with_outcome(::zeroclaw_log::EventOutcome::Failure)
371                        .with_attrs(::serde_json::json!({"error": format!("{e}")})),
372                    "pairing succeeded but token persistence failed"
373                );
374                return Json(serde_json::json!({
375                    "paired": true,
376                    "persisted": false,
377                    "token": token,
378                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
379                }))
380                .into_response();
381            }
382            Json(serde_json::json!({
383                "paired": true,
384                "persisted": true,
385                "token": token,
386                "message": "Pairing successful"
387            }))
388            .into_response()
389        }
390        Ok(None) => (StatusCode::BAD_REQUEST, "Invalid or expired pairing code").into_response(),
391        Err(lockout_secs) => (
392            StatusCode::TOO_MANY_REQUESTS,
393            format!("Too many attempts. Locked out for {lockout_secs}s"),
394        )
395            .into_response(),
396    }
397}
398
399/// GET /api/devices — list paired devices
400pub async fn list_devices(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
401    if let Err(e) = require_auth(&state, &headers) {
402        return e.into_response();
403    }
404
405    let devices = state
406        .device_registry
407        .as_ref()
408        .map(|r| r.list())
409        .unwrap_or_default();
410
411    let count = devices.len();
412    Json(serde_json::json!({
413        "devices": devices,
414        "count": count
415    }))
416    .into_response()
417}
418
419/// DELETE /api/devices/{id} — revoke a paired device and its bearer token.
420pub async fn revoke_device(
421    State(state): State<AppState>,
422    headers: HeaderMap,
423    axum::extract::Path(device_id): axum::extract::Path<String>,
424) -> impl IntoResponse {
425    if let Err(e) = require_auth(&state, &headers) {
426        return e.into_response();
427    }
428
429    let Some(registry) = state.device_registry.as_ref() else {
430        return (
431            StatusCode::SERVICE_UNAVAILABLE,
432            "Device registry is disabled",
433        )
434            .into_response();
435    };
436
437    let token_hash = match registry.revoke(&device_id) {
438        Ok(Some(hash)) => hash,
439        Ok(None) => return (StatusCode::NOT_FOUND, "Device not found").into_response(),
440        Err(e) => {
441            return (
442                StatusCode::INTERNAL_SERVER_ERROR,
443                format!("Device registry error: {e}"),
444            )
445                .into_response();
446        }
447    };
448
449    state.pairing.revoke_token_hash(&token_hash);
450
451    // If persistence fails after the in-memory revoke + row delete, the
452    // device row is already gone and the token is already invalid in this
453    // process; a daemon restart will resurrect the token from the unchanged
454    // on-disk config. Surface that to the caller so they know to re-pair
455    // and audit, rather than treating the operation as silently complete.
456    if let Err(e) = super::persist_pairing_tokens(state.config.clone(), &state.pairing).await {
457        return (
458            StatusCode::INTERNAL_SERVER_ERROR,
459            format!("Token revoked in memory but config persist failed: {e}"),
460        )
461            .into_response();
462    }
463
464    Json(serde_json::json!({
465        "message": "Device revoked and bearer token invalidated",
466        "device_id": device_id,
467    }))
468    .into_response()
469}
470
471/// POST /api/devices/me/capabilities — the calling device replaces its capability list.
472///
473/// The "me" path means there's no separate device id in the URL — the bearer token in
474/// Authorization identifies which row gets updated. Body: `{ "capabilities": ["..."] }`.
475pub async fn update_my_capabilities(
476    State(state): State<AppState>,
477    headers: HeaderMap,
478    Json(body): Json<serde_json::Value>,
479) -> impl IntoResponse {
480    if let Err(e) = require_auth(&state, &headers) {
481        return e.into_response();
482    }
483
484    let token = match extract_bearer(&headers) {
485        Some(t) => t,
486        None => return (StatusCode::UNAUTHORIZED, "Missing bearer token").into_response(),
487    };
488    let token_hash = {
489        use sha2::{Digest, Sha256};
490        let hash = Sha256::digest(token.as_bytes());
491        hex::encode(hash)
492    };
493
494    let capabilities: Vec<String> = body
495        .get("capabilities")
496        .and_then(|v| v.as_array())
497        .map(|arr| {
498            arr.iter()
499                .filter_map(|v| v.as_str().map(String::from))
500                .collect()
501        })
502        .unwrap_or_default();
503
504    let registry = match state.device_registry.as_ref() {
505        Some(r) => r,
506        None => {
507            return (
508                StatusCode::SERVICE_UNAVAILABLE,
509                "Device registry is disabled",
510            )
511                .into_response();
512        }
513    };
514
515    if registry.update_capabilities(&token_hash, capabilities.clone()) {
516        Json(serde_json::json!({
517            "message": "Capabilities updated",
518            "capabilities": capabilities,
519        }))
520        .into_response()
521    } else {
522        (StatusCode::NOT_FOUND, "Device not found for this token").into_response()
523    }
524}
525
526/// POST /api/devices/{id}/token/rotate — revoke the device's current bearer
527/// token and issue a fresh pairing code for re-pairing.
528///
529/// The device row is removed because the schema keys on `token_hash`; once
530/// the token is revoked the row's primary key is dead anyway. Re-pairing
531/// inserts a fresh row with the new token's hash.
532///
533/// The rotation's load-bearing effect is invalidating the leaked token, not
534/// issuing a new code. If another flow holds the pairing-code slot the
535/// revoke still happens; the response reports that no new code was issued
536/// and the operator can use the pending code or call again once it clears.
537///
538/// If the caller is using the same bearer token as the device being rotated
539/// (self-revocation), the response is delivered over the now-invalid token;
540/// subsequent requests from that client will fail until they re-pair. That
541/// is the intended path for "rotate my own token after I think it leaked."
542pub async fn rotate_token(
543    State(state): State<AppState>,
544    headers: HeaderMap,
545    axum::extract::Path(device_id): axum::extract::Path<String>,
546) -> impl IntoResponse {
547    if let Err(e) = require_auth(&state, &headers) {
548        return e.into_response();
549    }
550
551    let Some(registry) = state.device_registry.as_ref() else {
552        return (
553            StatusCode::SERVICE_UNAVAILABLE,
554            "Device registry is disabled",
555        )
556            .into_response();
557    };
558
559    let token_hash = match registry.revoke(&device_id) {
560        Ok(Some(hash)) => hash,
561        Ok(None) => return (StatusCode::NOT_FOUND, "Device not found").into_response(),
562        Err(e) => {
563            return (
564                StatusCode::INTERNAL_SERVER_ERROR,
565                format!("Device registry error: {e}"),
566            )
567                .into_response();
568        }
569    };
570
571    state.pairing.revoke_token_hash(&token_hash);
572
573    // Same persist-fail caveat as `revoke_device`: device row + in-memory
574    // token are already gone; surfacing the persist error tells the caller
575    // a restart could resurrect the token.
576    if let Err(e) = super::persist_pairing_tokens(state.config.clone(), &state.pairing).await {
577        return (
578            StatusCode::INTERNAL_SERVER_ERROR,
579            format!("Token revoked in memory but config persist failed: {e}"),
580        )
581            .into_response();
582    }
583
584    // Issue the new pairing code atomically against the slot. If another
585    // flow holds the slot, the revoke still stands — return 200 with
586    // `pairing_code: null` and a message that tells the operator what
587    // happened so they do not assume rotation failed.
588    match state.pairing.generate_pairing_code_if_vacant() {
589        Ok(code) => Json(serde_json::json!({
590            "device_id": device_id,
591            "pairing_code": code,
592            "message": "Old token revoked. Use this code to re-pair the device.",
593        }))
594        .into_response(),
595        Err(zeroclaw_config::pairing::GeneratePairingCodeError::Pending) => {
596            Json(serde_json::json!({
597                "device_id": device_id,
598                "pairing_code": null,
599                "message": "Old token revoked. A pairing code is already pending; use it or call again after it clears.",
600            }))
601            .into_response()
602        }
603        Err(zeroclaw_config::pairing::GeneratePairingCodeError::PairingDisabled) => {
604            Json(serde_json::json!({
605                "device_id": device_id,
606                "pairing_code": null,
607                "message": "Old token revoked. Pairing is disabled; cannot issue a new code.",
608            }))
609            .into_response()
610        }
611    }
612}