1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
//! Read process output.
use std::process::{ExitStatus, Stdio};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, ChildStdout, Command},
};
/// Keeps track of the command invocation, proving the interface to
/// read the output and send a termination signal.
#[derive(Debug)]
pub struct Reader {
child: Child,
reader: BufReader<ChildStdout>,
}
impl Reader {
/// Spawn a command and provide a [`Reader`].
pub fn spawn(mut command: Command) -> std::io::Result<Self> {
Self::prepare_stdout(&mut command);
let child = command.spawn()?;
Ok(Self::new(child))
}
fn prepare_stdout(command: &mut Command) {
command.stdout(Stdio::piped());
}
fn new(mut child: Child) -> Self {
let stdout = child.stdout.take().unwrap();
let reader = BufReader::new(stdout);
Reader { child, reader }
}
/// Wait for the `kubectl logs` process to exit and return the exit code.
pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
self.child.wait().await
}
/// Send a termination signal to the `kubectl logs` process.
pub async fn kill(&mut self) -> std::io::Result<()> {
self.child.kill().await
}
/// Read one line from the stdout of the `kubectl logs` process.
pub async fn read_line(&mut self) -> Option<String> {
let mut s = String::new();
let result = self.reader.read_line(&mut s).await;
match result {
Ok(0) => None,
Ok(_) => Some(s),
Err(err) => panic!("{}", err),
}
}
}
#[cfg(unix)]
#[cfg(test)]
mod tests {
use super::*;
async fn collect(reader: &mut Reader) -> Vec<String> {
let mut list = Vec::new();
while let Some(line) = reader.read_line().await {
list.push(line)
}
list
}
#[tokio::test]
async fn test_reader_finite() {
let mut command = Command::new("echo");
command.arg("test");
let mut reader = Reader::spawn(command).expect("unable to spawn");
// Collect all line, expect stream to finish.
let lines = collect(&mut reader).await;
// Assert we got all the lines we expected.
assert_eq!(lines, vec!["test\n".to_owned()]);
// Ensure wait doesn't fail, and that we exit status is success.
let exit_status = reader.wait().await.expect("wait failed");
assert!(exit_status.success());
}
#[tokio::test]
async fn test_reader_infinite() {
let mut command = Command::new("bash");
command.arg("-c");
command.arg(r#"NUM=0; while true; do echo "Line $NUM"; NUM=$((NUM+=1)); sleep 0.01; done"#);
let mut reader = Reader::spawn(command).expect("unable to spawn");
// Read the lines and at some point ask the command we're reading from
// to stop.
let mut expected_num = 0;
while let Some(line) = reader.read_line().await {
// Assert we're getting expected lines.
assert_eq!(line, format!("Line {}\n", expected_num));
// On line 100 issue a `kill` to stop the infinite stream.
if expected_num == 100 {
reader.kill().await.expect("process already stopped")
}
// If we are past 200 it means we issued `kill` at 100 and it wasn't
// effective. This is problem, fail the test.
// We don't to this immediately after `kill` to allow for some
// potential race condition. That kind of race is not just ok, but
// is desirable in the real-life usage to read-up the whole stdout
// buffer.
if expected_num > 200 {
panic!("Went too far without stop being effective");
}
// Bump the expected num for the next iteration.
expected_num += 1;
}
// Ensure wait doesn't fail. We killed the process, so expect
// a non-success exit code.
let exit_status = reader.wait().await.expect("wait failed");
assert!(!exit_status.success());
}
}