k8s_e2e_tests/
lib.rs

1#![deny(warnings)]
2
3use std::{collections::BTreeMap, env};
4
5use indoc::formatdoc;
6use k8s_openapi::{
7    api::core::v1::{Affinity, Container, Pod, PodAffinity, PodAffinityTerm, PodSpec},
8    apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta},
9};
10use k8s_test_framework::{
11    CommandBuilder, Framework, Interface, Manager, Reader, test_pod, wait_for_resource::WaitFor,
12};
13use rand::{
14    distr::{Alphanumeric, SampleString},
15    rng,
16};
17use tracing::{debug, error, info};
18
19pub mod metrics;
20
21pub const BUSYBOX_IMAGE: &str = "busybox:1.28";
22
23/// Returns the Helm chart repo to use for E2E tests.
24/// Set `HELM_CHART_REPO` to override the default (e.g., a local chart path).
25pub fn helm_chart_repo() -> String {
26    env::var("HELM_CHART_REPO").unwrap_or_else(|_| "https://helm.vector.dev".to_string())
27}
28
29pub fn init() {
30    _ = env_logger::builder().is_test(true).try_init();
31}
32
33pub fn get_namespace() -> String {
34    // Generate a random alphanumeric (lowercase) string to ensure each test is run with unique names.
35    // There is a 36 ^ 5 chance of a name collision, which is likely to be an acceptable risk.
36    let id = Alphanumeric.sample_string(&mut rng(), 5).to_lowercase();
37
38    format!("vector-{id}")
39}
40
41pub fn get_namespace_appended(namespace: &str, suffix: &str) -> String {
42    format!("{namespace}-{suffix}")
43}
44
45/// Gets a name we can use for roles to prevent them conflicting with other tests.
46/// Uses the provided namespace as the root.
47pub fn get_override_name(namespace: &str, suffix: &str) -> String {
48    format!("{namespace}-{suffix}")
49}
50
51/// Is the MULTINODE environment variable set?
52pub fn is_multinode() -> bool {
53    env::var("MULTINODE").is_ok()
54}
55
56/// Create config adding fullnameOverride entry. This allows multiple tests
57/// to be run against the same cluster without the role names clashing.
58pub fn config_override_name(name: &str, cleanup: bool) -> String {
59    let vectordir = if is_multinode() {
60        format!("{name}-vector")
61    } else {
62        "vector".to_string()
63    };
64
65    let volumeconfig = if is_multinode() {
66        formatdoc!(
67            r#"
68            dataVolume:
69              hostPath:
70                path: /var/lib/{}/
71            "#,
72            vectordir,
73        )
74    } else {
75        String::new()
76    };
77
78    let cleanupconfig = if cleanup {
79        formatdoc!(
80            r#"
81        extraVolumeMounts:
82          - name: var-lib
83            mountPath: /var/writablelib
84            readOnly: false
85
86        lifecycle:
87          preStop:
88            exec:
89              command:
90                - sh
91                - -c
92                - rm -rf /var/writablelib/{}
93                "#,
94            vectordir,
95        )
96    } else {
97        String::new()
98    };
99
100    formatdoc!(
101        r#"
102        fullnameOverride: "{}"
103        {}
104        {}
105        "#,
106        name,
107        volumeconfig,
108        cleanupconfig,
109    )
110}
111
112pub fn make_framework() -> Framework {
113    let interface = Interface::from_env().expect("interface is not ready");
114    Framework::new(interface)
115}
116
117pub fn collect_btree<'a>(
118    items: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
119) -> Option<std::collections::BTreeMap<String, String>> {
120    let collected: std::collections::BTreeMap<String, String> = items
121        .into_iter()
122        .map(|(key, val)| (key.to_owned(), val.to_owned()))
123        .collect();
124    if collected.is_empty() {
125        return None;
126    }
127    Some(collected)
128}
129
130pub fn make_test_container<'a>(name: &'a str, command: &'a str) -> Container {
131    Container {
132        name: name.to_owned(),
133        image: Some(BUSYBOX_IMAGE.to_owned()),
134        command: Some(vec!["sh".to_owned()]),
135        args: Some(vec!["-c".to_owned(), command.to_owned()]),
136        ..Container::default()
137    }
138}
139
140pub fn make_test_pod_with_containers<'a>(
141    namespace: &'a str,
142    name: &'a str,
143    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
144    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
145    affinity: Option<Affinity>,
146    containers: Vec<Container>,
147) -> Pod {
148    Pod {
149        metadata: ObjectMeta {
150            name: Some(name.to_owned()),
151            namespace: Some(namespace.to_owned()),
152            labels: collect_btree(labels),
153            annotations: collect_btree(annotations),
154            ..ObjectMeta::default()
155        },
156        spec: Some(PodSpec {
157            containers,
158            restart_policy: Some("Never".to_owned()),
159            affinity,
160            ..PodSpec::default()
161        }),
162        ..Pod::default()
163    }
164}
165
166/// Since the tests only scan the logs from an agent on a single node, we want to make sure that all the test pods are on
167/// the same node so the agent picks them all.
168pub fn make_test_pod_with_affinity<'a>(
169    namespace: &'a str,
170    name: &'a str,
171    command: &'a str,
172    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
173    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
174    affinity_label: Option<(&'a str, &'a str)>,
175    affinity_namespace: Option<&'a str>,
176) -> Pod {
177    let affinity = affinity_label.map(|(label, value)| {
178        let selector = LabelSelector {
179            match_expressions: None,
180            match_labels: Some({
181                let mut map = BTreeMap::new();
182                map.insert(label.to_string(), value.to_string());
183                map
184            }),
185        };
186
187        Affinity {
188            node_affinity: None,
189            pod_affinity: Some(PodAffinity {
190                preferred_during_scheduling_ignored_during_execution: None,
191                required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
192                    label_selector: Some(selector),
193                    namespaces: Some(vec![affinity_namespace.unwrap_or(namespace).to_string()]),
194                    topology_key: "kubernetes.io/hostname".to_string(),
195                    match_label_keys: None,
196                    mismatch_label_keys: None,
197                    namespace_selector: None,
198                }]),
199            }),
200            pod_anti_affinity: None,
201        }
202    });
203
204    make_test_pod_with_containers(
205        namespace,
206        name,
207        labels,
208        annotations,
209        affinity,
210        vec![make_test_container(name, command)],
211    )
212}
213
214pub fn make_test_pod<'a>(
215    namespace: &'a str,
216    name: &'a str,
217    command: &'a str,
218    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
219    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
220) -> Pod {
221    make_test_pod_with_affinity(namespace, name, command, labels, annotations, None, None)
222}
223
224pub fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
225    serde_json::from_str(s)
226}
227
228pub fn generate_long_string(a: usize, b: usize) -> String {
229    (0..a).fold(String::new(), |mut acc, i| {
230        let istr = i.to_string();
231        for _ in 0..b {
232            acc.push_str(&istr);
233        }
234        acc
235    })
236}
237
238/// Read the first line from vector logs and assert that it matches the expected
239/// one.
240/// This allows detecting the situations where things have gone very wrong.
241pub async fn smoke_check_first_line(log_reader: &mut Reader) {
242    // Wait for first line as a smoke check.
243    let first_line = log_reader
244        .read_line()
245        .await
246        .expect("unable to read first line");
247    let expected_pat = "INFO vector::app:";
248    assert!(
249        first_line.contains(expected_pat),
250        "Expected a line ending with {expected_pat:?} but got {first_line:?}; vector might be malfunctioning"
251    );
252}
253
254pub enum FlowControlCommand {
255    GoOn,
256    Terminate,
257}
258
259pub async fn look_for_log_line<P>(
260    log_reader: &mut Reader,
261    mut predicate: P,
262) -> Result<(), Box<dyn std::error::Error>>
263where
264    P: FnMut(serde_json::Value) -> FlowControlCommand,
265{
266    let mut lines_till_we_give_up = 10000;
267    while let Some(line) = log_reader.read_line().await {
268        debug!("Got line: {:?}", line);
269
270        lines_till_we_give_up -= 1;
271        if lines_till_we_give_up <= 0 {
272            info!("Giving up");
273            log_reader.kill().await?;
274            break;
275        }
276
277        if !line.starts_with('{') {
278            // This isn't a json, must be an entry from Vector's own log stream.
279            continue;
280        }
281
282        let val = match parse_json(&line) {
283            Ok(val) => val,
284            Err(err) if err.is_eof() => {
285                // We got an EOF error, this is most likely some very long line,
286                // we don't produce lines this bing is our test cases, so we'll
287                // just skip the error - as if it wasn't a JSON string.
288                error!(
289                    "The JSON line we just got was incomplete, most likely it was too long, so we're skipping it"
290                );
291                continue;
292            }
293            Err(err) => return Err(err.into()),
294        };
295
296        match predicate(val) {
297            FlowControlCommand::GoOn => {
298                // Not what we were looking for, go on.
299            }
300            FlowControlCommand::Terminate => {
301                // We are told we should stop, request that log reader is
302                // killed.
303                // This doesn't immediately stop the reading because we want to
304                // process the pending buffers first.
305                log_reader.kill().await?;
306            }
307        }
308    }
309
310    // Ensure log reader exited.
311    log_reader.wait().await.expect("log reader wait failed");
312
313    Ok(())
314}
315
316/// Create a pod for our other pods to have an affinity to ensure they are all deployed on
317/// the same node.
318pub async fn create_affinity_pod(
319    framework: &Framework,
320    namespace: &str,
321    affinity_label: &str,
322) -> Result<Manager<CommandBuilder>, Box<dyn std::error::Error>> {
323    let test_pod = framework
324        .test_pod(test_pod::Config::from_pod(&make_test_pod(
325            namespace,
326            "affinity-pod",
327            "tail -f /dev/null",
328            vec![(affinity_label, "yes")],
329            vec![],
330        ))?)
331        .await?;
332    framework
333        .wait(
334            namespace,
335            vec!["pods/affinity-pod"],
336            WaitFor::Condition("initialized"),
337            vec!["--timeout=60s"],
338        )
339        .await?;
340
341    Ok(test_pod)
342}