k8s_test_framework/
port_forward.rs1#![allow(clippy::print_stdout)] use std::process::{ExitStatus, Stdio};
7
8use tokio::{
9 io::{AsyncBufReadExt, BufReader},
10 process::{Child, ChildStdout, Command},
11};
12
13use super::Result;
14
15pub fn port_forward(
20 kubectl_command: &str,
21 namespace: &str,
22 resource: &str,
23 local_port: u16,
24 resource_port: u16,
25) -> Result<PortForwarder> {
26 let mut command = Command::new(kubectl_command);
27
28 command
29 .stdin(Stdio::null())
30 .stderr(Stdio::inherit())
31 .stdout(Stdio::piped());
32
33 command.arg("port-forward");
34 command.arg("-n").arg(namespace);
35 command.arg(resource);
36 command.arg(format!("{local_port}:{resource_port}"));
37
38 command.kill_on_drop(true);
39
40 let mut child = command.spawn()?;
41 let stdout = child.stdout.take().unwrap();
42 let reader = BufReader::new(stdout);
43
44 Ok(PortForwarder {
45 local_port,
46 resource_port,
47 child,
48 reader,
49 })
50}
51
52#[derive(Debug)]
55pub struct PortForwarder {
56 local_port: u16,
57 resource_port: u16,
58 child: Child,
59 reader: BufReader<ChildStdout>,
60}
61
62impl PortForwarder {
63 pub async fn wait_until_ready(&mut self) -> Result<()> {
66 let ready_string_ipv4 = format!(
67 "Forwarding from 127.0.0.1:{} -> {}",
68 self.local_port, self.resource_port
69 );
70 let ready_string_ipv6 = format!(
71 "Forwarding from [::1]:{} -> {}",
72 self.local_port, self.resource_port
73 );
74
75 let mut buf = String::new();
76 let mut seen_ipv4 = false;
77 let mut seen_ipv6 = false;
78 loop {
79 self.reader.read_line(&mut buf).await?;
80 print!("{}", &buf);
81
82 if buf.contains(&ready_string_ipv4) {
83 seen_ipv4 = true;
84 }
85 if buf.contains(&ready_string_ipv6) {
86 seen_ipv6 = true;
87 }
88
89 buf.clear();
90
91 if seen_ipv4 && seen_ipv6 {
92 break;
93 }
94 }
95 Ok(())
96 }
97
98 pub fn local_port(&self) -> u16 {
100 self.local_port
101 }
102
103 pub fn resource_port(&self) -> u16 {
105 self.resource_port
106 }
107
108 pub fn local_addr_ipv4(&self) -> String {
111 format!("127.0.0.1:{}", self.local_port)
112 }
113
114 pub fn local_addr_ipv6(&self) -> String {
117 format!("[::1]:{}", self.local_port)
118 }
119
120 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
123 self.child.wait().await
124 }
125
126 pub async fn kill(&mut self) -> std::io::Result<()> {
128 self.child.kill().await
129 }
130}