1#![deny(warnings)]
2
3use std::{collections::BTreeMap, env};
4
5use indoc::formatdoc;
6use k8s_openapi::{
7 api::core::v1::{Affinity, Container, Pod, PodAffinity, PodAffinityTerm, PodSpec},
8 apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta},
9};
10use k8s_test_framework::{
11 CommandBuilder, Framework, Interface, Manager, Reader, test_pod, wait_for_resource::WaitFor,
12};
13use rand::{
14 distr::{Alphanumeric, SampleString},
15 rng,
16};
17use tracing::{debug, error, info};
18
/// Public `metrics` submodule; its body is declared outside this file.
pub mod metrics;
20
/// Container image reference used for the test containers built by
/// `make_test_container`.
pub const BUSYBOX_IMAGE: &str = "busybox:1.28";
22
/// Returns the Helm chart repository URL.
///
/// The value of the `HELM_CHART_REPO` environment variable is returned when it
/// can be read; otherwise the function falls back to
/// `https://helm.vector.dev`.
pub fn helm_chart_repo() -> String {
    const DEFAULT_REPO: &str = "https://helm.vector.dev";

    env::var("HELM_CHART_REPO").unwrap_or_else(|_| DEFAULT_REPO.to_owned())
}
28
29pub fn init() {
30 _ = env_logger::builder().is_test(true).try_init();
31}
32
33pub fn get_namespace() -> String {
34 let id = Alphanumeric.sample_string(&mut rng(), 5).to_lowercase();
37
38 format!("vector-{id}")
39}
40
/// Joins `namespace` and `suffix` with a single `-`.
pub fn get_namespace_appended(namespace: &str, suffix: &str) -> String {
    [namespace, suffix].join("-")
}
44
/// Builds an override name by concatenating `namespace`, a `-`, and `suffix`.
pub fn get_override_name(namespace: &str, suffix: &str) -> String {
    let mut override_name = String::with_capacity(namespace.len() + suffix.len() + 1);
    override_name.push_str(namespace);
    override_name.push('-');
    override_name.push_str(suffix);
    override_name
}
50
/// Reports whether the `MULTINODE` environment variable can be read.
///
/// Only the success of the lookup matters; the variable's value is ignored.
pub fn is_multinode() -> bool {
    let lookup = env::var("MULTINODE");
    lookup.is_ok()
}
55
56pub fn config_override_name(name: &str, cleanup: bool) -> String {
59 let vectordir = if is_multinode() {
60 format!("{name}-vector")
61 } else {
62 "vector".to_string()
63 };
64
65 let volumeconfig = if is_multinode() {
66 formatdoc!(
67 r#"
68 dataVolume:
69 hostPath:
70 path: /var/lib/{}/
71 "#,
72 vectordir,
73 )
74 } else {
75 String::new()
76 };
77
78 let cleanupconfig = if cleanup {
79 formatdoc!(
80 r#"
81 extraVolumeMounts:
82 - name: var-lib
83 mountPath: /var/writablelib
84 readOnly: false
85
86 lifecycle:
87 preStop:
88 exec:
89 command:
90 - sh
91 - -c
92 - rm -rf /var/writablelib/{}
93 "#,
94 vectordir,
95 )
96 } else {
97 String::new()
98 };
99
100 formatdoc!(
101 r#"
102 fullnameOverride: "{}"
103 {}
104 {}
105 "#,
106 name,
107 volumeconfig,
108 cleanupconfig,
109 )
110}
111
112pub fn make_framework() -> Framework {
113 let interface = Interface::from_env().expect("interface is not ready");
114 Framework::new(interface)
115}
116
/// Copies borrowed key/value pairs into an owned, ordered map.
///
/// Returns `None` when `items` yields no pairs, and `Some(map)` otherwise.
/// When a key occurs more than once, the value seen last is kept.
pub fn collect_btree<'a>(
    items: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
) -> Option<std::collections::BTreeMap<String, String>> {
    let mut owned = std::collections::BTreeMap::new();
    for (key, val) in items {
        owned.insert(String::from(key), String::from(val));
    }

    if owned.is_empty() { None } else { Some(owned) }
}
129
130pub fn make_test_container<'a>(name: &'a str, command: &'a str) -> Container {
131 Container {
132 name: name.to_owned(),
133 image: Some(BUSYBOX_IMAGE.to_owned()),
134 command: Some(vec!["sh".to_owned()]),
135 args: Some(vec!["-c".to_owned(), command.to_owned()]),
136 ..Container::default()
137 }
138}
139
140pub fn make_test_pod_with_containers<'a>(
141 namespace: &'a str,
142 name: &'a str,
143 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
144 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
145 affinity: Option<Affinity>,
146 containers: Vec<Container>,
147) -> Pod {
148 Pod {
149 metadata: ObjectMeta {
150 name: Some(name.to_owned()),
151 namespace: Some(namespace.to_owned()),
152 labels: collect_btree(labels),
153 annotations: collect_btree(annotations),
154 ..ObjectMeta::default()
155 },
156 spec: Some(PodSpec {
157 containers,
158 restart_policy: Some("Never".to_owned()),
159 affinity,
160 ..PodSpec::default()
161 }),
162 ..Pod::default()
163 }
164}
165
166pub fn make_test_pod_with_affinity<'a>(
169 namespace: &'a str,
170 name: &'a str,
171 command: &'a str,
172 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
173 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
174 affinity_label: Option<(&'a str, &'a str)>,
175 affinity_namespace: Option<&'a str>,
176) -> Pod {
177 let affinity = affinity_label.map(|(label, value)| {
178 let selector = LabelSelector {
179 match_expressions: None,
180 match_labels: Some({
181 let mut map = BTreeMap::new();
182 map.insert(label.to_string(), value.to_string());
183 map
184 }),
185 };
186
187 Affinity {
188 node_affinity: None,
189 pod_affinity: Some(PodAffinity {
190 preferred_during_scheduling_ignored_during_execution: None,
191 required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
192 label_selector: Some(selector),
193 namespaces: Some(vec![affinity_namespace.unwrap_or(namespace).to_string()]),
194 topology_key: "kubernetes.io/hostname".to_string(),
195 match_label_keys: None,
196 mismatch_label_keys: None,
197 namespace_selector: None,
198 }]),
199 }),
200 pod_anti_affinity: None,
201 }
202 });
203
204 make_test_pod_with_containers(
205 namespace,
206 name,
207 labels,
208 annotations,
209 affinity,
210 vec![make_test_container(name, command)],
211 )
212}
213
214pub fn make_test_pod<'a>(
215 namespace: &'a str,
216 name: &'a str,
217 command: &'a str,
218 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
219 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
220) -> Pod {
221 make_test_pod_with_affinity(namespace, name, command, labels, annotations, None, None)
222}
223
224pub fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
225 serde_json::from_str(s)
226}
227
/// Builds a string from the decimal representations of the integers in
/// `0..a`, each written `b` times in a row.
///
/// For example, `generate_long_string(3, 2)` yields `"001122"`. The result is
/// empty when either `a` or `b` is zero.
pub fn generate_long_string(a: usize, b: usize) -> String {
    let mut out = String::new();
    for i in 0..a {
        out.push_str(&i.to_string().repeat(b));
    }
    out
}
237
238pub async fn smoke_check_first_line(log_reader: &mut Reader) {
242 let first_line = log_reader
244 .read_line()
245 .await
246 .expect("unable to read first line");
247 let expected_pat = "INFO vector::app:";
248 assert!(
249 first_line.contains(expected_pat),
250 "Expected a line ending with {expected_pat:?} but got {first_line:?}; vector might be malfunctioning"
251 );
252}
253
/// Decision returned by the predicate passed to `look_for_log_line` after it
/// has inspected one parsed log line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlCommand {
    /// Take no action for this line; keep feeding further lines to the
    /// predicate.
    GoOn,
    /// Ask for the log reader to be killed.
    Terminate,
}
258
259pub async fn look_for_log_line<P>(
260 log_reader: &mut Reader,
261 mut predicate: P,
262) -> Result<(), Box<dyn std::error::Error>>
263where
264 P: FnMut(serde_json::Value) -> FlowControlCommand,
265{
266 let mut lines_till_we_give_up = 10000;
267 while let Some(line) = log_reader.read_line().await {
268 debug!("Got line: {:?}", line);
269
270 lines_till_we_give_up -= 1;
271 if lines_till_we_give_up <= 0 {
272 info!("Giving up");
273 log_reader.kill().await?;
274 break;
275 }
276
277 if !line.starts_with('{') {
278 continue;
280 }
281
282 let val = match parse_json(&line) {
283 Ok(val) => val,
284 Err(err) if err.is_eof() => {
285 error!(
289 "The JSON line we just got was incomplete, most likely it was too long, so we're skipping it"
290 );
291 continue;
292 }
293 Err(err) => return Err(err.into()),
294 };
295
296 match predicate(val) {
297 FlowControlCommand::GoOn => {
298 }
300 FlowControlCommand::Terminate => {
301 log_reader.kill().await?;
306 }
307 }
308 }
309
310 log_reader.wait().await.expect("log reader wait failed");
312
313 Ok(())
314}
315
316pub async fn create_affinity_pod(
319 framework: &Framework,
320 namespace: &str,
321 affinity_label: &str,
322) -> Result<Manager<CommandBuilder>, Box<dyn std::error::Error>> {
323 let test_pod = framework
324 .test_pod(test_pod::Config::from_pod(&make_test_pod(
325 namespace,
326 "affinity-pod",
327 "tail -f /dev/null",
328 vec![(affinity_label, "yes")],
329 vec![],
330 ))?)
331 .await?;
332 framework
333 .wait(
334 namespace,
335 vec!["pods/affinity-pod"],
336 WaitFor::Condition("initialized"),
337 vec!["--timeout=60s"],
338 )
339 .await?;
340
341 Ok(test_pod)
342}