1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! Perform a port forward from a port listening on a local system to the
//! a port exposed from a cluster-deployed resource.

#![allow(clippy::print_stdout)] // test framework

use std::process::{ExitStatus, Stdio};

use tokio::{
    io::{AsyncBufReadExt, BufReader},
    process::{Child, ChildStdout, Command},
};

use super::Result;

/// Initiate a port forward (`kubectl port-forward`) with the specified
/// `kubectl_command` for the specified `resource` at the specified `namespace`
/// and the specified local/cluster-resource ports pair.
/// Returns a [`PortForwarder`] that manages the process.
pub fn port_forward(
    kubectl_command: &str,
    namespace: &str,
    resource: &str,
    local_port: u16,
    resource_port: u16,
) -> Result<PortForwarder> {
    let mut command = Command::new(kubectl_command);

    command
        .stdin(Stdio::null())
        .stderr(Stdio::inherit())
        .stdout(Stdio::piped());

    command.arg("port-forward");
    command.arg("-n").arg(namespace);
    command.arg(resource);
    command.arg(format!("{}:{}", local_port, resource_port));

    command.kill_on_drop(true);

    let mut child = command.spawn()?;
    let stdout = child.stdout.take().unwrap();
    let reader = BufReader::new(stdout);

    Ok(PortForwarder {
        local_port,
        resource_port,
        child,
        reader,
    })
}

/// Keeps track of the continuously running `kubectl port-forward` command,
/// exposing the API to terminate it when needed.
#[derive(Debug)]
pub struct PortForwarder {
    local_port: u16,
    resource_port: u16,
    child: Child,
    reader: BufReader<ChildStdout>,
}

impl PortForwarder {
    /// Waits for port forward process to start listening on IPv4 and IPv6 local
    /// sockets.
    pub async fn wait_until_ready(&mut self) -> Result<()> {
        let ready_string_ipv4 = format!(
            "Forwarding from 127.0.0.1:{} -> {}",
            self.local_port, self.resource_port
        );
        let ready_string_ipv6 = format!(
            "Forwarding from [::1]:{} -> {}",
            self.local_port, self.resource_port
        );

        let mut buf = String::new();
        let mut seen_ipv4 = false;
        let mut seen_ipv6 = false;
        loop {
            self.reader.read_line(&mut buf).await?;
            print!("{}", &buf);

            if buf.contains(&ready_string_ipv4) {
                seen_ipv4 = true;
            }
            if buf.contains(&ready_string_ipv6) {
                seen_ipv6 = true;
            }

            buf.clear();

            if seen_ipv4 && seen_ipv6 {
                break;
            }
        }
        Ok(())
    }

    /// Returns the local port that port forward was requested to listen on.
    pub fn local_port(&self) -> u16 {
        self.local_port
    }

    /// Returns the resource port that port forward was requested to forward to.
    pub fn resource_port(&self) -> u16 {
        self.resource_port
    }

    /// Returns the local address (in the "host:port" form) to connect to
    /// in order to reach the cluster resource port, at the IPv4 address family.
    pub fn local_addr_ipv4(&self) -> String {
        format!("127.0.0.1:{}", self.local_port)
    }

    /// Returns the local address (in the "host:port" form) to connect to
    /// in order to reach the cluster resource port, at the IPv6 address family.
    pub fn local_addr_ipv6(&self) -> String {
        format!("[::1]:{}", self.local_port)
    }

    /// Wait for the `kubectl port-forward` process to exit and return the exit
    /// code.
    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
        self.child.wait().await
    }

    /// Send a termination signal to the `kubectl port-forward` process.
    pub async fn kill(&mut self) -> std::io::Result<()> {
        self.child.kill().await
    }
}