k8s_e2e_tests/
lib.rs

1#![deny(warnings)]
2
3use std::{collections::BTreeMap, env};
4
5use indoc::formatdoc;
6use k8s_openapi::{
7    api::core::v1::{Affinity, Container, Pod, PodAffinity, PodAffinityTerm, PodSpec},
8    apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta},
9};
10use k8s_test_framework::{
11    CommandBuilder, Framework, Interface, Manager, Reader, test_pod, wait_for_resource::WaitFor,
12};
13use rand::{
14    distr::{Alphanumeric, SampleString},
15    rng,
16};
17use tracing::{debug, error, info};
18
19pub mod metrics;
20
21pub const BUSYBOX_IMAGE: &str = "busybox:1.28";
22
23pub fn init() {
24    _ = env_logger::builder().is_test(true).try_init();
25}
26
27pub fn get_namespace() -> String {
28    // Generate a random alphanumeric (lowercase) string to ensure each test is run with unique names.
29    // There is a 36 ^ 5 chance of a name collision, which is likely to be an acceptable risk.
30    let id = Alphanumeric.sample_string(&mut rng(), 5).to_lowercase();
31
32    format!("vector-{id}")
33}
34
/// Returns `namespace` and `suffix` joined by a single `-`.
pub fn get_namespace_appended(namespace: &str, suffix: &str) -> String {
    [namespace, suffix].join("-")
}
38
/// Builds a name for roles that will not clash with the names used by other
/// tests.
///
/// The provided `namespace` is the root of the name; `suffix` is appended to
/// it after a `-` separator.
pub fn get_override_name(namespace: &str, suffix: &str) -> String {
    let mut name = String::with_capacity(namespace.len() + 1 + suffix.len());
    name.push_str(namespace);
    name.push('-');
    name.push_str(suffix);
    name
}
44
/// Is the MULTINODE environment variable set?
///
/// Only the presence of the variable matters: any value, including an empty
/// one or one that is not valid Unicode, counts as "set".
pub fn is_multinode() -> bool {
    // `var_os` is used rather than `var` so that a value which is not valid
    // Unicode is still reported as set instead of being treated as absent.
    env::var_os("MULTINODE").is_some()
}
49
50/// Create config adding fullnameOverride entry. This allows multiple tests
51/// to be run against the same cluster without the role names clashing.
52pub fn config_override_name(name: &str, cleanup: bool) -> String {
53    let vectordir = if is_multinode() {
54        format!("{name}-vector")
55    } else {
56        "vector".to_string()
57    };
58
59    let volumeconfig = if is_multinode() {
60        formatdoc!(
61            r#"
62            dataVolume:
63              hostPath:
64                path: /var/lib/{}/
65            "#,
66            vectordir,
67        )
68    } else {
69        String::new()
70    };
71
72    let cleanupconfig = if cleanup {
73        formatdoc!(
74            r#"
75        extraVolumeMounts:
76          - name: var-lib
77            mountPath: /var/writablelib
78            readOnly: false
79
80        lifecycle:
81          preStop:
82            exec:
83              command:
84                - sh
85                - -c
86                - rm -rf /var/writablelib/{}
87                "#,
88            vectordir,
89        )
90    } else {
91        String::new()
92    };
93
94    formatdoc!(
95        r#"
96        fullnameOverride: "{}"
97        {}
98        {}
99        "#,
100        name,
101        volumeconfig,
102        cleanupconfig,
103    )
104}
105
106pub fn make_framework() -> Framework {
107    let interface = Interface::from_env().expect("interface is not ready");
108    Framework::new(interface)
109}
110
/// Collects borrowed key/value pairs into an owned `BTreeMap`.
///
/// Returns `None` when `items` yields no pairs, and `Some(map)` otherwise.
pub fn collect_btree<'a>(
    items: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
) -> Option<std::collections::BTreeMap<String, String>> {
    let map = items
        .into_iter()
        .map(|(k, v)| (String::from(k), String::from(v)))
        .collect::<std::collections::BTreeMap<_, _>>();

    // An empty map is reported as `None` rather than `Some` of an empty map.
    (!map.is_empty()).then_some(map)
}
123
124pub fn make_test_container<'a>(name: &'a str, command: &'a str) -> Container {
125    Container {
126        name: name.to_owned(),
127        image: Some(BUSYBOX_IMAGE.to_owned()),
128        command: Some(vec!["sh".to_owned()]),
129        args: Some(vec!["-c".to_owned(), command.to_owned()]),
130        ..Container::default()
131    }
132}
133
134pub fn make_test_pod_with_containers<'a>(
135    namespace: &'a str,
136    name: &'a str,
137    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
138    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
139    affinity: Option<Affinity>,
140    containers: Vec<Container>,
141) -> Pod {
142    Pod {
143        metadata: ObjectMeta {
144            name: Some(name.to_owned()),
145            namespace: Some(namespace.to_owned()),
146            labels: collect_btree(labels),
147            annotations: collect_btree(annotations),
148            ..ObjectMeta::default()
149        },
150        spec: Some(PodSpec {
151            containers,
152            restart_policy: Some("Never".to_owned()),
153            affinity,
154            ..PodSpec::default()
155        }),
156        ..Pod::default()
157    }
158}
159
160/// Since the tests only scan the logs from an agent on a single node, we want to make sure that all the test pods are on
161/// the same node so the agent picks them all.
162pub fn make_test_pod_with_affinity<'a>(
163    namespace: &'a str,
164    name: &'a str,
165    command: &'a str,
166    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
167    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
168    affinity_label: Option<(&'a str, &'a str)>,
169    affinity_namespace: Option<&'a str>,
170) -> Pod {
171    let affinity = affinity_label.map(|(label, value)| {
172        let selector = LabelSelector {
173            match_expressions: None,
174            match_labels: Some({
175                let mut map = BTreeMap::new();
176                map.insert(label.to_string(), value.to_string());
177                map
178            }),
179        };
180
181        Affinity {
182            node_affinity: None,
183            pod_affinity: Some(PodAffinity {
184                preferred_during_scheduling_ignored_during_execution: None,
185                required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
186                    label_selector: Some(selector),
187                    namespaces: Some(vec![affinity_namespace.unwrap_or(namespace).to_string()]),
188                    topology_key: "kubernetes.io/hostname".to_string(),
189                }]),
190            }),
191            pod_anti_affinity: None,
192        }
193    });
194
195    make_test_pod_with_containers(
196        namespace,
197        name,
198        labels,
199        annotations,
200        affinity,
201        vec![make_test_container(name, command)],
202    )
203}
204
205pub fn make_test_pod<'a>(
206    namespace: &'a str,
207    name: &'a str,
208    command: &'a str,
209    labels: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
210    annotations: impl IntoIterator<Item = (&'a str, &'a str)> + 'a,
211) -> Pod {
212    make_test_pod_with_affinity(namespace, name, command, labels, annotations, None, None)
213}
214
215pub fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
216    serde_json::from_str(s)
217}
218
/// Builds a string by taking every integer in `0..a`, in order, and writing
/// its decimal representation `b` times in a row.
///
/// For example, `generate_long_string(3, 2)` yields `"001122"`. The result is
/// empty when either `a` or `b` is zero.
pub fn generate_long_string(a: usize, b: usize) -> String {
    (0..a).map(|i| i.to_string().repeat(b)).collect()
}
228
229/// Read the first line from vector logs and assert that it matches the expected
230/// one.
231/// This allows detecting the situations where things have gone very wrong.
232pub async fn smoke_check_first_line(log_reader: &mut Reader) {
233    // Wait for first line as a smoke check.
234    let first_line = log_reader
235        .read_line()
236        .await
237        .expect("unable to read first line");
238    let expected_pat = "INFO vector::app:";
239    assert!(
240        first_line.contains(expected_pat),
241        "Expected a line ending with {expected_pat:?} but got {first_line:?}; vector might be malfunctioning"
242    );
243}
244
/// Instruction returned by a log line predicate to control the scanning loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlCommand {
    /// The line was not what the caller was looking for; keep reading.
    GoOn,
    /// The caller is done; request that the log reader is stopped.
    Terminate,
}
249
250pub async fn look_for_log_line<P>(
251    log_reader: &mut Reader,
252    mut predicate: P,
253) -> Result<(), Box<dyn std::error::Error>>
254where
255    P: FnMut(serde_json::Value) -> FlowControlCommand,
256{
257    let mut lines_till_we_give_up = 10000;
258    while let Some(line) = log_reader.read_line().await {
259        debug!("Got line: {:?}", line);
260
261        lines_till_we_give_up -= 1;
262        if lines_till_we_give_up <= 0 {
263            info!("Giving up");
264            log_reader.kill().await?;
265            break;
266        }
267
268        if !line.starts_with('{') {
269            // This isn't a json, must be an entry from Vector's own log stream.
270            continue;
271        }
272
273        let val = match parse_json(&line) {
274            Ok(val) => val,
275            Err(err) if err.is_eof() => {
276                // We got an EOF error, this is most likely some very long line,
277                // we don't produce lines this bing is our test cases, so we'll
278                // just skip the error - as if it wasn't a JSON string.
279                error!(
280                    "The JSON line we just got was incomplete, most likely it was too long, so we're skipping it"
281                );
282                continue;
283            }
284            Err(err) => return Err(err.into()),
285        };
286
287        match predicate(val) {
288            FlowControlCommand::GoOn => {
289                // Not what we were looking for, go on.
290            }
291            FlowControlCommand::Terminate => {
292                // We are told we should stop, request that log reader is
293                // killed.
294                // This doesn't immediately stop the reading because we want to
295                // process the pending buffers first.
296                log_reader.kill().await?;
297            }
298        }
299    }
300
301    // Ensure log reader exited.
302    log_reader.wait().await.expect("log reader wait failed");
303
304    Ok(())
305}
306
307/// Create a pod for our other pods to have an affinity to ensure they are all deployed on
308/// the same node.
309pub async fn create_affinity_pod(
310    framework: &Framework,
311    namespace: &str,
312    affinity_label: &str,
313) -> Result<Manager<CommandBuilder>, Box<dyn std::error::Error>> {
314    let test_pod = framework
315        .test_pod(test_pod::Config::from_pod(&make_test_pod(
316            namespace,
317            "affinity-pod",
318            "tail -f /dev/null",
319            vec![(affinity_label, "yes")],
320            vec![],
321        ))?)
322        .await?;
323    framework
324        .wait(
325            namespace,
326            vec!["pods/affinity-pod"],
327            WaitFor::Condition("initialized"),
328            vec!["--timeout=60s"],
329        )
330        .await?;
331
332    Ok(test_pod)
333}