zeroclaw_runtime/tunnel/
pinggy.rs1use super::{SharedProcess, Tunnel, TunnelProcess, kill_shared, new_shared_process};
2use anyhow::{Result, bail};
3use tokio::io::AsyncBufReadExt;
4use tokio::process::Command;
5
6pub struct PinggyTunnel {
12 token: Option<String>,
13 region: Option<String>,
14 proc: SharedProcess,
15}
16
17impl PinggyTunnel {
18 pub fn new(token: Option<String>, region: Option<String>) -> Self {
19 Self {
20 token,
21 region,
22 proc: new_shared_process(),
23 }
24 }
25}
26
27#[async_trait::async_trait]
28impl Tunnel for PinggyTunnel {
29 fn name(&self) -> &str {
30 "pinggy"
31 }
32
33 async fn start(&self, local_host: &str, local_port: u16) -> Result<String> {
34 let base = match self.token.as_deref() {
36 Some(t) if !t.is_empty() => "pro.pinggy.io",
37 _ => "free.pinggy.io",
38 };
39 let server_host = match self.region.as_deref() {
40 Some(r) if !r.is_empty() => format!("{}.{base}", r.to_ascii_lowercase()),
41 _ => base.into(),
42 };
43
44 let destination = match self.token.as_deref() {
46 Some(t) if !t.is_empty() => format!("{t}@{server_host}"),
47 _ => server_host,
48 };
49
50 let forward_spec = format!("0:{local_host}:{local_port}");
52
53 let mut child = Command::new("ssh")
54 .args([
55 "-T",
56 "-p",
57 "443",
58 "-R",
59 &forward_spec,
60 "-o",
61 "StrictHostKeyChecking=accept-new",
62 "-o",
63 "ServerAliveInterval=30",
64 &destination,
65 ])
66 .stdin(std::process::Stdio::null())
67 .stdout(std::process::Stdio::piped())
68 .stderr(std::process::Stdio::piped())
69 .kill_on_drop(true)
70 .spawn()?;
71
72 let stdout = child.stdout.take().ok_or_else(|| {
75 ::zeroclaw_log::record!(
76 ERROR,
77 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
78 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
79 .with_attrs(
80 ::serde_json::json!({"tunnel_provider": "pinggy", "stream": "stdout"})
81 ),
82 "tunnel process: failed to capture child stream"
83 );
84 anyhow::Error::msg("Failed to capture pinggy stdout")
85 })?;
86 let stderr = child.stderr.take().ok_or_else(|| {
87 ::zeroclaw_log::record!(
88 ERROR,
89 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Fail)
90 .with_outcome(::zeroclaw_log::EventOutcome::Failure)
91 .with_attrs(
92 ::serde_json::json!({"tunnel_provider": "pinggy", "stream": "stderr"})
93 ),
94 "tunnel process: failed to capture child stream"
95 );
96 anyhow::Error::msg("Failed to capture pinggy stderr")
97 })?;
98
99 let mut stdout_lines = tokio::io::BufReader::new(stdout).lines();
100 let mut stderr_lines = tokio::io::BufReader::new(stderr).lines();
101 let mut public_url = String::new();
102
103 enum StreamLine {
105 Stdout(std::io::Result<Option<String>>),
106 Stderr(std::io::Result<Option<String>>),
107 }
108
109 let mut stdout_done = false;
110 let mut stderr_done = false;
111
112 let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(15);
114 while tokio::time::Instant::now() < deadline && !(stdout_done && stderr_done) {
115 let stream_line = tokio::time::timeout(tokio::time::Duration::from_secs(3), async {
116 tokio::select! {
117 biased;
118 l = stdout_lines.next_line(), if !stdout_done => StreamLine::Stdout(l),
119 l = stderr_lines.next_line(), if !stderr_done => StreamLine::Stderr(l),
120 }
121 })
122 .await;
123
124 match stream_line {
125 Ok(StreamLine::Stdout(Ok(Some(l))) | StreamLine::Stderr(Ok(Some(l)))) => {
126 ::zeroclaw_log::record!(
127 DEBUG,
128 ::zeroclaw_log::Event::new(module_path!(), ::zeroclaw_log::Action::Note)
129 .with_attrs(::serde_json::json!({"l": l})),
130 "pinggy: "
131 );
132 if let Some(idx) = l.find("https://") {
135 let url_part = &l[idx..];
136 let end = url_part
137 .find(|c: char| c.is_whitespace())
138 .unwrap_or(url_part.len());
139 let candidate = &url_part[..end];
140 if candidate.contains(".pinggy.link") {
141 public_url = candidate.to_string();
142 break;
143 }
144 }
145 }
146 Ok(StreamLine::Stdout(Ok(None))) => stdout_done = true,
147 Ok(StreamLine::Stderr(Ok(None))) => stderr_done = true,
148 Ok(StreamLine::Stdout(Err(e)) | StreamLine::Stderr(Err(e))) => {
149 bail!("Error reading pinggy output: {e}")
150 }
151 Err(_) => {} }
153 }
154
155 if public_url.is_empty() {
156 child.kill().await.ok();
157 child.wait().await.ok();
158 bail!(
159 "pinggy did not produce a public URL within 15s. Is SSH available and the token valid?"
160 );
161 }
162
163 let mut guard = self.proc.lock().await;
164 *guard = Some(TunnelProcess {
165 child,
166 public_url: public_url.clone(),
167 });
168
169 Ok(public_url)
170 }
171
172 async fn stop(&self) -> Result<()> {
173 kill_shared(&self.proc).await
174 }
175
176 async fn health_check(&self) -> bool {
177 let mut guard = self.proc.lock().await;
178 match guard.as_mut() {
179 Some(tp) => match tp.child.try_wait() {
180 Ok(None) => true, Ok(Some(_)) | Err(_) => false, },
183 None => false,
184 }
185 }
186
187 fn public_url(&self) -> Option<String> {
188 self.proc
189 .try_lock()
190 .ok()
191 .and_then(|g| g.as_ref().map(|tp| tp.public_url.clone()))
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tunnel with neither a token nor a region configured.
    fn anonymous_tunnel() -> PinggyTunnel {
        PinggyTunnel::new(None, None)
    }

    // The provider name is the fixed string "pinggy".
    #[test]
    fn name_returns_pinggy() {
        assert_eq!(anonymous_tunnel().name(), "pinggy");
    }

    // `new` keeps the token and region exactly as passed in.
    #[test]
    fn constructor_stores_fields() {
        let PinggyTunnel { token, region, .. } =
            PinggyTunnel::new(Some("test-token".into()), Some("us".into()));
        assert_eq!(token.as_deref(), Some("test-token"));
        assert_eq!(region.as_deref(), Some("us"));
    }

    // No URL is available until the tunnel has been started.
    #[test]
    fn public_url_is_none_before_start() {
        assert!(anonymous_tunnel().public_url().is_none());
    }

    // Stopping a tunnel that was never started succeeds.
    #[tokio::test]
    async fn stop_before_start_is_ok() {
        let tunnel = anonymous_tunnel();
        assert!(tunnel.stop().await.is_ok());
    }

    // A tunnel that was never started reports unhealthy.
    #[tokio::test]
    async fn health_check_is_false_before_start() {
        let tunnel = anonymous_tunnel();
        let healthy = tunnel.health_check().await;
        assert!(!healthy);
    }
}