k8s_test_framework/
port_forward.rs

1//! Perform a port forward from a port listening on a local system to the
2//! a port exposed from a cluster-deployed resource.
3
4#![allow(clippy::print_stdout)] // test framework
5
6use std::process::{ExitStatus, Stdio};
7
8use tokio::{
9    io::{AsyncBufReadExt, BufReader},
10    process::{Child, ChildStdout, Command},
11};
12
13use super::Result;
14
15/// Initiate a port forward (`kubectl port-forward`) with the specified
16/// `kubectl_command` for the specified `resource` at the specified `namespace`
17/// and the specified local/cluster-resource ports pair.
18/// Returns a [`PortForwarder`] that manages the process.
19pub fn port_forward(
20    kubectl_command: &str,
21    namespace: &str,
22    resource: &str,
23    local_port: u16,
24    resource_port: u16,
25) -> Result<PortForwarder> {
26    let mut command = Command::new(kubectl_command);
27
28    command
29        .stdin(Stdio::null())
30        .stderr(Stdio::inherit())
31        .stdout(Stdio::piped());
32
33    command.arg("port-forward");
34    command.arg("-n").arg(namespace);
35    command.arg(resource);
36    command.arg(format!("{local_port}:{resource_port}"));
37
38    command.kill_on_drop(true);
39
40    let mut child = command.spawn()?;
41    let stdout = child.stdout.take().unwrap();
42    let reader = BufReader::new(stdout);
43
44    Ok(PortForwarder {
45        local_port,
46        resource_port,
47        child,
48        reader,
49    })
50}
51
52/// Keeps track of the continuously running `kubectl port-forward` command,
53/// exposing the API to terminate it when needed.
54#[derive(Debug)]
55pub struct PortForwarder {
56    local_port: u16,
57    resource_port: u16,
58    child: Child,
59    reader: BufReader<ChildStdout>,
60}
61
62impl PortForwarder {
63    /// Waits for port forward process to start listening on IPv4 and IPv6 local
64    /// sockets.
65    pub async fn wait_until_ready(&mut self) -> Result<()> {
66        let ready_string_ipv4 = format!(
67            "Forwarding from 127.0.0.1:{} -> {}",
68            self.local_port, self.resource_port
69        );
70        let ready_string_ipv6 = format!(
71            "Forwarding from [::1]:{} -> {}",
72            self.local_port, self.resource_port
73        );
74
75        let mut buf = String::new();
76        let mut seen_ipv4 = false;
77        let mut seen_ipv6 = false;
78        loop {
79            self.reader.read_line(&mut buf).await?;
80            print!("{}", &buf);
81
82            if buf.contains(&ready_string_ipv4) {
83                seen_ipv4 = true;
84            }
85            if buf.contains(&ready_string_ipv6) {
86                seen_ipv6 = true;
87            }
88
89            buf.clear();
90
91            if seen_ipv4 && seen_ipv6 {
92                break;
93            }
94        }
95        Ok(())
96    }
97
98    /// Returns the local port that port forward was requested to listen on.
99    pub fn local_port(&self) -> u16 {
100        self.local_port
101    }
102
103    /// Returns the resource port that port forward was requested to forward to.
104    pub fn resource_port(&self) -> u16 {
105        self.resource_port
106    }
107
108    /// Returns the local address (in the "host:port" form) to connect to
109    /// in order to reach the cluster resource port, at the IPv4 address family.
110    pub fn local_addr_ipv4(&self) -> String {
111        format!("127.0.0.1:{}", self.local_port)
112    }
113
114    /// Returns the local address (in the "host:port" form) to connect to
115    /// in order to reach the cluster resource port, at the IPv6 address family.
116    pub fn local_addr_ipv6(&self) -> String {
117        format!("[::1]:{}", self.local_port)
118    }
119
120    /// Wait for the `kubectl port-forward` process to exit and return the exit
121    /// code.
122    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
123        self.child.wait().await
124    }
125
126    /// Send a termination signal to the `kubectl port-forward` process.
127    pub async fn kill(&mut self) -> std::io::Result<()> {
128        self.child.kill().await
129    }
130}