1#![deny(warnings)]
2
3use std::{collections::BTreeMap, env};
4
5use indoc::formatdoc;
6use k8s_openapi::{
7 api::core::v1::{Affinity, Container, Pod, PodAffinity, PodAffinityTerm, PodSpec},
8 apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta},
9};
10use k8s_test_framework::{
11 CommandBuilder, Framework, Interface, Manager, Reader, test_pod, wait_for_resource::WaitFor,
12};
13use rand::{
14 distr::{Alphanumeric, SampleString},
15 rng,
16};
17use tracing::{debug, error, info};
18
19pub mod metrics;
20
21pub const BUSYBOX_IMAGE: &str = "busybox:1.28";
22
23pub fn init() {
24 _ = env_logger::builder().is_test(true).try_init();
25}
26
27pub fn get_namespace() -> String {
28 let id = Alphanumeric.sample_string(&mut rng(), 5).to_lowercase();
31
32 format!("vector-{id}")
33}
34
35pub fn get_namespace_appended(namespace: &str, suffix: &str) -> String {
36 format!("{namespace}-{suffix}")
37}
38
39pub fn get_override_name(namespace: &str, suffix: &str) -> String {
42 format!("{namespace}-{suffix}")
43}
44
45pub fn is_multinode() -> bool {
47 env::var("MULTINODE").is_ok()
48}
49
50pub fn config_override_name(name: &str, cleanup: bool) -> String {
53 let vectordir = if is_multinode() {
54 format!("{name}-vector")
55 } else {
56 "vector".to_string()
57 };
58
59 let volumeconfig = if is_multinode() {
60 formatdoc!(
61 r#"
62 dataVolume:
63 hostPath:
64 path: /var/lib/{}/
65 "#,
66 vectordir,
67 )
68 } else {
69 String::new()
70 };
71
72 let cleanupconfig = if cleanup {
73 formatdoc!(
74 r#"
75 extraVolumeMounts:
76 - name: var-lib
77 mountPath: /var/writablelib
78 readOnly: false
79
80 lifecycle:
81 preStop:
82 exec:
83 command:
84 - sh
85 - -c
86 - rm -rf /var/writablelib/{}
87 "#,
88 vectordir,
89 )
90 } else {
91 String::new()
92 };
93
94 formatdoc!(
95 r#"
96 fullnameOverride: "{}"
97 {}
98 {}
99 "#,
100 name,
101 volumeconfig,
102 cleanupconfig,
103 )
104}
105
106pub fn make_framework() -> Framework {
107 let interface = Interface::from_env().expect("interface is not ready");
108 Framework::new(interface)
109}
110
111pub fn collect_btree<'a>(
112 items: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
113) -> Option<std::collections::BTreeMap<String, String>> {
114 let collected: std::collections::BTreeMap<String, String> = items
115 .into_iter()
116 .map(|(key, val)| (key.to_owned(), val.to_owned()))
117 .collect();
118 if collected.is_empty() {
119 return None;
120 }
121 Some(collected)
122}
123
124pub fn make_test_container<'a>(name: &'a str, command: &'a str) -> Container {
125 Container {
126 name: name.to_owned(),
127 image: Some(BUSYBOX_IMAGE.to_owned()),
128 command: Some(vec!["sh".to_owned()]),
129 args: Some(vec!["-c".to_owned(), command.to_owned()]),
130 ..Container::default()
131 }
132}
133
134pub fn make_test_pod_with_containers<'a>(
135 namespace: &'a str,
136 name: &'a str,
137 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
138 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
139 affinity: Option<Affinity>,
140 containers: Vec<Container>,
141) -> Pod {
142 Pod {
143 metadata: ObjectMeta {
144 name: Some(name.to_owned()),
145 namespace: Some(namespace.to_owned()),
146 labels: collect_btree(labels),
147 annotations: collect_btree(annotations),
148 ..ObjectMeta::default()
149 },
150 spec: Some(PodSpec {
151 containers,
152 restart_policy: Some("Never".to_owned()),
153 affinity,
154 ..PodSpec::default()
155 }),
156 ..Pod::default()
157 }
158}
159
160pub fn make_test_pod_with_affinity<'a>(
163 namespace: &'a str,
164 name: &'a str,
165 command: &'a str,
166 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
167 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
168 affinity_label: Option<(&'a str, &'a str)>,
169 affinity_namespace: Option<&'a str>,
170) -> Pod {
171 let affinity = affinity_label.map(|(label, value)| {
172 let selector = LabelSelector {
173 match_expressions: None,
174 match_labels: Some({
175 let mut map = BTreeMap::new();
176 map.insert(label.to_string(), value.to_string());
177 map
178 }),
179 };
180
181 Affinity {
182 node_affinity: None,
183 pod_affinity: Some(PodAffinity {
184 preferred_during_scheduling_ignored_during_execution: None,
185 required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
186 label_selector: Some(selector),
187 namespaces: Some(vec![affinity_namespace.unwrap_or(namespace).to_string()]),
188 topology_key: "kubernetes.io/hostname".to_string(),
189 }]),
190 }),
191 pod_anti_affinity: None,
192 }
193 });
194
195 make_test_pod_with_containers(
196 namespace,
197 name,
198 labels,
199 annotations,
200 affinity,
201 vec![make_test_container(name, command)],
202 )
203}
204
205pub fn make_test_pod<'a>(
206 namespace: &'a str,
207 name: &'a str,
208 command: &'a str,
209 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
210 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
211) -> Pod {
212 make_test_pod_with_affinity(namespace, name, command, labels, annotations, None, None)
213}
214
215pub fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
216 serde_json::from_str(s)
217}
218
219pub fn generate_long_string(a: usize, b: usize) -> String {
220 (0..a).fold(String::new(), |mut acc, i| {
221 let istr = i.to_string();
222 for _ in 0..b {
223 acc.push_str(&istr);
224 }
225 acc
226 })
227}
228
229pub async fn smoke_check_first_line(log_reader: &mut Reader) {
233 let first_line = log_reader
235 .read_line()
236 .await
237 .expect("unable to read first line");
238 let expected_pat = "INFO vector::app:";
239 assert!(
240 first_line.contains(expected_pat),
241 "Expected a line ending with {expected_pat:?} but got {first_line:?}; vector might be malfunctioning"
242 );
243}
244
245pub enum FlowControlCommand {
246 GoOn,
247 Terminate,
248}
249
250pub async fn look_for_log_line<P>(
251 log_reader: &mut Reader,
252 mut predicate: P,
253) -> Result<(), Box<dyn std::error::Error>>
254where
255 P: FnMut(serde_json::Value) -> FlowControlCommand,
256{
257 let mut lines_till_we_give_up = 10000;
258 while let Some(line) = log_reader.read_line().await {
259 debug!("Got line: {:?}", line);
260
261 lines_till_we_give_up -= 1;
262 if lines_till_we_give_up <= 0 {
263 info!("Giving up");
264 log_reader.kill().await?;
265 break;
266 }
267
268 if !line.starts_with('{') {
269 continue;
271 }
272
273 let val = match parse_json(&line) {
274 Ok(val) => val,
275 Err(err) if err.is_eof() => {
276 error!(
280 "The JSON line we just got was incomplete, most likely it was too long, so we're skipping it"
281 );
282 continue;
283 }
284 Err(err) => return Err(err.into()),
285 };
286
287 match predicate(val) {
288 FlowControlCommand::GoOn => {
289 }
291 FlowControlCommand::Terminate => {
292 log_reader.kill().await?;
297 }
298 }
299 }
300
301 log_reader.wait().await.expect("log reader wait failed");
303
304 Ok(())
305}
306
307pub async fn create_affinity_pod(
310 framework: &Framework,
311 namespace: &str,
312 affinity_label: &str,
313) -> Result<Manager<CommandBuilder>, Box<dyn std::error::Error>> {
314 let test_pod = framework
315 .test_pod(test_pod::Config::from_pod(&make_test_pod(
316 namespace,
317 "affinity-pod",
318 "tail -f /dev/null",
319 vec![(affinity_label, "yes")],
320 vec![],
321 ))?)
322 .await?;
323 framework
324 .wait(
325 namespace,
326 vec!["pods/affinity-pod"],
327 WaitFor::Condition("initialized"),
328 vec!["--timeout=60s"],
329 )
330 .await?;
331
332 Ok(test_pod)
333}