k8s_test_framework/
reader.rs1use std::process::{ExitStatus, Stdio};
4
5use tokio::{
6 io::{AsyncBufReadExt, BufReader},
7 process::{Child, ChildStdout, Command},
8};
9
10#[derive(Debug)]
13pub struct Reader {
14 child: Child,
15 reader: BufReader<ChildStdout>,
16}
17
18impl Reader {
19 pub fn spawn(mut command: Command) -> std::io::Result<Self> {
21 Self::prepare_stdout(&mut command);
22 let child = command.spawn()?;
23 Ok(Self::new(child))
24 }
25
26 fn prepare_stdout(command: &mut Command) {
27 command.stdout(Stdio::piped());
28 }
29
30 fn new(mut child: Child) -> Self {
31 let stdout = child.stdout.take().unwrap();
32 let reader = BufReader::new(stdout);
33 Reader { child, reader }
34 }
35
36 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
38 self.child.wait().await
39 }
40
41 pub async fn kill(&mut self) -> std::io::Result<()> {
43 self.child.kill().await
44 }
45
46 pub async fn read_line(&mut self) -> Option<String> {
48 let mut s = String::new();
49 let result = self.reader.read_line(&mut s).await;
50 match result {
51 Ok(0) => None,
52 Ok(_) => Some(s),
53 Err(err) => panic!("{}", err),
54 }
55 }
56}
57
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Drains `reader` until it reports no more output, returning every line
    /// in the order it was read.
    async fn collect(reader: &mut Reader) -> Vec<String> {
        let mut lines = Vec::new();
        loop {
            match reader.read_line().await {
                Some(line) => lines.push(line),
                None => break lines,
            }
        }
    }

    /// A process that prints one line and exits yields exactly that line and
    /// a successful exit status.
    #[tokio::test]
    async fn test_reader_finite() {
        let mut command = Command::new("echo");
        command.arg("test");

        let mut reader = Reader::spawn(command).expect("unable to spawn");

        // The whole output is a single newline-terminated line.
        assert_eq!(collect(&mut reader).await, vec!["test\n".to_owned()]);

        let exit_status = reader.wait().await.expect("wait failed");
        assert!(exit_status.success());
    }

    /// A process that prints forever can be stopped with `kill`, after which
    /// the output ends and the exit status is unsuccessful.
    #[tokio::test]
    async fn test_reader_infinite() {
        let mut command = Command::new("bash");
        command
            .arg("-c")
            .arg(r#"NUM=0; while true; do echo "Line $NUM"; NUM=$((NUM+=1)); sleep 0.01; done"#);

        let mut reader = Reader::spawn(command).expect("unable to spawn");

        // Lines must arrive in sequence, starting from zero.
        let mut expected_num = 0;
        while let Some(line) = reader.read_line().await {
            assert_eq!(line, format!("Line {expected_num}\n"));

            // Ask the process to stop once enough lines have been observed.
            if expected_num == 100 {
                reader.kill().await.expect("process already stopped");
            }

            // If lines keep arriving well past the kill request, the kill
            // did not take effect.
            assert!(
                expected_num <= 200,
                "Went too far without stop being effective"
            );

            expected_num += 1;
        }

        // The process was killed rather than exiting on its own, so the exit
        // status must not be a success.
        let exit_status = reader.wait().await.expect("wait failed");
        assert!(!exit_status.success());
    }
}