vdev/testing/
runner.rs

1use anyhow::Result;
2use std::{collections::HashSet, process::Command};
3
4use super::config::{IntegrationRunnerConfig, RustToolchainConfig};
5use crate::{
6    app::{self, CommandExt as _},
7    testing::{
8        build::prepare_build_command,
9        docker::{DOCKER_SOCKET, docker_command},
10        test_runner_dockerfile,
11    },
12    utils::{
13        IS_A_TTY,
14        command::ChainArgs as _,
15        environment::{Environment, append_environment_variables},
16    },
17};
18
// --- Paths inside the runner container ---------------------------------------------------------

/// Mount point of the project checkout inside the container; also used as its working directory.
const MOUNT_PATH: &str = "/home/vector";
/// Mount point of the build-target volume; exported to cargo as `CARGO_BUILD_TARGET_DIR`.
const TARGET_PATH: &str = "/home/target";
/// Coverage output path inside the test runner container.
const COVERAGE_OUTPUT_DIR: &str = "/coverage";

// --- Host-side locations -----------------------------------------------------------------------

/// Coverage output path on the host (relative to project root).
pub(crate) const LOCAL_COVERAGE_OUTPUT_DIR: &str = "target/coverage";

// --- Named Docker volumes shared by runner containers ------------------------------------------

/// Volume holding build artifacts, mounted at [`TARGET_PATH`].
const VOLUME_TARGET: &str = "vector_target";
/// Volume mounted at `/usr/local/cargo/git` in the container.
const VOLUME_CARGO_GIT: &str = "vector_cargo_git";
/// Volume mounted at `/usr/local/cargo/registry` in the container.
const VOLUME_CARGO_REGISTRY: &str = "vector_cargo_registry";

// --- Container identity and test invocations ---------------------------------------------------

