// vdev/testing/integration.rs

1use std::{
2    path::{Path, PathBuf},
3    process::Command,
4};
5
6use anyhow::{Context, Result, bail};
7
8use super::{
9    config::{
10        ComposeConfig, ComposeTestConfig, E2E_TESTS_DIR, INTEGRATION_TESTS_DIR, RustToolchainConfig,
11    },
12    runner::{ContainerTestRunner as _, IntegrationTestRunner, TestRunner as _},
13};
14use crate::{
15    app::CommandExt as _,
16    testing::{
17        build::ALL_INTEGRATIONS_FEATURE_FLAG,
18        docker::{CONTAINER_TOOL, DOCKER_SOCKET},
19    },
20    utils::environment::{Environment, extract_present, rename_environment_keys},
21};
22
/// Environment variable through which the compose command is given the network name.
const NETWORK_ENV_VAR: &str = "VECTOR_NETWORK";
/// Feature flag that enables the full set of end-to-end tests.
const E2E_FEATURE_FLAG: &str = "all-e2e-tests";
25
26/// Check if a Docker image exists locally
27fn docker_image_exists(image_name: &str) -> Result<bool> {
28    use crate::testing::docker::docker_command;
29    let output =
30        docker_command(["images", "--format", "{{.Repository}}:{{.Tag}}"]).check_output()?;
31    Ok(output.lines().any(|line| line == image_name))
32}
33
/// The two flavours of compose-based tests handled by this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ComposeTestKind {
    /// End-to-end tests.
    E2E,
    /// Integration tests.
    Integration,
}
39
/// Static settings that distinguish one kind of compose test from the other.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ComposeTestLocalConfig {
    /// Which kind of test this configuration describes.
    pub(crate) kind: ComposeTestKind,
    /// Directory handed to `ComposeTestConfig::load` to locate a test's configuration.
    pub(crate) directory: &'static str,
    /// Feature flag passed via `--features` when the shared runner image is used.
    pub(crate) feature_flag: &'static str,
}
46
47impl ComposeTestLocalConfig {
48    /// Integration tests are located in the `tests/integration` dir,
49    /// and are the full feature flag is `all-integration-tests`.
50    pub(crate) fn integration() -> Self {
51        Self {
52            kind: ComposeTestKind::Integration,
53            directory: INTEGRATION_TESTS_DIR,
54            feature_flag: ALL_INTEGRATIONS_FEATURE_FLAG,
55        }
56    }
57
58    /// E2E tests are located in the `tests/e2e` dir,
59    /// and the full feature flag is `all-e2e-tests`.
60    pub(crate) fn e2e() -> Self {
61        Self {
62            kind: ComposeTestKind::E2E,
63            directory: E2E_TESTS_DIR,
64            feature_flag: E2E_FEATURE_FLAG,
65        }
66    }
67}
68
/// A single test (integration or E2E) bound to one named environment, together with
/// everything needed to start its services, run it, and tear it down again.
#[derive(Debug)]
pub(crate) struct ComposeTest {
    /// Kind-specific settings (test kind, directory, full feature flag).
    local_config: ComposeTestLocalConfig,
    /// Name of the test; used to load its configuration and to name the network and project.
    test_name: String,
    /// Name of the environment selected from the test configuration.
    environment: String,
    /// Configuration loaded for `test_name`.
    config: ComposeTestConfig,
    /// Runner used to build the image and execute the tests.
    runner: IntegrationTestRunner,
    /// Compose handle; `None` when the test directory has no `compose.yaml`.
    compose: Option<Compose>,
    /// The selected environment plus `VECTOR_IMAGE`, after `rename_environment_keys`.
    env_config: Environment,
    /// Forwarded as `--retries <n>` to the test run when greater than zero.
    retries: u8,
    /// Coverage flag forwarded to the runner.
    coverage: bool,
}
81
82impl ComposeTest {
83    pub(crate) fn generate(
84        local_config: ComposeTestLocalConfig,
85        test_name: impl Into<String>,
86        environment: impl Into<String>,
87        retries: u8,
88        coverage: bool,
89    ) -> Result<ComposeTest> {
90        let test_name: String = test_name.into();
91        let environment = environment.into();
92        let (test_dir, config) = ComposeTestConfig::load(local_config.directory, &test_name)?;
93        let Some(mut env_config) = config.environments().get(&environment).cloned() else {
94            bail!("Could not find environment named {environment:?}");
95        };
96
97        let network_name = format!("vector-integration-tests-{test_name}");
98        let compose = Compose::new(test_dir, env_config.clone(), network_name.clone())?;
99
100        // Auto-detect: If shared image exists, use it. Otherwise use per-test image.
101        // Shared image: vector-test-runner-1.90:latest (compiled with all-integration-tests)
102        // Per-test image: vector-test-runner-clickhouse-1.90:latest (compiled with specific features)
103        let shared_image_name = format!(
104            "vector-test-runner-{}:latest",
105            RustToolchainConfig::rust_version()
106        );
107        let runner_name = if docker_image_exists(&shared_image_name).unwrap_or(false) {
108            info!("Using shared runner image: {shared_image_name}");
109            None
110        } else {
111            info!("Shared runner image not found, will build image for: {test_name}");
112            Some(test_name.clone())
113        };
114
115        let runner = IntegrationTestRunner::new(
116            runner_name,
117            &config.runner,
118            compose.is_some().then_some(network_name),
119        )?;
120
121        env_config.insert("VECTOR_IMAGE".to_string(), Some(runner.image_name()));
122
123        let compose_test = ComposeTest {
124            local_config,
125            test_name,
126            environment,
127            config,
128            runner,
129            compose,
130            env_config: rename_environment_keys(&env_config),
131            retries,
132            coverage,
133        };
134        trace!("Generated {compose_test:#?}");
135        Ok(compose_test)
136    }
137
138    fn project_name(&self) -> String {
139        // Docker Compose project names must consist only of lowercase alphanumeric characters,
140        // hyphens, and underscores. Replace any dots with hyphens.
141        let sanitized_env = self.environment.replace('.', "-");
142        format!(
143            "vector-{}-{}-{}",
144            self.local_config.directory, self.test_name, sanitized_env
145        )
146    }
147
148    fn is_running(&self) -> Result<bool> {
149        let Some(compose) = &self.compose else {
150            return Ok(false);
151        };
152
153        let output = Command::new(CONTAINER_TOOL.clone())
154            .args([
155                "compose",
156                "--project-name",
157                &self.project_name(),
158                "ps",
159                "--format",
160                "json",
161                "--status",
162                "running",
163            ])
164            .current_dir(&compose.test_dir)
165            .envs(
166                compose
167                    .env
168                    .iter()
169                    .filter_map(|(k, v)| v.as_ref().map(|val| (k, val))),
170            )
171            .output()
172            .with_context(|| "Failed to check if compose environment is running")?;
173
174        // If stdout is empty or "[]", no containers are running
175        Ok(!output.stdout.is_empty() && output.stdout != b"[]\n" && output.stdout != b"[]")
176    }
177
178    pub(crate) fn test(&self, extra_args: Vec<String>) -> Result<()> {
179        let was_running = self.is_running()?;
180        self.config.check_required()?;
181
182        if !was_running {
183            self.start()?;
184        }
185
186        let mut env_vars = self.config.env.clone();
187        // Make sure the test runner has the same config environment vars as the services do.
188        for (key, value) in self.env_config.clone() {
189            env_vars.insert(key, value);
190        }
191
192        env_vars.insert("VECTOR_LOG".to_string(), Some("info".into()));
193        let mut args = self.config.args.clone().unwrap_or_default();
194
195        args.push("--features".to_string());
196
197        // If using shared runner: use 'all-integration-tests' or 'all-e2e-tests'
198        // If using per-test runner: use test-specific features from test.yaml
199        args.push(if self.runner.is_shared_runner() {
200            format!("{},vendored", self.local_config.feature_flag)
201        } else {
202            format!("{},vendored", self.config.features.join(","))
203        });
204
205        // If the test field is not present then use the --lib flag
206        match self.config.test {
207            Some(ref test_arg) => {
208                args.push("--test".to_string());
209                args.push(test_arg.clone());
210            }
211            None => args.push("--lib".to_string()),
212        }
213
214        // Ensure the test_filter args are passed as well
215        if let Some(ref filter) = self.config.test_filter {
216            args.push(filter.clone());
217        }
218        args.extend(extra_args);
219
220        // Some arguments are not compatible with the --no-capture arg
221        if !args.contains(&"--test-threads".to_string()) {
222            args.push("--no-capture".to_string());
223        }
224
225        if self.retries > 0 {
226            args.push("--retries".to_string());
227            args.push(self.retries.to_string());
228        }
229
230        self.runner.test(
231            &env_vars,
232            &self.config.runner.env,
233            Some(&self.config.features),
234            &args,
235            self.local_config.kind == ComposeTestKind::E2E,
236            self.coverage,
237            if self.coverage {
238                Some(self.environment.as_str())
239            } else {
240                None
241            },
242        )?;
243
244        Ok(())
245    }
246
247    pub(crate) fn start(&self) -> Result<()> {
248        // For end-to-end tests, we want to run vector as a service, leveraging the
249        // image for the runner. So we must build that image before starting the
250        // compose so that it is available.
251        if self.local_config.kind == ComposeTestKind::E2E {
252            self.runner.build(
253                Some(&self.config.features),
254                &self.env_config,
255                true, // E2E tests build Vector in the image
256            )?;
257        }
258
259        self.config.check_required()?;
260        if let Some(compose) = &self.compose {
261            self.runner.ensure_network()?;
262            self.runner.ensure_external_volumes()?;
263
264            if self.is_running()? {
265                bail!("environment is already up");
266            }
267
268            let project_name = self.project_name();
269            compose.start(&self.env_config, &project_name)?;
270        }
271        Ok(())
272    }
273
274    pub(crate) fn stop(&self) -> Result<()> {
275        if let Some(compose) = &self.compose {
276            if !self.is_running()? {
277                bail!("No environment for {} is up.", self.test_name);
278            }
279
280            let project_name = self.project_name();
281            compose.stop(&self.env_config, &project_name)?;
282        }
283
284        self.runner.remove()?;
285
286        Ok(())
287    }
288}
289
/// Handle on the `compose.yaml` of one test directory, with what is needed to run
/// compose commands against it.
#[derive(Debug)]
struct Compose {
    /// Path of the `compose.yaml` file inside `test_dir`; passed via `--file`.
    yaml_path: PathBuf,
    /// Directory of the test; used as the working directory of compose commands.
    test_dir: PathBuf,
    /// Environment variables exported to compose commands (entries without a value are skipped).
    env: Environment,
    /// Parsed compose file; only read on unix, where it drives the volume permission fix-up.
    #[cfg_attr(target_family = "windows", allow(dead_code))]
    config: ComposeConfig,
    /// Network name exported to compose commands as `VECTOR_NETWORK`.
    network: String,
}
299
300impl Compose {
301    fn new(test_dir: PathBuf, env: Environment, network: String) -> Result<Option<Self>> {
302        let yaml_path: PathBuf = [&test_dir, Path::new("compose.yaml")].iter().collect();
303
304        match yaml_path.try_exists() {
305            Err(error) => {
306                Err(error).with_context(|| format!("Could not lookup {}", yaml_path.display()))
307            }
308            Ok(false) => Ok(None),
309            Ok(true) => {
310                // Parse config only for unix volume permission checking
311                let config = ComposeConfig::parse(&yaml_path)?;
312
313                Ok(Some(Self {
314                    yaml_path,
315                    test_dir,
316                    env,
317                    config,
318                    network,
319                }))
320            }
321        }
322    }
323
324    fn start(&self, environment: &Environment, project_name: &str) -> Result<()> {
325        #[cfg(unix)]
326        unix::prepare_compose_volumes(&self.config, &self.test_dir, environment)?;
327
328        self.run(
329            "Starting",
330            &["up", "--detach"],
331            Some(environment),
332            project_name,
333        )
334    }
335
336    fn stop(&self, environment: &Environment, project_name: &str) -> Result<()> {
337        self.run(
338            "Stopping",
339            &["down", "--timeout", "0", "--volumes", "--remove-orphans"],
340            Some(environment),
341            project_name,
342        )
343    }
344
345    fn run(
346        &self,
347        action: &str,
348        args: &[&'static str],
349        environment: Option<&Environment>,
350        project_name: &str,
351    ) -> Result<()> {
352        let mut command = Command::new(CONTAINER_TOOL.clone());
353        command.arg("compose");
354        command.arg("--project-name");
355        command.arg(project_name);
356        command.arg("--file");
357        command.arg(&self.yaml_path);
358
359        command.args(args);
360
361        command.current_dir(&self.test_dir);
362
363        command.env("DOCKER_SOCKET", &*DOCKER_SOCKET);
364        command.env(NETWORK_ENV_VAR, &self.network);
365
366        // some services require this in order to build Vector
367        command.env("RUST_VERSION", RustToolchainConfig::rust_version());
368
369        for (key, value) in &self.env {
370            if let Some(value) = value {
371                command.env(key, value);
372            }
373        }
374        if let Some(environment) = environment {
375            command.envs(extract_present(environment));
376        }
377
378        waiting!("{action} service environment");
379        command.check_run()
380    }
381}
382
383#[cfg(unix)]
384mod unix {
385    use std::{
386        fs::{self, Metadata, Permissions},
387        os::unix::fs::PermissionsExt as _,
388        path::{Path, PathBuf},
389    };
390
391    use anyhow::{Context, Result};
392
393    use super::super::config::ComposeConfig;
394    use crate::{
395        testing::config::VolumeMount,
396        utils::environment::{Environment, resolve_placeholders},
397    };
398
399    /// Unix permissions mask to allow everybody to read a file
400    const ALL_READ: u32 = 0o444;
401    /// Unix permissions mask to allow everybody to read a directory
402    const ALL_READ_DIR: u32 = 0o555;
403
404    /// Fix up potential issues before starting a compose container
405    pub fn prepare_compose_volumes(
406        config: &ComposeConfig,
407        test_dir: &Path,
408        environment: &Environment,
409    ) -> Result<()> {
410        for service in config.services.values() {
411            if let Some(volumes) = &service.volumes {
412                for volume in volumes {
413                    let source = match volume {
414                        VolumeMount::Short(s) => {
415                            s.split_once(':').map(|(s, _)| s).ok_or_else(|| {
416                                anyhow::anyhow!("Invalid short volume mount format: {s}")
417                            })?
418                        }
419                        VolumeMount::Long { source, .. } => source,
420                    };
421                    let source = resolve_placeholders(source, environment);
422                    if !config.volumes.contains_key(&source)
423                        && !source.starts_with('/')
424                        && !source.starts_with('$')
425                    {
426                        let path: PathBuf = [test_dir, Path::new(&source)].iter().collect();
427                        add_read_permission(&path)?;
428                    }
429                }
430            }
431        }
432        Ok(())
433    }
434
435    /// Recursively add read permissions to the
436    fn add_read_permission(path: &Path) -> Result<()> {
437        let metadata = path
438            .metadata()
439            .with_context(|| format!("Could not get permissions on {}", path.display()))?;
440
441        if metadata.is_file() {
442            add_permission(path, &metadata, ALL_READ)
443        } else {
444            if metadata.is_dir() {
445                add_permission(path, &metadata, ALL_READ_DIR)?;
446                for entry in fs::read_dir(path)
447                    .with_context(|| format!("Could not read directory {}", path.display()))?
448                {
449                    let entry = entry.with_context(|| {
450                        format!("Could not read directory entry in {}", path.display())
451                    })?;
452                    add_read_permission(&entry.path())?;
453                }
454            }
455            Ok(())
456        }
457    }
458
459    fn add_permission(path: &Path, metadata: &Metadata, bits: u32) -> Result<()> {
460        let perms = metadata.permissions();
461        let new_perms = Permissions::from_mode(perms.mode() | bits);
462        if new_perms != perms {
463            fs::set_permissions(path, new_perms)
464                .with_context(|| format!("Could not set permissions on {}", path.display()))?;
465        }
466        Ok(())
467    }
468}