Skip to main content

zeroclaw_runtime/tunnel/
pinggy.rs

1use super::{SharedProcess, Tunnel, TunnelProcess, kill_shared, new_shared_process};
2use anyhow::{Result, bail};
3use tokio::io::AsyncBufReadExt;
4use tokio::process::Command;
5
/// Pinggy Tunnel — uses SSH to expose a local port via pinggy.io.
///
/// No separate binary required — uses the system `ssh` command.
/// Free tier works without a token; Pro features require a token
/// from dashboard.pinggy.io.
pub struct PinggyTunnel {
    /// Optional Pinggy access token. When present and non-empty, `start` uses it
    /// as the SSH user name and connects to the Pro endpoint; otherwise the free
    /// endpoint is used.
    token: Option<String>,
    /// Optional region. When present and non-empty, `start` lowercases it and
    /// prepends it as a subdomain of the server host.
    region: Option<String>,
    /// Shared slot that `start` fills with the spawned `ssh` child and its public URL.
    proc: SharedProcess,
}
16
17impl PinggyTunnel {
18    pub fn new(token: Option<String>, region: Option<String>) -> Self {
19        Self {
20            token,
21            region,
22            proc: new_shared_process(),
23        }
24    }
25}
26
27#[async_trait::async_trait]
28impl Tunnel for PinggyTunnel {
29    fn name(&self) -> &str {
30        "pinggy"
31    }
32
33    async fn start(&self, local_host: &str, local_port: u16) -> Result<String> {
34        // Pro tokens use pro.pinggy.io; free tier uses free.pinggy.io.
35        let base = match self.token.as_deref() {
36            Some(t) if !t.is_empty() => "pro.pinggy.io",
37            _ => "free.pinggy.io",
38        };
39        let server_host = match self.region.as_deref() {
40            Some(r) if !r.is_empty() => format!("{}.{base}", r.to_ascii_lowercase()),
41            _ => base.into(),
42        };
43
44        // Build the SSH user portion: TOKEN@ or empty for free tier
45        let destination = match self.token.as_deref() {
46            Some(t) if !t.is_empty() => format!("{t}@{server_host}"),
47            _ => server_host,
48        };
49
50        // Use the caller-provided local_host for forwarding target.
51        let forward_spec = format!("0:{local_host}:{local_port}");
52
53        let mut child = Command::new("ssh")
54            .args([
55                "-T",
56                "-p",
57                "443",
58                "-R",
59                &forward_spec,
60                "-o",
61                "StrictHostKeyChecking=accept-new",
62                "-o",
63                "ServerAliveInterval=30",
64                &destination,
65            ])
66            .stdin(std::process::Stdio::null())
67            .stdout(std::process::Stdio::piped())
68            .stderr(std::process::Stdio::piped())
69            .kill_on_drop(true)
70            .spawn()?;
71
72        // Pinggy may print the tunnel URL to stdout or stderr depending on
73        // SSH mode; read both streams concurrently to catch it either way.
74        let stdout = child.stdout.take().ok_or_else(|| {
75            ::zeroclaw_log::record!(
76                ERROR,
77                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
78                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
79                    .with_attrs(
80                        ::serde_json::json!({"tunnel_provider": "pinggy", "stream": "stdout"})
81                    ),
82                "tunnel process: failed to capture child stream"
83            );
84            anyhow::Error::msg("Failed to capture pinggy stdout")
85        })?;
86        let stderr = child.stderr.take().ok_or_else(|| {
87            ::zeroclaw_log::record!(
88                ERROR,
89                ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
90                    .with_outcome(::zeroclaw_log::EventOutcome::Failure)
91                    .with_attrs(
92                        ::serde_json::json!({"tunnel_provider": "pinggy", "stream": "stderr"})
93                    ),
94                "tunnel process: failed to capture child stream"
95            );
96            anyhow::Error::msg("Failed to capture pinggy stderr")
97        })?;
98
99        let mut stdout_lines = tokio::io::BufReader::new(stdout).lines();
100        let mut stderr_lines = tokio::io::BufReader::new(stderr).lines();
101        let mut public_url = String::new();
102
103        // Tag each stream line so we know which stream produced EOF.
104        enum StreamLine {
105            Stdout(std::io::Result<Option<String>>),
106            Stderr(std::io::Result<Option<String>>),
107        }
108
109        let mut stdout_done = false;
110        let mut stderr_done = false;
111
112        // Wait up to 15s for the tunnel URL to appear on either stream
113        let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(15);
114        while tokio::time::Instant::now() < deadline && !(stdout_done && stderr_done) {
115            let stream_line = tokio::time::timeout(tokio::time::Duration::from_secs(3), async {
116                tokio::select! {
117                    biased;
118                    l = stdout_lines.next_line(), if !stdout_done => StreamLine::Stdout(l),
119                    l = stderr_lines.next_line(), if !stderr_done => StreamLine::Stderr(l),
120                }
121            })
122            .await;
123
124            match stream_line {
125                Ok(StreamLine::Stdout(Ok(Some(l))) | StreamLine::Stderr(Ok(Some(l)))) => {
126                    ::zeroclaw_log::record!(
127                        DEBUG,
128                        ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
129                            .with_attrs(::serde_json::json!({"l": l})),
130                        "pinggy: "
131                    );
132                    // Pinggy prints tunnel URLs like: https://xxxxx.a.free.pinggy.link
133                    // Skip non-tunnel URLs (e.g. dashboard.pinggy.io promo links).
134                    if let Some(idx) = l.find("https://") {
135                        let url_part = &l[idx..];
136                        let end = url_part
137                            .find(|c: char| c.is_whitespace())
138                            .unwrap_or(url_part.len());
139                        let candidate = &url_part[..end];
140                        if candidate.contains(".pinggy.link") {
141                            public_url = candidate.to_string();
142                            break;
143                        }
144                    }
145                }
146                Ok(StreamLine::Stdout(Ok(None))) => stdout_done = true,
147                Ok(StreamLine::Stderr(Ok(None))) => stderr_done = true,
148                Ok(StreamLine::Stdout(Err(e)) | StreamLine::Stderr(Err(e))) => {
149                    bail!("Error reading pinggy output: {e}")
150                }
151                Err(_) => {} // timeout — retry
152            }
153        }
154
155        if public_url.is_empty() {
156            child.kill().await.ok();
157            child.wait().await.ok();
158            bail!(
159                "pinggy did not produce a public URL within 15s. Is SSH available and the token valid?"
160            );
161        }
162
163        let mut guard = self.proc.lock().await;
164        *guard = Some(TunnelProcess {
165            child,
166            public_url: public_url.clone(),
167        });
168
169        Ok(public_url)
170    }
171
172    async fn stop(&self) -> Result<()> {
173        kill_shared(&self.proc).await
174    }
175
176    async fn health_check(&self) -> bool {
177        let mut guard = self.proc.lock().await;
178        match guard.as_mut() {
179            Some(tp) => match tp.child.try_wait() {
180                Ok(None) => true,              // still running
181                Ok(Some(_)) | Err(_) => false, // exited or error
182            },
183            None => false,
184        }
185    }
186
187    fn public_url(&self) -> Option<String> {
188        self.proc
189            .try_lock()
190            .ok()
191            .and_then(|g| g.as_ref().map(|tp| tp.public_url.clone()))
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tunnel configured with neither token nor region.
    fn unconfigured() -> PinggyTunnel {
        PinggyTunnel::new(None, None)
    }

    #[test]
    fn name_returns_pinggy() {
        let tunnel = unconfigured();
        assert_eq!(tunnel.name(), "pinggy");
    }

    #[test]
    fn constructor_stores_fields() {
        let PinggyTunnel { token, region, .. } =
            PinggyTunnel::new(Some(String::from("test-token")), Some(String::from("us")));
        assert_eq!(token.as_deref(), Some("test-token"));
        assert_eq!(region.as_deref(), Some("us"));
    }

    #[test]
    fn public_url_is_none_before_start() {
        let tunnel = unconfigured();
        assert!(tunnel.public_url().is_none());
    }

    #[tokio::test]
    async fn stop_before_start_is_ok() {
        let tunnel = unconfigured();
        let outcome = tunnel.stop().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn health_check_is_false_before_start() {
        let tunnel = unconfigured();
        let healthy = tunnel.health_check().await;
        assert!(!healthy);
    }
}
230}