1#![deny(warnings)]
2
3use std::{collections::BTreeMap, env};
4
5use indoc::formatdoc;
6use k8s_openapi::{
7 api::core::v1::{Affinity, Container, Pod, PodAffinity, PodAffinityTerm, PodSpec},
8 apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta},
9};
10use k8s_test_framework::{
11 test_pod, wait_for_resource::WaitFor, CommandBuilder, Framework, Interface, Manager, Reader,
12};
13use rand::distr::{Alphanumeric, SampleString};
14use rand::rng;
15use tracing::{debug, error, info};
16
17pub mod metrics;
18
/// Container image reference used for the containers built by
/// [`make_test_container`].
pub const BUSYBOX_IMAGE: &str = "busybox:1.28";
20
21pub fn init() {
22 _ = env_logger::builder().is_test(true).try_init();
23}
24
25pub fn get_namespace() -> String {
26 let id = Alphanumeric.sample_string(&mut rng(), 5).to_lowercase();
29
30 format!("vector-{id}")
31}
32
/// Returns `namespace` and `suffix` joined by a single hyphen.
pub fn get_namespace_appended(namespace: &str, suffix: &str) -> String {
    [namespace, suffix].join("-")
}
36
/// Builds an override name rooted at `namespace`: the result is
/// `<namespace>-<suffix>`.
// NOTE(review): presumably used to keep per-test resource names from clashing
// with other tests — confirm against callers.
pub fn get_override_name(namespace: &str, suffix: &str) -> String {
    let mut override_name = String::with_capacity(namespace.len() + suffix.len() + 1);
    override_name.push_str(namespace);
    override_name.push('-');
    override_name.push_str(suffix);
    override_name
}
42
/// Reports whether the `MULTINODE` environment variable can be read.
///
/// Returns `true` when `env::var("MULTINODE")` succeeds, i.e. the variable is
/// set to a valid Unicode value; its content is not inspected.
pub fn is_multinode() -> bool {
    matches!(env::var("MULTINODE"), Ok(_))
}
47
48pub fn config_override_name(name: &str, cleanup: bool) -> String {
51 let vectordir = if is_multinode() {
52 format!("{name}-vector")
53 } else {
54 "vector".to_string()
55 };
56
57 let volumeconfig = if is_multinode() {
58 formatdoc!(
59 r#"
60 dataVolume:
61 hostPath:
62 path: /var/lib/{}/
63 "#,
64 vectordir,
65 )
66 } else {
67 String::new()
68 };
69
70 let cleanupconfig = if cleanup {
71 formatdoc!(
72 r#"
73 extraVolumeMounts:
74 - name: var-lib
75 mountPath: /var/writablelib
76 readOnly: false
77
78 lifecycle:
79 preStop:
80 exec:
81 command:
82 - sh
83 - -c
84 - rm -rf /var/writablelib/{}
85 "#,
86 vectordir,
87 )
88 } else {
89 String::new()
90 };
91
92 formatdoc!(
93 r#"
94 fullnameOverride: "{}"
95 {}
96 {}
97 "#,
98 name,
99 volumeconfig,
100 cleanupconfig,
101 )
102}
103
104pub fn make_framework() -> Framework {
105 let interface = Interface::from_env().expect("interface is not ready");
106 Framework::new(interface)
107}
108
/// Collects borrowed key/value pairs into an owned `BTreeMap<String, String>`.
///
/// Returns `None` when `items` yields no pairs, otherwise `Some` with the
/// collected map.
pub fn collect_btree<'a>(
    items: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
) -> Option<std::collections::BTreeMap<String, String>> {
    let owned_pairs = items
        .into_iter()
        .map(|(key, val)| (String::from(key), String::from(val)));
    let map: std::collections::BTreeMap<String, String> = owned_pairs.collect();

    // An empty map is reported as "nothing to set" rather than `Some({})`.
    Some(map).filter(|entries| !entries.is_empty())
}
121
122pub fn make_test_container<'a>(name: &'a str, command: &'a str) -> Container {
123 Container {
124 name: name.to_owned(),
125 image: Some(BUSYBOX_IMAGE.to_owned()),
126 command: Some(vec!["sh".to_owned()]),
127 args: Some(vec!["-c".to_owned(), command.to_owned()]),
128 ..Container::default()
129 }
130}
131
132pub fn make_test_pod_with_containers<'a>(
133 namespace: &'a str,
134 name: &'a str,
135 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
136 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
137 affinity: Option<Affinity>,
138 containers: Vec<Container>,
139) -> Pod {
140 Pod {
141 metadata: ObjectMeta {
142 name: Some(name.to_owned()),
143 namespace: Some(namespace.to_owned()),
144 labels: collect_btree(labels),
145 annotations: collect_btree(annotations),
146 ..ObjectMeta::default()
147 },
148 spec: Some(PodSpec {
149 containers,
150 restart_policy: Some("Never".to_owned()),
151 affinity,
152 ..PodSpec::default()
153 }),
154 ..Pod::default()
155 }
156}
157
158pub fn make_test_pod_with_affinity<'a>(
161 namespace: &'a str,
162 name: &'a str,
163 command: &'a str,
164 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
165 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
166 affinity_label: Option<(&'a str, &'a str)>,
167 affinity_namespace: Option<&'a str>,
168) -> Pod {
169 let affinity = affinity_label.map(|(label, value)| {
170 let selector = LabelSelector {
171 match_expressions: None,
172 match_labels: Some({
173 let mut map = BTreeMap::new();
174 map.insert(label.to_string(), value.to_string());
175 map
176 }),
177 };
178
179 Affinity {
180 node_affinity: None,
181 pod_affinity: Some(PodAffinity {
182 preferred_during_scheduling_ignored_during_execution: None,
183 required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
184 label_selector: Some(selector),
185 namespaces: Some(vec![affinity_namespace.unwrap_or(namespace).to_string()]),
186 topology_key: "kubernetes.io/hostname".to_string(),
187 }]),
188 }),
189 pod_anti_affinity: None,
190 }
191 });
192
193 make_test_pod_with_containers(
194 namespace,
195 name,
196 labels,
197 annotations,
198 affinity,
199 vec![make_test_container(name, command)],
200 )
201}
202
203pub fn make_test_pod<'a>(
204 namespace: &'a str,
205 name: &'a str,
206 command: &'a str,
207 labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
208 annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
209) -> Pod {
210 make_test_pod_with_affinity(namespace, name, command, labels, annotations, None, None)
211}
212
213pub fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
214 serde_json::from_str(s)
215}
216
/// Generates a string by concatenating, for every `i` in `0..a`, the decimal
/// representation of `i` repeated `b` times.
///
/// For example, `generate_long_string(3, 2)` yields `"001122"`. The result is
/// empty when either `a` or `b` is zero.
pub fn generate_long_string(a: usize, b: usize) -> String {
    let mut out = String::new();
    for i in 0..a {
        out.push_str(&i.to_string().repeat(b));
    }
    out
}
226
227pub async fn smoke_check_first_line(log_reader: &mut Reader) {
231 let first_line = log_reader
233 .read_line()
234 .await
235 .expect("unable to read first line");
236 let expected_pat = "INFO vector::app:";
237 assert!(
238 first_line.contains(expected_pat),
239 "Expected a line ending with {expected_pat:?} but got {first_line:?}; vector might be malfunctioning"
240 );
241}
242
/// Decision returned by the predicate passed to [`look_for_log_line`] for each
/// parsed log line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlCommand {
    /// Take no action for this line; reading continues.
    GoOn,
    /// Request that the log reader be killed.
    Terminate,
}
247
248pub async fn look_for_log_line<P>(
249 log_reader: &mut Reader,
250 mut predicate: P,
251) -> Result<(), Box<dyn std::error::Error>>
252where
253 P: FnMut(serde_json::Value) -> FlowControlCommand,
254{
255 let mut lines_till_we_give_up = 10000;
256 while let Some(line) = log_reader.read_line().await {
257 debug!("Got line: {:?}", line);
258
259 lines_till_we_give_up -= 1;
260 if lines_till_we_give_up <= 0 {
261 info!("Giving up");
262 log_reader.kill().await?;
263 break;
264 }
265
266 if !line.starts_with('{') {
267 continue;
269 }
270
271 let val = match parse_json(&line) {
272 Ok(val) => val,
273 Err(err) if err.is_eof() => {
274 error!("The JSON line we just got was incomplete, most likely it was too long, so we're skipping it");
278 continue;
279 }
280 Err(err) => return Err(err.into()),
281 };
282
283 match predicate(val) {
284 FlowControlCommand::GoOn => {
285 }
287 FlowControlCommand::Terminate => {
288 log_reader.kill().await?;
293 }
294 }
295 }
296
297 log_reader.wait().await.expect("log reader wait failed");
299
300 Ok(())
301}
302
303pub async fn create_affinity_pod(
306 framework: &Framework,
307 namespace: &str,
308 affinity_label: &str,
309) -> Result<Manager<CommandBuilder>, Box<dyn std::error::Error>> {
310 let test_pod = framework
311 .test_pod(test_pod::Config::from_pod(&make_test_pod(
312 namespace,
313 "affinity-pod",
314 "tail -f /dev/null",
315 vec![(affinity_label, "yes")],
316 vec![],
317 ))?)
318 .await?;
319 framework
320 .wait(
321 namespace,
322 vec!["pods/affinity-pod"],
323 WaitFor::Condition("initialized"),
324 vec!["--timeout=60s"],
325 )
326 .await?;
327
328 Ok(test_pod)
329}