k8s_e2e_tests/
lib.rs

1#![deny(warnings)]
2
3use std::{collections::BTreeMap, env};
4
5use indoc::formatdoc;
6use k8s_openapi::{
7    api::core::v1::{Affinity, Container, Pod, PodAffinity, PodAffinityTerm, PodSpec},
8    apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta},
9};
10use k8s_test_framework::{
11    test_pod, wait_for_resource::WaitFor, CommandBuilder, Framework, Interface, Manager, Reader,
12};
13use rand::distr::{Alphanumeric, SampleString};
14use rand::rng;
15use tracing::{debug, error, info};
16
17pub mod metrics;
18
/// Container image reference used for the test containers built by
/// `make_test_container`.
pub const BUSYBOX_IMAGE: &str = "busybox:1.28";
20
21pub fn init() {
22    _ = env_logger::builder().is_test(true).try_init();
23}
24
25pub fn get_namespace() -> String {
26    // Generate a random alphanumeric (lowercase) string to ensure each test is run with unique names.
27    // There is a 36 ^ 5 chance of a name collision, which is likely to be an acceptable risk.
28    let id = Alphanumeric.sample_string(&mut rng(), 5).to_lowercase();
29
30    format!("vector-{id}")
31}
32
/// Derives a related namespace name by joining `namespace` and `suffix` with a
/// single `-`.
pub fn get_namespace_appended(namespace: &str, suffix: &str) -> String {
    [namespace, suffix].join("-")
}
36
/// Builds a name for roles that will not clash with the ones created by other
/// tests.
///
/// The result is `namespace` and `suffix` joined by a single `-`, so the
/// provided namespace acts as the root of the name.
pub fn get_override_name(namespace: &str, suffix: &str) -> String {
    let mut name = String::with_capacity(namespace.len() + 1 + suffix.len());
    name.push_str(namespace);
    name.push('-');
    name.push_str(suffix);
    name
}
42
/// Is the MULTINODE environment variable set?
///
/// Only the presence of the variable matters; its value is never inspected.
/// `env::var_os` is used rather than `env::var` so that a value that is not
/// valid Unicode still counts as "set".
pub fn is_multinode() -> bool {
    env::var_os("MULTINODE").is_some()
}
47
48/// Create config adding fullnameOverride entry. This allows multiple tests
49/// to be run against the same cluster without the role names clashing.
50pub fn config_override_name(name: &str, cleanup: bool) -> String {
51    let vectordir = if is_multinode() {
52        format!("{name}-vector")
53    } else {
54        "vector".to_string()
55    };
56
57    let volumeconfig = if is_multinode() {
58        formatdoc!(
59            r#"
60            dataVolume:
61              hostPath:
62                path: /var/lib/{}/
63            "#,
64            vectordir,
65        )
66    } else {
67        String::new()
68    };
69
70    let cleanupconfig = if cleanup {
71        formatdoc!(
72            r#"
73        extraVolumeMounts:
74          - name: var-lib
75            mountPath: /var/writablelib
76            readOnly: false
77
78        lifecycle:
79          preStop:
80            exec:
81              command:
82                - sh
83                - -c
84                - rm -rf /var/writablelib/{}
85                "#,
86            vectordir,
87        )
88    } else {
89        String::new()
90    };
91
92    formatdoc!(
93        r#"
94        fullnameOverride: "{}"
95        {}
96        {}
97        "#,
98        name,
99        volumeconfig,
100        cleanupconfig,
101    )
102}
103
104pub fn make_framework() -> Framework {
105    let interface = Interface::from_env().expect("interface is not ready");
106    Framework::new(interface)
107}
108
/// Copies borrowed key/value pairs into an owned `BTreeMap`.
///
/// Returns `None` when `items` yields no pairs, and `Some(map)` otherwise.
pub fn collect_btree<'a>(
    items: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
) -> Option<std::collections::BTreeMap<String, String>> {
    let owned: std::collections::BTreeMap<String, String> = items
        .into_iter()
        .map(|(key, val)| (String::from(key), String::from(val)))
        .collect();

    // An empty map is reported as "nothing" rather than as `Some` of an empty map.
    Some(owned).filter(|map| !map.is_empty())
}
121
122pub fn make_test_container<'a>(name: &'a str, command: &'a str) -> Container {
123    Container {
124        name: name.to_owned(),
125        image: Some(BUSYBOX_IMAGE.to_owned()),
126        command: Some(vec!["sh".to_owned()]),
127        args: Some(vec!["-c".to_owned(), command.to_owned()]),
128        ..Container::default()
129    }
130}
131
132pub fn make_test_pod_with_containers<'a>(
133    namespace: &'a str,
134    name: &'a str,
135    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
136    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
137    affinity: Option<Affinity>,
138    containers: Vec<Container>,
139) -> Pod {
140    Pod {
141        metadata: ObjectMeta {
142            name: Some(name.to_owned()),
143            namespace: Some(namespace.to_owned()),
144            labels: collect_btree(labels),
145            annotations: collect_btree(annotations),
146            ..ObjectMeta::default()
147        },
148        spec: Some(PodSpec {
149            containers,
150            restart_policy: Some("Never".to_owned()),
151            affinity,
152            ..PodSpec::default()
153        }),
154        ..Pod::default()
155    }
156}
157
158/// Since the tests only scan the logs from an agent on a single node, we want to make sure that all the test pods are on
159/// the same node so the agent picks them all.
160pub fn make_test_pod_with_affinity<'a>(
161    namespace: &'a str,
162    name: &'a str,
163    command: &'a str,
164    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
165    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
166    affinity_label: Option<(&'a str, &'a str)>,
167    affinity_namespace: Option<&'a str>,
168) -> Pod {
169    let affinity = affinity_label.map(|(label, value)| {
170        let selector = LabelSelector {
171            match_expressions: None,
172            match_labels: Some({
173                let mut map = BTreeMap::new();
174                map.insert(label.to_string(), value.to_string());
175                map
176            }),
177        };
178
179        Affinity {
180            node_affinity: None,
181            pod_affinity: Some(PodAffinity {
182                preferred_during_scheduling_ignored_during_execution: None,
183                required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
184                    label_selector: Some(selector),
185                    namespaces: Some(vec![affinity_namespace.unwrap_or(namespace).to_string()]),
186                    topology_key: "kubernetes.io/hostname".to_string(),
187                }]),
188            }),
189            pod_anti_affinity: None,
190        }
191    });
192
193    make_test_pod_with_containers(
194        namespace,
195        name,
196        labels,
197        annotations,
198        affinity,
199        vec![make_test_container(name, command)],
200    )
201}
202
203pub fn make_test_pod<'a>(
204    namespace: &'a str,
205    name: &'a str,
206    command: &'a str,
207    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
208    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
209) -> Pod {
210    make_test_pod_with_affinity(namespace, name, command, labels, annotations, None, None)
211}
212
213pub fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
214    serde_json::from_str(s)
215}
216
/// Generates a string by concatenating the decimal form of every integer in
/// `0..a`, each one repeated `b` times in a row.
///
/// For example `generate_long_string(3, 2)` yields `"001122"`. The result is
/// empty when either `a` or `b` is zero.
pub fn generate_long_string(a: usize, b: usize) -> String {
    (0..a).map(|i| i.to_string().repeat(b)).collect()
}
226
227/// Read the first line from vector logs and assert that it matches the expected
228/// one.
229/// This allows detecting the situations where things have gone very wrong.
230pub async fn smoke_check_first_line(log_reader: &mut Reader) {
231    // Wait for first line as a smoke check.
232    let first_line = log_reader
233        .read_line()
234        .await
235        .expect("unable to read first line");
236    let expected_pat = "INFO vector::app:";
237    assert!(
238        first_line.contains(expected_pat),
239        "Expected a line ending with {expected_pat:?} but got {first_line:?}; vector might be malfunctioning"
240    );
241}
242
/// Verdict returned by the predicate passed to `look_for_log_line` after it
/// has inspected one parsed log line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlCommand {
    /// The line was not what the caller is looking for; keep reading.
    GoOn,
    /// The caller has seen enough; request that the log reader is stopped.
    Terminate,
}
247
248pub async fn look_for_log_line<P>(
249    log_reader: &mut Reader,
250    mut predicate: P,
251) -> Result<(), Box<dyn std::error::Error>>
252where
253    P: FnMut(serde_json::Value) -> FlowControlCommand,
254{
255    let mut lines_till_we_give_up = 10000;
256    while let Some(line) = log_reader.read_line().await {
257        debug!("Got line: {:?}", line);
258
259        lines_till_we_give_up -= 1;
260        if lines_till_we_give_up <= 0 {
261            info!("Giving up");
262            log_reader.kill().await?;
263            break;
264        }
265
266        if !line.starts_with('{') {
267            // This isn't a json, must be an entry from Vector's own log stream.
268            continue;
269        }
270
271        let val = match parse_json(&line) {
272            Ok(val) => val,
273            Err(err) if err.is_eof() => {
274                // We got an EOF error, this is most likely some very long line,
275                // we don't produce lines this bing is our test cases, so we'll
276                // just skip the error - as if it wasn't a JSON string.
277                error!("The JSON line we just got was incomplete, most likely it was too long, so we're skipping it");
278                continue;
279            }
280            Err(err) => return Err(err.into()),
281        };
282
283        match predicate(val) {
284            FlowControlCommand::GoOn => {
285                // Not what we were looking for, go on.
286            }
287            FlowControlCommand::Terminate => {
288                // We are told we should stop, request that log reader is
289                // killed.
290                // This doesn't immediately stop the reading because we want to
291                // process the pending buffers first.
292                log_reader.kill().await?;
293            }
294        }
295    }
296
297    // Ensure log reader exited.
298    log_reader.wait().await.expect("log reader wait failed");
299
300    Ok(())
301}
302
303/// Create a pod for our other pods to have an affinity to ensure they are all deployed on
304/// the same node.
305pub async fn create_affinity_pod(
306    framework: &Framework,
307    namespace: &str,
308    affinity_label: &str,
309) -> Result<Manager<CommandBuilder>, Box<dyn std::error::Error>> {
310    let test_pod = framework
311        .test_pod(test_pod::Config::from_pod(&make_test_pod(
312            namespace,
313            "affinity-pod",
314            "tail -f /dev/null",
315            vec![(affinity_label, "yes")],
316            vec![],
317        ))?)
318        .await?;
319    framework
320        .wait(
321            namespace,
322            vec!["pods/affinity-pod"],
323            WaitFor::Condition("initialized"),
324            vec!["--timeout=60s"],
325        )
326        .await?;
327
328    Ok(test_pod)
329}