k8s_test_framework/
reader.rs

1//! Read process output.
2
3use std::process::{ExitStatus, Stdio};
4
5use tokio::{
6    io::{AsyncBufReadExt, BufReader},
7    process::{Child, ChildStdout, Command},
8};
9
/// Keeps track of the command invocation, providing the interface to
/// read the output and send a termination signal.
#[derive(Debug)]
pub struct Reader {
    /// Handle to the spawned child process; used to wait on it and to kill it.
    child: Child,
    /// Buffered reader over the child's stdout, consumed line by line.
    reader: BufReader<ChildStdout>,
}
17
18impl Reader {
19    /// Spawn a command and provide a [`Reader`].
20    pub fn spawn(mut command: Command) -> std::io::Result<Self> {
21        Self::prepare_stdout(&mut command);
22        let child = command.spawn()?;
23        Ok(Self::new(child))
24    }
25
26    fn prepare_stdout(command: &mut Command) {
27        command.stdout(Stdio::piped());
28    }
29
30    fn new(mut child: Child) -> Self {
31        let stdout = child.stdout.take().unwrap();
32        let reader = BufReader::new(stdout);
33        Reader { child, reader }
34    }
35
36    /// Wait for the `kubectl logs` process to exit and return the exit code.
37    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
38        self.child.wait().await
39    }
40
41    /// Send a termination signal to the `kubectl logs` process.
42    pub async fn kill(&mut self) -> std::io::Result<()> {
43        self.child.kill().await
44    }
45
46    /// Read one line from the stdout of the `kubectl logs` process.
47    pub async fn read_line(&mut self) -> Option<String> {
48        let mut s = String::new();
49        let result = self.reader.read_line(&mut s).await;
50        match result {
51            Ok(0) => None,
52            Ok(_) => Some(s),
53            Err(err) => panic!("{}", err),
54        }
55    }
56}
57
#[cfg(all(unix, test))]
mod tests {
    use super::*;

    /// Drain `reader` until it reports the end of the stream and return every
    /// line that was read, in order.
    async fn collect(reader: &mut Reader) -> Vec<String> {
        let mut lines = Vec::new();
        loop {
            match reader.read_line().await {
                Some(line) => lines.push(line),
                None => return lines,
            }
        }
    }

    #[tokio::test]
    async fn test_reader_finite() {
        let mut command = Command::new("echo");
        command.arg("test");

        let mut reader = Reader::spawn(command).expect("unable to spawn");

        // Collect all the lines; the stream is expected to finish on its own.
        let lines = collect(&mut reader).await;
        // We should have received exactly the single echoed line.
        let expected = vec![String::from("test\n")];
        assert_eq!(lines, expected);

        // Waiting must not fail, and the exit status must be a success.
        let exit_status = reader.wait().await.expect("wait failed");
        assert!(exit_status.success());
    }

    #[tokio::test]
    async fn test_reader_infinite() {
        // A shell loop that prints an ever-increasing counter, one per line.
        let script =
            r#"NUM=0; while true; do echo "Line $NUM"; NUM=$((NUM+=1)); sleep 0.01; done"#;
        let mut command = Command::new("bash");
        command.arg("-c").arg(script);

        let mut reader = Reader::spawn(command).expect("unable to spawn");

        // Keep reading lines and, at some point, ask the command we are
        // reading from to stop.
        let mut expected_num = 0;
        while let Some(line) = reader.read_line().await {
            // Every line must carry the next number in the sequence.
            assert_eq!(line, format!("Line {expected_num}\n"));

            // Once line 100 arrives, issue a `kill` to end the infinite stream.
            if expected_num == 100 {
                reader.kill().await.expect("process already stopped");
            }

            // Getting past 200 means the `kill` issued at 100 had no effect,
            // which is a problem, so the test fails.
            // This is not checked immediately after `kill` in order to leave
            // room for a potential race. That kind of race is not merely
            // acceptable but desirable in real-life usage, where we want to
            // read whatever is still left in the stdout buffer.
            assert!(
                expected_num <= 200,
                "Went too far without stop being effective"
            );

            // Advance the expected number for the next iteration.
            expected_num += 1;
        }

        // Waiting must not fail. The process was killed, so a non-success
        // exit status is expected.
        let exit_status = reader.wait().await.expect("wait failed");
        assert!(!exit_status.success());
    }
}