/// Hostname assigned to the runner container.
const RUNNER_HOSTNAME: &str = "runner";
/// Base command line for a plain test run; caller-supplied arguments are added to it.
const TEST_COMMAND: &[&str] = &[
    "cargo",
    "nextest",
    "run",
    "--no-fail-fast",
    "--no-default-features",
];
/// Base command line for a coverage-instrumented test run.
const COVERAGE_COMMAND: &[&str] = &[
    "cargo",
    "llvm-cov",
    "nextest",
    "--workspace",
    "--no-fail-fast",
    "--no-default-features",
];
44
/// Lifecycle state of the runner container as derived from `docker ps -a` output.
///
/// The enum is a plain tag with no payload, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerState {
    /// The container is up.
    Running,
    /// Docker reports the container as restarting.
    Restarting,
    /// The container has been created but not started.
    Created,
    /// The container has stopped.
    Exited,
    /// The container is paused.
    Paused,
    /// Docker reports the container as dead.
    Dead,
    /// No container with the expected name is listed.
    Missing,
    /// The container is listed, but with a state string that is not recognized.
    Unknown,
}
55
56pub trait TestRunner {
57    #[allow(clippy::too_many_arguments)]
58    fn test(
59        &self,
60        outer_env: &Environment,
61        inner_env: &Environment,
62        features: Option<&[String]>,
63        args: &[String],
64        build: bool,
65        coverage: bool,
66        coverage_env: Option<&str>,
67    ) -> Result<()>;
68}
69
/// Build the name of the lcov report file.
///
/// With `Some(env)` the name is `lcov-{env}.info`, which keeps reports from different
/// environments apart; with `None` it is the plain `lcov.info`.
pub(crate) fn coverage_filename(coverage_env: Option<&str>) -> String {
    coverage_env.map_or_else(
        || String::from("lcov.info"),
        |scope| format!("lcov-{scope}.info"),
    )
}
77
78pub trait ContainerTestRunner: TestRunner {
79    fn container_name(&self) -> String;
80
81    fn image_name(&self) -> String;
82
83    fn network_name(&self) -> Option<&str>;
84
85    fn needs_docker_socket(&self) -> bool;
86
87    fn volumes(&self) -> Vec<String>;
88
89    fn state(&self) -> Result<RunnerState> {
90        let mut command = docker_command(["ps", "-a", "--format", "{{.Names}} {{.State}}"]);
91        let container_name = self.container_name();
92
93        for line in command.check_output()?.lines() {
94            if let Some((name, state)) = line.split_once(' ')
95                && name == container_name
96            {
97                return Ok(if state == "created" {
98                    RunnerState::Created
99                } else if state == "dead" {
100                    RunnerState::Dead
101                } else if state == "exited" || state.starts_with("Exited ") {
102                    RunnerState::Exited
103                } else if state == "paused" {
104                    RunnerState::Paused
105                } else if state == "restarting" {
106                    RunnerState::Restarting
107                } else if state == "running" || state.starts_with("Up ") {
108                    RunnerState::Running
109                } else {
110                    RunnerState::Unknown
111                });
112            }
113        }
114
115        Ok(RunnerState::Missing)
116    }
117
118    fn ensure_running(
119        &self,
120        features: Option<&[String]>,
121        config_environment_variables: &Environment,
122        build: bool,
123    ) -> Result<()> {
124        match self.state()? {
125            RunnerState::Running | RunnerState::Restarting => (),
126            RunnerState::Created | RunnerState::Exited => self.start()?,
127            RunnerState::Paused => self.unpause()?,
128            RunnerState::Dead | RunnerState::Unknown => {
129                self.remove()?;
130                self.create()?;
131                self.start()?;
132            }
133            RunnerState::Missing => {
134                self.build(features, config_environment_variables, build)?;
135                self.create()?;
136                self.start()?;
137            }
138        }
139
140        Ok(())
141    }
142
143    fn ensure_volumes(&self) -> Result<()> {
144        let mut command = docker_command(["volume", "ls", "--format", "{{.Name}}"]);
145
146        let mut volumes = HashSet::new();
147        volumes.insert(VOLUME_TARGET);
148        volumes.insert(VOLUME_CARGO_GIT);
149        volumes.insert(VOLUME_CARGO_REGISTRY);
150        for volume in command.check_output()?.lines() {
151            volumes.take(volume);
152        }
153
154        for volume in &volumes {
155            docker_command(["volume", "create", volume])
156                .wait(format!("Creating volume {volume}"))?;
157        }
158
159        Ok(())
160    }
161
162    fn build(
163        &self,
164        features: Option<&[String]>,
165        config_env_vars: &Environment,
166        build: bool,
167    ) -> Result<()> {
168        let image_name = self.image_name();
169
170        let dockerfile = test_runner_dockerfile();
171        let mut command =
172            prepare_build_command(&image_name, &dockerfile, features, config_env_vars, build);
173        waiting!("Building image {}", image_name);
174        command.check_run()
175    }
176
177    fn start(&self) -> Result<()> {
178        self.ensure_volumes()?;
179        docker_command(["start", &self.container_name()])
180            .wait(format!("Starting container {}", self.container_name()))
181    }
182
183    fn remove(&self) -> Result<()> {
184        if matches!(self.state()?, RunnerState::Missing) {
185            Ok(())
186        } else {
187            docker_command(["rm", "--force", "--volumes", &self.container_name()])
188                .wait(format!("Removing container {}", self.container_name()))
189        }
190    }
191
192    fn unpause(&self) -> Result<()> {
193        docker_command(["unpause", &self.container_name()])
194            .wait(format!("Unpausing container {}", self.container_name()))
195    }
196
197    fn create(&self) -> Result<()> {
198        let network_name = self.network_name().unwrap_or("host");
199
200        let docker_socket = format!("{}:/var/run/docker.sock", DOCKER_SOCKET.display());
201        let docker_args = if self.needs_docker_socket() {
202            vec!["--volume", &docker_socket]
203        } else {
204            vec![]
205        };
206
207        let volumes = self.volumes();
208        let volumes: Vec<_> = volumes
209            .iter()
210            .flat_map(|volume| ["--volume", volume])
211            .collect();
212
213        self.ensure_volumes()?;
214        docker_command(
215            [
216                "create",
217                "--name",
218                &self.container_name(),
219                "--network",
220                network_name,
221                "--hostname",
222                RUNNER_HOSTNAME,
223                "--workdir",
224                MOUNT_PATH,
225                "--volume",
226                &format!("{}:{MOUNT_PATH}", app::path()),
227                "--volume",
228                &format!("{VOLUME_TARGET}:{TARGET_PATH}"),
229                "--volume",
230                &format!("{VOLUME_CARGO_GIT}:/usr/local/cargo/git"),
231                "--volume",
232                &format!("{VOLUME_CARGO_REGISTRY}:/usr/local/cargo/registry"),
233            ]
234            .chain_args(volumes)
235            .chain_args(docker_args)
236            .chain_args([&self.image_name(), "/bin/sleep", "infinity"]),
237        )
238        .wait(format!("Creating container {}", self.container_name()))
239    }
240}
241
242impl<T> TestRunner for T
243where
244    T: ContainerTestRunner,
245{
246    fn test(
247        &self,
248        outer_env: &Environment,
249        config_environment_variables: &Environment,
250        features: Option<&[String]>,
251        args: &[String],
252        build: bool,
253        coverage: bool,
254        coverage_env: Option<&str>,
255    ) -> Result<()> {
256        self.ensure_running(features, config_environment_variables, build)?;
257
258        let mut command = docker_command(["exec"]);
259        if *IS_A_TTY {
260            command.arg("--tty");
261        }
262
263        command.args(["--env", "RUST_BACKTRACE=1"]);
264        command.args(["--env", &format!("CARGO_BUILD_TARGET_DIR={TARGET_PATH}")]);
265        for (key, value) in outer_env {
266            if let Some(value) = value {
267                command.env(key, value);
268            }
269            command.args(["--env", key]);
270        }
271        append_environment_variables(&mut command, config_environment_variables);
272
273        command.arg(self.container_name());
274        command.args(if coverage {
275            COVERAGE_COMMAND
276        } else {
277            TEST_COMMAND
278        });
279        command.args(args);
280        if coverage {
281            let filename = coverage_filename(coverage_env);
282            command.args([
283                "--lcov",
284                "--output-path",
285                &format!("{COVERAGE_OUTPUT_DIR}/{filename}"),
286            ]);
287        }
288
289        command.check_run()
290    }
291}
292
293#[derive(Debug)]
294pub(super) struct IntegrationTestRunner {
295    // The integration is None when compiling the runner image with the `all-integration-tests` feature.
296    integration: Option<String>,
297    needs_docker_socket: bool,
298    network: Option<String>,
299    volumes: Vec<String>,
300}
301
302impl IntegrationTestRunner {
303    pub(super) fn new(
304        integration: Option<String>,
305        config: &IntegrationRunnerConfig,
306        network: Option<String>,
307    ) -> Result<Self> {
308        let mut volumes: Vec<String> = config
309            .volumes
310            .iter()
311            .map(|(a, b)| format!("{a}:{b}"))
312            .collect();
313
314        volumes.push(format!("{VOLUME_TARGET}:/output"));
315
316        // Always mount the coverage directory so the container can be reused
317        // for coverage runs without needing to be recreated.
318        let coverage_dir = std::path::Path::new(app::path()).join(LOCAL_COVERAGE_OUTPUT_DIR);
319        std::fs::create_dir_all(&coverage_dir)?;
320        volumes.push(format!("{}:{COVERAGE_OUTPUT_DIR}", coverage_dir.display()));
321
322        Ok(Self {
323            integration,
324            needs_docker_socket: config.needs_docker_socket,
325            network,
326            volumes,
327        })
328    }
329
330    /// Returns true if this runner uses the shared image (with all features)
331    pub(super) fn is_shared_runner(&self) -> bool {
332        self.integration.is_none()
333    }
334
335    pub(super) fn ensure_network(&self) -> Result<()> {
336        if let Some(network_name) = &self.network {
337            let mut command = docker_command(["network", "ls", "--format", "{{.Name}}"]);
338
339            if command
340                .check_output()?
341                .lines()
342                .any(|network| network == network_name)
343            {
344                return Ok(());
345            }
346
347            docker_command(["network", "create", network_name]).wait("Creating network")
348        } else {
349            Ok(())
350        }
351    }
352
353    pub(super) fn ensure_external_volumes(&self) -> Result<()> {
354        // Get list of existing volumes
355        let mut command = docker_command(["volume", "ls", "--format", "{{.Name}}"]);
356        let existing_volumes: HashSet<String> =
357            command.check_output()?.lines().map(String::from).collect();
358
359        // Extract volume names from self.volumes (format is "volume_name:/mount/path")
360        for volume_spec in &self.volumes {
361            if let Some((volume_name, _)) = volume_spec.split_once(':') {
362                // Only create named volumes (not paths like /host/path)
363                if !volume_name.starts_with('/') && !existing_volumes.contains(volume_name) {
364                    docker_command(["volume", "create", volume_name])
365                        .wait(format!("Creating volume {volume_name}"))?;
366                }
367            }
368        }
369
370        Ok(())
371    }
372}
373
374impl ContainerTestRunner for IntegrationTestRunner {
375    fn network_name(&self) -> Option<&str> {
376        self.network.as_deref()
377    }
378
379    fn container_name(&self) -> String {
380        if let Some(integration) = self.integration.as_ref() {
381            format!(
382                "vector-test-runner-{}-{}",
383                integration,
384                RustToolchainConfig::rust_version()
385            )
386        } else {
387            format!("vector-test-runner-{}", RustToolchainConfig::rust_version())
388        }
389    }
390
391    fn image_name(&self) -> String {
392        format!("{}:latest", self.container_name())
393    }
394
395    fn needs_docker_socket(&self) -> bool {
396        self.needs_docker_socket
397    }
398
399    fn volumes(&self) -> Vec<String> {
400        self.volumes.clone()
401    }
402}
403
/// Runner that executes the test command directly on the host, without any container.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalTestRunner;
405
406impl TestRunner for LocalTestRunner {
407    fn test(
408        &self,
409        outer_env: &Environment,
410        inner_env: &Environment,
411        _features: Option<&[String]>,
412        args: &[String],
413        _build: bool,
414        coverage: bool,
415        coverage_env: Option<&str>,
416    ) -> Result<()> {
417        let test_cmd = if coverage {
418            COVERAGE_COMMAND
419        } else {
420            TEST_COMMAND
421        };
422        let mut command = Command::new(test_cmd[0]);
423        command.args(&test_cmd[1..]);
424        command.args(args);
425        if coverage {
426            std::fs::create_dir_all(LOCAL_COVERAGE_OUTPUT_DIR)?;
427            let filename = coverage_filename(coverage_env);
428            command.args([
429                "--lcov",
430                "--output-path",
431                &format!("{LOCAL_COVERAGE_OUTPUT_DIR}/{filename}"),
432            ]);
433        }
434
435        for (key, value) in outer_env {
436            if let Some(value) = value {
437                command.env(key, value);
438            }
439        }
440        for (key, value) in inner_env {
441            if let Some(value) = value {
442                command.env(key, value);
443            }
444        }
445
446        command.check_run()
447    }
448}