vdev/testing/
integration.rs

1use std::{
2    collections::BTreeMap,
3    fs,
4    path::{Path, PathBuf},
5    process::Command,
6};
7
8use anyhow::{Context, Result, bail};
9use tempfile::{Builder, NamedTempFile};
10
11use super::{
12    config::{
13        ComposeConfig, ComposeTestConfig, E2E_TESTS_DIR, INTEGRATION_TESTS_DIR, RustToolchainConfig,
14    },
15    runner::{ContainerTestRunner as _, IntegrationTestRunner, TestRunner as _},
16    state::EnvsDir,
17};
18use crate::{
19    app::CommandExt as _,
20    environment::{Environment, extract_present, rename_environment_keys},
21    testing::{
22        build::ALL_INTEGRATIONS_FEATURE_FLAG,
23        docker::{CONTAINER_TOOL, DOCKER_SOCKET},
24    },
25};
26
/// Name of the environment variable that carries the network name to the compose command.
const NETWORK_ENV_VAR: &str = "VECTOR_NETWORK";
/// Cargo feature flag used for end-to-end tests when everything is built at once.
const E2E_FEATURE_FLAG: &str = "all-e2e-tests";
29
/// The flavor of compose-based test being run.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ComposeTestKind {
    /// End-to-end test; the runner image is built before the compose environment is started.
    E2E,
    /// Integration test.
    Integration,
}
35
/// Static settings that differ between the kinds of compose-based tests.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ComposeTestLocalConfig {
    /// Which kind of test these settings describe.
    pub(crate) kind: ComposeTestKind,
    /// Directory from which the test configuration is loaded.
    pub(crate) directory: &'static str,
    /// Feature flag passed to `--features` when all tests are built together.
    pub(crate) feature_flag: &'static str,
}
42
43impl ComposeTestLocalConfig {
44    /// Integration tests are located in the `scripts/integration` dir,
45    /// and are the full feature flag is `all-integration-tests`.
46    pub(crate) fn integration() -> Self {
47        Self {
48            kind: ComposeTestKind::Integration,
49            directory: INTEGRATION_TESTS_DIR,
50            feature_flag: ALL_INTEGRATIONS_FEATURE_FLAG,
51        }
52    }
53
54    /// E2E tests are located in the `scripts/e2e` dir,
55    /// and are the full feature flag is `all-e2e-tests`.
56    pub(crate) fn e2e() -> Self {
57        Self {
58            kind: ComposeTestKind::E2E,
59            directory: E2E_TESTS_DIR,
60            feature_flag: E2E_FEATURE_FLAG,
61        }
62    }
63}
64
/// A single compose-based test (integration or end-to-end) bound to one environment.
#[derive(Debug)]
pub(crate) struct ComposeTest {
    /// Kind-specific settings (test directory, aggregate feature flag).
    local_config: ComposeTestLocalConfig,
    /// Name of the test; used to load its configuration and to derive the network name.
    test_name: String,
    /// Name of the environment, taken from the test configuration, to run against.
    environment: String,
    /// Configuration loaded for `test_name`.
    config: ComposeTestConfig,
    /// Persistent state used to record and query which environment is currently up.
    envs_dir: EnvsDir,
    /// Runner in which the tests are executed.
    runner: IntegrationTestRunner,
    /// Compose setup for the test; `None` when the test directory has no `compose.yaml`.
    compose: Option<Compose>,
    /// Variables of the selected environment plus `VECTOR_IMAGE`, with keys renamed by
    /// `rename_environment_keys`.
    env_config: Environment,
    /// Whether to build with the aggregate feature flag rather than the test's own features.
    build_all: bool,
    /// Number of retries passed to the test command; `0` omits the `--retries` argument.
    retries: u8,
}
78
79impl ComposeTest {
80    pub(crate) fn generate(
81        local_config: ComposeTestLocalConfig,
82        test_name: impl Into<String>,
83        environment: impl Into<String>,
84        build_all: bool,
85        retries: u8,
86    ) -> Result<ComposeTest> {
87        let test_name: String = test_name.into();
88        let environment = environment.into();
89        let (test_dir, config) = ComposeTestConfig::load(local_config.directory, &test_name)?;
90        let envs_dir = EnvsDir::new(&test_name);
91        let Some(mut env_config) = config.environments().get(&environment).cloned() else {
92            bail!("Could not find environment named {environment:?}");
93        };
94
95        let network_name = format!("vector-integration-tests-{test_name}");
96        let compose = Compose::new(test_dir, env_config.clone(), network_name.clone())?;
97
98        // None if compiling with all integration test feature flag.
99        let runner_name = (!build_all).then(|| test_name.clone());
100
101        let runner = IntegrationTestRunner::new(
102            runner_name,
103            &config.runner,
104            compose.is_some().then_some(network_name),
105        )?;
106
107        env_config.insert("VECTOR_IMAGE".to_string(), Some(runner.image_name()));
108
109        let compose_test = ComposeTest {
110            local_config,
111            test_name,
112            environment,
113            config,
114            envs_dir,
115            runner,
116            compose,
117            env_config: rename_environment_keys(&env_config),
118            build_all,
119            retries,
120        };
121        trace!("Generated {compose_test:#?}");
122        Ok(compose_test)
123    }
124
125    pub(crate) fn test(&self, extra_args: Vec<String>) -> Result<()> {
126        let active = self.envs_dir.check_active(&self.environment)?;
127        self.config.check_required()?;
128
129        if !active {
130            self.start()?;
131        }
132
133        let mut env_vars = self.config.env.clone();
134        // Make sure the test runner has the same config environment vars as the services do.
135        for (key, value) in self.env_config.clone() {
136            env_vars.insert(key, value);
137        }
138
139        env_vars.insert("TEST_LOG".to_string(), Some("info".into()));
140        let mut args = self.config.args.clone().unwrap_or_default();
141
142        args.push("--features".to_string());
143
144        args.push(if self.build_all {
145            self.local_config.feature_flag.to_string()
146        } else {
147            self.config.features.join(",")
148        });
149
150        // If the test field is not present then use the --lib flag
151        match self.config.test {
152            Some(ref test_arg) => {
153                args.push("--test".to_string());
154                args.push(test_arg.to_string());
155            }
156            None => args.push("--lib".to_string()),
157        }
158
159        // Ensure the test_filter args are passed as well
160        if let Some(ref filter) = self.config.test_filter {
161            args.push(filter.to_string());
162        }
163        args.extend(extra_args);
164
165        // Some arguments are not compatible with the --no-capture arg
166        if !args.contains(&"--test-threads".to_string()) {
167            args.push("--no-capture".to_string());
168        }
169
170        if self.retries > 0 {
171            args.push("--retries".to_string());
172            args.push(self.retries.to_string());
173        }
174
175        self.runner.test(
176            &env_vars,
177            &self.config.runner.env,
178            Some(&self.config.features),
179            &args,
180            self.local_config.directory,
181        )?;
182
183        if !active {
184            self.runner.remove()?;
185            self.stop()?;
186        }
187        Ok(())
188    }
189
190    pub(crate) fn start(&self) -> Result<()> {
191        // For end-to-end tests, we want to run vector as a service, leveraging the
192        // image for the runner. So we must build that image before starting the
193        // compose so that it is available.
194        if self.local_config.kind == ComposeTestKind::E2E {
195            self.runner.build(
196                Some(&self.config.features),
197                self.local_config.directory,
198                &self.env_config,
199            )?;
200        }
201
202        self.config.check_required()?;
203        if let Some(compose) = &self.compose {
204            self.runner.ensure_network()?;
205
206            if self.envs_dir.check_active(&self.environment)? {
207                bail!("environment is already up");
208            }
209
210            compose.start(&self.env_config)?;
211
212            self.envs_dir.save(&self.environment, &self.env_config)
213        } else {
214            Ok(())
215        }
216    }
217
218    pub(crate) fn stop(&self) -> Result<()> {
219        if let Some(compose) = &self.compose {
220            // TODO: Is this check really needed?
221            if self.envs_dir.load()?.is_none() {
222                bail!("No environment for {} is up.", self.test_name);
223            }
224
225            self.runner.remove()?;
226            compose.stop()?;
227            self.envs_dir.remove()?;
228        }
229
230        Ok(())
231    }
232}
233
/// A compose setup for one test directory, backed by a temporary, modified copy of its
/// `compose.yaml`.
#[derive(Debug)]
struct Compose {
    /// Path of the `compose.yaml` inside the test directory.
    original_path: PathBuf,
    /// Test directory; used as the working directory of compose commands.
    test_dir: PathBuf,
    /// Variables exported to every compose invocation (entries without a value are skipped).
    env: Environment,
    /// Parsed compose configuration with the network block injected. It is only read when
    /// preparing volumes on unix, hence the `dead_code` allowance on windows.
    #[cfg_attr(target_family = "windows", allow(dead_code))]
    config: ComposeConfig,
    /// Name of the network injected as `default` and exported through `VECTOR_NETWORK`.
    network: String,
    /// Temporary file, created in the test directory, holding the modified compose YAML.
    temp_file: NamedTempFile,
}
244
245impl Compose {
246    fn new(test_dir: PathBuf, env: Environment, network: String) -> Result<Option<Self>> {
247        let original_path: PathBuf = [&test_dir, Path::new("compose.yaml")].iter().collect();
248
249        match original_path.try_exists() {
250            Err(error) => {
251                Err(error).with_context(|| format!("Could not lookup {}", original_path.display()))
252            }
253            Ok(false) => Ok(None),
254            Ok(true) => {
255                let mut config = ComposeConfig::parse(&original_path)?;
256                // Inject the networks block
257                config.networks.insert(
258                    "default".to_string(),
259                    BTreeMap::from_iter([
260                        ("name".to_string(), network.clone()),
261                        ("external".to_string(), "true".to_string()),
262                    ]),
263                );
264
265                // Create a named tempfile, there may be resource leakage here in case of SIGINT
266                // Tried tempfile::tempfile() but this returns a File object without a usable path
267                // https://docs.rs/tempfile/latest/tempfile/#resource-leaking
268                let temp_file = Builder::new()
269                    .prefix("compose-temp-")
270                    .suffix(".yaml")
271                    .tempfile_in(&test_dir)
272                    .with_context(|| "Failed to create temporary compose file")?;
273
274                fs::write(
275                    temp_file.path(),
276                    serde_yaml::to_string(&config)
277                        .with_context(|| "Failed to serialize modified compose.yaml")?,
278                )?;
279
280                Ok(Some(Self {
281                    original_path,
282                    test_dir,
283                    env,
284                    config,
285                    network,
286                    temp_file,
287                }))
288            }
289        }
290    }
291
292    fn start(&self, environment: &Environment) -> Result<()> {
293        #[cfg(unix)]
294        unix::prepare_compose_volumes(&self.config, &self.test_dir, environment)?;
295
296        self.run("Starting", &["up", "--detach"], Some(environment))
297    }
298
299    fn stop(&self) -> Result<()> {
300        // The config settings are not needed when stopping a compose setup.
301        self.run(
302            "Stopping",
303            &["down", "--timeout", "0", "--volumes", "--remove-orphans"],
304            None,
305        )
306    }
307
308    fn run(
309        &self,
310        action: &str,
311        args: &[&'static str],
312        environment: Option<&Environment>,
313    ) -> Result<()> {
314        let mut command = Command::new(CONTAINER_TOOL.clone());
315        command.arg("compose");
316        // When the integration test environment is already active, the tempfile path does not
317        // exist because `Compose::new()` has not been called. In this case, the `stop` command
318        // needs to use the calculated path from the integration name instead of the nonexistent
319        // tempfile path. This is because `stop` doesn't go through the same logic as `start`
320        // and doesn't create a new tempfile before calling docker compose.
321        // If stop command needs to use some of the injected bits then we need to rebuild it
322        command.arg("--file");
323        if self.temp_file.path().exists() {
324            command.arg(self.temp_file.path());
325        } else {
326            command.arg(&self.original_path);
327        }
328
329        command.args(args);
330
331        command.current_dir(&self.test_dir);
332
333        command.env("DOCKER_SOCKET", &*DOCKER_SOCKET);
334        command.env(NETWORK_ENV_VAR, &self.network);
335
336        // some services require this in order to build Vector
337        command.env("RUST_VERSION", RustToolchainConfig::rust_version());
338
339        for (key, value) in &self.env {
340            if let Some(value) = value {
341                command.env(key, value);
342            }
343        }
344        if let Some(environment) = environment {
345            command.envs(extract_present(environment));
346        }
347
348        waiting!("{action} service environment");
349        command.check_run()
350    }
351}
352
353#[cfg(unix)]
354mod unix {
355    use std::{
356        fs::{self, Metadata, Permissions},
357        os::unix::fs::PermissionsExt as _,
358        path::{Path, PathBuf},
359    };
360
361    use anyhow::{Context, Result};
362
363    use super::super::config::ComposeConfig;
364    use crate::{
365        environment::{Environment, resolve_placeholders},
366        testing::config::VolumeMount,
367    };
368
369    /// Unix permissions mask to allow everybody to read a file
370    const ALL_READ: u32 = 0o444;
371    /// Unix permissions mask to allow everybody to read a directory
372    const ALL_READ_DIR: u32 = 0o555;
373
374    /// Fix up potential issues before starting a compose container
375    pub fn prepare_compose_volumes(
376        config: &ComposeConfig,
377        test_dir: &Path,
378        environment: &Environment,
379    ) -> Result<()> {
380        for service in config.services.values() {
381            if let Some(volumes) = &service.volumes {
382                for volume in volumes {
383                    let source = match volume {
384                        VolumeMount::Short(s) => {
385                            s.split_once(':').map(|(s, _)| s).ok_or_else(|| {
386                                anyhow::anyhow!("Invalid short volume mount format: {s}")
387                            })?
388                        }
389                        VolumeMount::Long { source, .. } => source,
390                    };
391                    let source = resolve_placeholders(source, environment);
392                    if !config.volumes.contains_key(&source)
393                        && !source.starts_with('/')
394                        && !source.starts_with('$')
395                    {
396                        let path: PathBuf = [test_dir, Path::new(&source)].iter().collect();
397                        add_read_permission(&path)?;
398                    }
399                }
400            }
401        }
402        Ok(())
403    }
404
405    /// Recursively add read permissions to the
406    fn add_read_permission(path: &Path) -> Result<()> {
407        let metadata = path
408            .metadata()
409            .with_context(|| format!("Could not get permissions on {}", path.display()))?;
410
411        if metadata.is_file() {
412            add_permission(path, &metadata, ALL_READ)
413        } else {
414            if metadata.is_dir() {
415                add_permission(path, &metadata, ALL_READ_DIR)?;
416                for entry in fs::read_dir(path)
417                    .with_context(|| format!("Could not read directory {}", path.display()))?
418                {
419                    let entry = entry.with_context(|| {
420                        format!("Could not read directory entry in {}", path.display())
421                    })?;
422                    add_read_permission(&entry.path())?;
423                }
424            }
425            Ok(())
426        }
427    }
428
429    fn add_permission(path: &Path, metadata: &Metadata, bits: u32) -> Result<()> {
430        let perms = metadata.permissions();
431        let new_perms = Permissions::from_mode(perms.mode() | bits);
432        if new_perms != perms {
433            fs::set_permissions(path, new_perms)
434                .with_context(|| format!("Could not set permissions on {}", path.display()))?;
435        }
436        Ok(())
437    }
438}