vdev/testing/
integration.rs

1use std::{collections::BTreeMap, fs, path::Path, path::PathBuf, process::Command};
2
3use anyhow::{bail, Context, Result};
4use tempfile::{Builder, NamedTempFile};
5
6use super::config::{
7    ComposeConfig, ComposeTestConfig, Environment, RustToolchainConfig, E2E_TESTS_DIR,
8    INTEGRATION_TESTS_DIR,
9};
10use super::runner::{ContainerTestRunner as _, IntegrationTestRunner, TestRunner as _};
11use super::state::EnvsDir;
12use crate::app::CommandExt as _;
13use crate::testing::build::ALL_INTEGRATIONS_FEATURE_FLAG;
14use crate::testing::docker::{CONTAINER_TOOL, DOCKER_SOCKET};
15
/// Name of the environment variable that carries the compose network name; it is set on
/// every compose command issued by [`Compose::run`].
const NETWORK_ENV_VAR: &str = "VECTOR_NETWORK";
/// Feature flag covering all end-to-end tests; used by [`ComposeTestLocalConfig::e2e`].
const E2E_FEATURE_FLAG: &str = "all-e2e-tests";
18
/// The flavor of compose-backed test being run.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum ComposeTestKind {
    /// End-to-end tests: the runner image is built before the compose environment is
    /// started (see `ComposeTest::start`).
    E2E,
    /// Integration tests.
    Integration,
}
24
/// Static settings that distinguish one kind of compose-backed test from another.
#[derive(Clone, Copy)]
pub(crate) struct ComposeTestLocalConfig {
    /// Which kind of test this configuration describes.
    pub(crate) kind: ComposeTestKind,
    /// Directory the test configuration is loaded from; it is also handed to the runner.
    pub(crate) directory: &'static str,
    /// Feature flag passed to `--features` when all tests are built together.
    pub(crate) feature_flag: &'static str,
}
31
32impl ComposeTestLocalConfig {
33    /// Integration tests are located in the `scripts/integration` dir,
34    /// and are the full feature flag is `all-integration-tests`.
35    pub(crate) fn integration() -> Self {
36        Self {
37            kind: ComposeTestKind::Integration,
38            directory: INTEGRATION_TESTS_DIR,
39            feature_flag: ALL_INTEGRATIONS_FEATURE_FLAG,
40        }
41    }
42
43    /// E2E tests are located in the `scripts/e2e` dir,
44    /// and are the full feature flag is `all-e2e-tests`.
45    pub(crate) fn e2e() -> Self {
46        Self {
47            kind: ComposeTestKind::E2E,
48            directory: E2E_TESTS_DIR,
49            feature_flag: E2E_FEATURE_FLAG,
50        }
51    }
52}
53
/// A single compose-backed test (integration or E2E) bound to one named environment.
pub(crate) struct ComposeTest {
    /// Kind of test, the directory it lives in, and the feature flag covering all tests.
    local_config: ComposeTestLocalConfig,
    /// Name of the test; also used to derive the network name.
    test_name: String,
    /// Name of the selected environment within the test configuration.
    environment: String,
    /// Test configuration loaded from the test directory.
    config: ComposeTestConfig,
    /// State used to record and look up which environment is currently up.
    envs_dir: EnvsDir,
    /// Runner that builds the image and executes the tests.
    runner: IntegrationTestRunner,
    /// Compose wrapper; `None` when the test directory has no `compose.yaml`.
    compose: Option<Compose>,
    /// Settings of the selected environment, extended with a `VECTOR_IMAGE` entry.
    env_config: Environment,
    /// When set, tests are built with the "all tests" feature flag instead of the
    /// features listed in the test configuration.
    build_all: bool,
    /// Passed to the test command as `--retries` when greater than zero.
    retries: u8,
}
66
67impl ComposeTest {
68    pub(crate) fn generate(
69        local_config: ComposeTestLocalConfig,
70        test_name: impl Into<String>,
71        environment: impl Into<String>,
72        build_all: bool,
73        retries: u8,
74    ) -> Result<ComposeTest> {
75        let test_name: String = test_name.into();
76        let environment = environment.into();
77        let (test_dir, config) = ComposeTestConfig::load(local_config.directory, &test_name)?;
78        let envs_dir = EnvsDir::new(&test_name);
79        let Some(mut env_config) = config.environments().get(&environment).cloned() else {
80            bail!("Could not find environment named {environment:?}");
81        };
82
83        let network_name = format!("vector-integration-tests-{test_name}");
84        let compose = Compose::new(test_dir, env_config.clone(), network_name.clone())?;
85
86        // None if compiling with all integration test feature flag.
87        let runner_name = (!build_all).then(|| test_name.clone());
88
89        let runner = IntegrationTestRunner::new(
90            runner_name,
91            &config.runner,
92            compose.is_some().then_some(network_name),
93        )?;
94
95        env_config.insert("VECTOR_IMAGE".to_string(), Some(runner.image_name()));
96
97        Ok(ComposeTest {
98            local_config,
99            test_name,
100            environment,
101            config,
102            envs_dir,
103            runner,
104            compose,
105            env_config,
106            build_all,
107            retries,
108        })
109    }
110
111    pub(crate) fn test(&self, extra_args: Vec<String>) -> Result<()> {
112        let active = self.envs_dir.check_active(&self.environment)?;
113        self.config.check_required()?;
114
115        if !active {
116            self.start()?;
117        }
118
119        let mut env_vars = self.config.env.clone();
120        // Make sure the test runner has the same config environment vars as the services do.
121        for (key, value) in config_env(&self.env_config) {
122            env_vars.insert(key, Some(value));
123        }
124
125        env_vars.insert("TEST_LOG".to_string(), Some("info".into()));
126        let mut args = self.config.args.clone().unwrap_or_default();
127
128        args.push("--features".to_string());
129
130        args.push(if self.build_all {
131            self.local_config.feature_flag.to_string()
132        } else {
133            self.config.features.join(",")
134        });
135
136        // If the test field is not present then use the --lib flag
137        match self.config.test {
138            Some(ref test_arg) => {
139                args.push("--test".to_string());
140                args.push(test_arg.to_string());
141            }
142            None => args.push("--lib".to_string()),
143        }
144
145        // Ensure the test_filter args are passed as well
146        if let Some(ref filter) = self.config.test_filter {
147            args.push(filter.to_string());
148        }
149        args.extend(extra_args);
150
151        // Some arguments are not compatible with the --no-capture arg
152        if !args.contains(&"--test-threads".to_string()) {
153            args.push("--no-capture".to_string());
154        }
155
156        if self.retries > 0 {
157            args.push("--retries".to_string());
158            args.push(self.retries.to_string());
159        }
160
161        self.runner.test(
162            &env_vars,
163            &self.config.runner.env,
164            Some(&self.config.features),
165            &args,
166            self.local_config.directory,
167        )?;
168
169        if !active {
170            self.runner.remove()?;
171            self.stop()?;
172        }
173        Ok(())
174    }
175
176    pub(crate) fn start(&self) -> Result<()> {
177        // For end-to-end tests, we want to run vector as a service, leveraging the
178        // image for the runner. So we must build that image before starting the
179        // compose so that it is available.
180        if self.local_config.kind == ComposeTestKind::E2E {
181            self.runner
182                .build(Some(&self.config.features), self.local_config.directory)?;
183        }
184
185        self.config.check_required()?;
186        if let Some(compose) = &self.compose {
187            self.runner.ensure_network()?;
188
189            if self.envs_dir.check_active(&self.environment)? {
190                bail!("environment is already up");
191            }
192
193            compose.start(&self.env_config)?;
194
195            self.envs_dir.save(&self.environment, &self.env_config)
196        } else {
197            Ok(())
198        }
199    }
200
201    pub(crate) fn stop(&self) -> Result<()> {
202        if let Some(compose) = &self.compose {
203            // TODO: Is this check really needed?
204            if self.envs_dir.load()?.is_none() {
205                bail!("No environment for {} is up.", self.test_name);
206            }
207
208            self.runner.remove()?;
209            compose.stop()?;
210            self.envs_dir.remove()?;
211        }
212
213        Ok(())
214    }
215}
216
/// Wrapper around the compose setup of a single test directory.
struct Compose {
    /// Path of the `compose.yaml` found in the test directory.
    original_path: PathBuf,
    /// Test directory; used as the working directory of compose commands.
    test_dir: PathBuf,
    /// Environment entries exported to every compose command (entries without a value
    /// are skipped).
    env: Environment,
    /// Parsed compose configuration with the network block injected. It is only read when
    /// preparing volumes on unix, hence the `dead_code` allowance on windows.
    #[cfg_attr(target_family = "windows", allow(dead_code))]
    config: ComposeConfig,
    /// Name of the network the compose services are attached to.
    network: String,
    /// Temporary file holding the modified compose configuration.
    temp_file: NamedTempFile,
}
226
227impl Compose {
228    fn new(test_dir: PathBuf, env: Environment, network: String) -> Result<Option<Self>> {
229        let original_path: PathBuf = [&test_dir, Path::new("compose.yaml")].iter().collect();
230
231        match original_path.try_exists() {
232            Err(error) => {
233                Err(error).with_context(|| format!("Could not lookup {}", original_path.display()))
234            }
235            Ok(false) => Ok(None),
236            Ok(true) => {
237                let mut config = ComposeConfig::parse(&original_path)?;
238                // Inject the networks block
239                config.networks.insert(
240                    "default".to_string(),
241                    BTreeMap::from_iter([
242                        ("name".to_string(), network.clone()),
243                        ("external".to_string(), "true".to_string()),
244                    ]),
245                );
246
247                // Create a named tempfile, there may be resource leakage here in case of SIGINT
248                // Tried tempfile::tempfile() but this returns a File object without a usable path
249                // https://docs.rs/tempfile/latest/tempfile/#resource-leaking
250                let temp_file = Builder::new()
251                    .prefix("compose-temp-")
252                    .suffix(".yaml")
253                    .tempfile_in(&test_dir)
254                    .with_context(|| "Failed to create temporary compose file")?;
255
256                fs::write(
257                    temp_file.path(),
258                    serde_yaml::to_string(&config)
259                        .with_context(|| "Failed to serialize modified compose.yaml")?,
260                )?;
261
262                Ok(Some(Self {
263                    original_path,
264                    test_dir,
265                    env,
266                    config,
267                    network,
268                    temp_file,
269                }))
270            }
271        }
272    }
273
274    fn start(&self, config: &Environment) -> Result<()> {
275        self.prepare()?;
276        self.run("Starting", &["up", "--detach"], Some(config))
277    }
278
279    fn stop(&self) -> Result<()> {
280        // The config settings are not needed when stopping a compose setup.
281        self.run(
282            "Stopping",
283            &["down", "--timeout", "0", "--volumes", "--remove-orphans"],
284            None,
285        )
286    }
287
288    fn run(&self, action: &str, args: &[&'static str], config: Option<&Environment>) -> Result<()> {
289        let mut command = Command::new(CONTAINER_TOOL.clone());
290        command.arg("compose");
291        // When the integration test environment is already active, the tempfile path does not
292        // exist because `Compose::new()` has not been called. In this case, the `stop` command
293        // needs to use the calculated path from the integration name instead of the nonexistent
294        // tempfile path. This is because `stop` doesn't go through the same logic as `start`
295        // and doesn't create a new tempfile before calling docker compose.
296        // If stop command needs to use some of the injected bits then we need to rebuild it
297        command.arg("--file");
298        if self.temp_file.path().exists() {
299            command.arg(self.temp_file.path());
300        } else {
301            command.arg(&self.original_path);
302        }
303
304        command.args(args);
305
306        command.current_dir(&self.test_dir);
307
308        command.env("DOCKER_SOCKET", &*DOCKER_SOCKET);
309        command.env(NETWORK_ENV_VAR, &self.network);
310
311        // some services require this in order to build Vector
312        command.env("RUST_VERSION", RustToolchainConfig::rust_version());
313
314        for (key, value) in &self.env {
315            if let Some(value) = value {
316                command.env(key, value);
317            }
318        }
319        if let Some(config) = config {
320            command.envs(config_env(config));
321        }
322
323        waiting!("{action} service environment");
324        command.check_run()
325    }
326
327    fn prepare(&self) -> Result<()> {
328        #[cfg(unix)]
329        unix::prepare_compose_volumes(&self.config, &self.test_dir)?;
330        Ok(())
331    }
332}
333
334fn config_env(config: &Environment) -> impl Iterator<Item = (String, String)> + '_ {
335    config.iter().filter_map(|(var, value)| {
336        value.as_ref().map(|value| {
337            (
338                format!("CONFIG_{}", var.replace('-', "_").to_uppercase()),
339                value.to_string(),
340            )
341        })
342    })
343}
344
345#[cfg(unix)]
346mod unix {
347    use std::fs::{self, Metadata, Permissions};
348    use std::os::unix::fs::PermissionsExt as _;
349    use std::path::{Path, PathBuf};
350
351    use super::super::config::ComposeConfig;
352    use crate::testing::config::VolumeMount;
353    use anyhow::{Context, Result};
354
355    /// Unix permissions mask to allow everybody to read a file
356    const ALL_READ: u32 = 0o444;
357    /// Unix permissions mask to allow everybody to read a directory
358    const ALL_READ_DIR: u32 = 0o555;
359
360    /// Fix up potential issues before starting a compose container
361    pub fn prepare_compose_volumes(config: &ComposeConfig, test_dir: &Path) -> Result<()> {
362        for service in config.services.values() {
363            if let Some(volumes) = &service.volumes {
364                for volume in volumes {
365                    let source = match volume {
366                        VolumeMount::Short(s) => {
367                            s.split_once(':').map(|(s, _)| s).ok_or_else(|| {
368                                anyhow::anyhow!("Invalid short volume mount format: {s}")
369                            })?
370                        }
371                        VolumeMount::Long { source, .. } => source,
372                    };
373
374                    if !config.volumes.contains_key(source)
375                        && !source.starts_with('/')
376                        && !source.starts_with('$')
377                    {
378                        let path: PathBuf = [test_dir, Path::new(source)].iter().collect();
379                        add_read_permission(&path)?;
380                    }
381                }
382            }
383        }
384        Ok(())
385    }
386
387    /// Recursively add read permissions to the
388    fn add_read_permission(path: &Path) -> Result<()> {
389        let metadata = path
390            .metadata()
391            .with_context(|| format!("Could not get permissions on {}", path.display()))?;
392
393        if metadata.is_file() {
394            add_permission(path, &metadata, ALL_READ)
395        } else {
396            if metadata.is_dir() {
397                add_permission(path, &metadata, ALL_READ_DIR)?;
398                for entry in fs::read_dir(path)
399                    .with_context(|| format!("Could not read directory {}", path.display()))?
400                {
401                    let entry = entry.with_context(|| {
402                        format!("Could not read directory entry in {}", path.display())
403                    })?;
404                    add_read_permission(&entry.path())?;
405                }
406            }
407            Ok(())
408        }
409    }
410
411    fn add_permission(path: &Path, metadata: &Metadata, bits: u32) -> Result<()> {
412        let perms = metadata.permissions();
413        let new_perms = Permissions::from_mode(perms.mode() | bits);
414        if new_perms != perms {
415            fs::set_permissions(path, new_perms)
416                .with_context(|| format!("Could not set permissions on {}", path.display()))?;
417        }
418        Ok(())
419    }
420}