vdev/testing/
runner.rs

1use std::{collections::HashSet, env, path::PathBuf, process::Command};
2
3use anyhow::Result;
4
5use super::config::{IntegrationRunnerConfig, RustToolchainConfig};
6use crate::{
7    app::{self, CommandExt as _},
8    environment::{Environment, append_environment_variables},
9    testing::{
10        build::prepare_build_command,
11        docker::{DOCKER_SOCKET, docker_command},
12    },
13    util::{ChainArgs as _, IS_A_TTY},
14};
15
16const MOUNT_PATH: &str = "/home/vector";
17const TARGET_PATH: &str = "/home/target";
18const VOLUME_TARGET: &str = "vector_target";
19const VOLUME_CARGO_GIT: &str = "vector_cargo_git";
20const VOLUME_CARGO_REGISTRY: &str = "vector_cargo_registry";
21const RUNNER_HOSTNAME: &str = "runner";
22const TEST_COMMAND: &[&str] = &[
23    "cargo",
24    "nextest",
25    "run",
26    "--no-fail-fast",
27    "--no-default-features",
28];
29// The upstream container we publish artifacts to on a successful master build.
30const UPSTREAM_IMAGE: &str = "docker.io/timberio/vector-dev:latest";
31
32pub enum RunnerState {
33    Running,
34    Restarting,
35    Created,
36    Exited,
37    Paused,
38    Dead,
39    Missing,
40    Unknown,
41}
42
43pub fn get_agent_test_runner(container: bool) -> Result<Box<dyn TestRunner>> {
44    if container {
45        Ok(Box::new(DockerTestRunner))
46    } else {
47        Ok(Box::new(LocalTestRunner))
48    }
49}
50
51pub trait TestRunner {
52    fn test(
53        &self,
54        outer_env: &Environment,
55        inner_env: &Environment,
56        features: Option<&[String]>,
57        args: &[String],
58        directory: &str,
59    ) -> Result<()>;
60}
61
62pub trait ContainerTestRunner: TestRunner {
63    fn container_name(&self) -> String;
64
65    fn image_name(&self) -> String;
66
67    fn network_name(&self) -> Option<&str>;
68
69    fn needs_docker_socket(&self) -> bool;
70
71    fn volumes(&self) -> Vec<String>;
72
73    fn state(&self) -> Result<RunnerState> {
74        let mut command = docker_command(["ps", "-a", "--format", "{{.Names}} {{.State}}"]);
75        let container_name = self.container_name();
76
77        for line in command.check_output()?.lines() {
78            if let Some((name, state)) = line.split_once(' ')
79                && name == container_name
80            {
81                return Ok(if state == "created" {
82                    RunnerState::Created
83                } else if state == "dead" {
84                    RunnerState::Dead
85                } else if state == "exited" || state.starts_with("Exited ") {
86                    RunnerState::Exited
87                } else if state == "paused" {
88                    RunnerState::Paused
89                } else if state == "restarting" {
90                    RunnerState::Restarting
91                } else if state == "running" || state.starts_with("Up ") {
92                    RunnerState::Running
93                } else {
94                    RunnerState::Unknown
95                });
96            }
97        }
98
99        Ok(RunnerState::Missing)
100    }
101
102    fn ensure_running(
103        &self,
104        features: Option<&[String]>,
105        directory: &str,
106        config_environment_variables: &Environment,
107    ) -> Result<()> {
108        match self.state()? {
109            RunnerState::Running | RunnerState::Restarting => (),
110            RunnerState::Created | RunnerState::Exited => self.start()?,
111            RunnerState::Paused => self.unpause()?,
112            RunnerState::Dead | RunnerState::Unknown => {
113                self.remove()?;
114                self.create()?;
115                self.start()?;
116            }
117            RunnerState::Missing => {
118                self.build(features, directory, config_environment_variables)?;
119                self.ensure_volumes()?;
120                self.create()?;
121                self.start()?;
122            }
123        }
124
125        Ok(())
126    }
127
128    fn ensure_volumes(&self) -> Result<()> {
129        let mut command = docker_command(["volume", "ls", "--format", "{{.Name}}"]);
130
131        let mut volumes = HashSet::new();
132        volumes.insert(VOLUME_TARGET);
133        volumes.insert(VOLUME_CARGO_GIT);
134        volumes.insert(VOLUME_CARGO_REGISTRY);
135        for volume in command.check_output()?.lines() {
136            volumes.take(volume);
137        }
138
139        for volume in &volumes {
140            docker_command(["volume", "create", volume])
141                .wait(format!("Creating volume {volume}"))?;
142        }
143
144        Ok(())
145    }
146
147    fn build(
148        &self,
149        features: Option<&[String]>,
150        directory: &str,
151        config_env_vars: &Environment,
152    ) -> Result<()> {
153        let dockerfile: PathBuf = [app::path(), "scripts", directory, "Dockerfile"]
154            .iter()
155            .collect();
156
157        let mut command =
158            prepare_build_command(&self.image_name(), &dockerfile, features, config_env_vars);
159        waiting!("Building image {}", self.image_name());
160        command.check_run()
161    }
162
163    fn start(&self) -> Result<()> {
164        docker_command(["start", &self.container_name()])
165            .wait(format!("Starting container {}", self.container_name()))
166    }
167
168    fn remove(&self) -> Result<()> {
169        if matches!(self.state()?, RunnerState::Missing) {
170            Ok(())
171        } else {
172            docker_command(["rm", "--force", "--volumes", &self.container_name()])
173                .wait(format!("Removing container {}", self.container_name()))
174        }
175    }
176
177    fn unpause(&self) -> Result<()> {
178        docker_command(["unpause", &self.container_name()])
179            .wait(format!("Unpausing container {}", self.container_name()))
180    }
181
182    fn create(&self) -> Result<()> {
183        let network_name = self.network_name().unwrap_or("host");
184
185        let docker_socket = format!("{}:/var/run/docker.sock", DOCKER_SOCKET.display());
186        let docker_args = if self.needs_docker_socket() {
187            vec!["--volume", &docker_socket]
188        } else {
189            vec![]
190        };
191
192        let volumes = self.volumes();
193        let volumes: Vec<_> = volumes
194            .iter()
195            .flat_map(|volume| ["--volume", volume])
196            .collect();
197
198        docker_command(
199            [
200                "create",
201                "--name",
202                &self.container_name(),
203                "--network",
204                network_name,
205                "--hostname",
206                RUNNER_HOSTNAME,
207                "--workdir",
208                MOUNT_PATH,
209                "--volume",
210                &format!("{}:{MOUNT_PATH}", app::path()),
211                "--volume",
212                &format!("{VOLUME_TARGET}:{TARGET_PATH}"),
213                "--volume",
214                &format!("{VOLUME_CARGO_GIT}:/usr/local/cargo/git"),
215                "--volume",
216                &format!("{VOLUME_CARGO_REGISTRY}:/usr/local/cargo/registry"),
217            ]
218            .chain_args(volumes)
219            .chain_args(docker_args)
220            .chain_args([&self.image_name(), "/bin/sleep", "infinity"]),
221        )
222        .wait(format!("Creating container {}", self.container_name()))
223    }
224}
225
226impl<T> TestRunner for T
227where
228    T: ContainerTestRunner,
229{
230    fn test(
231        &self,
232        outer_env: &Environment,
233        config_environment_variables: &Environment,
234        features: Option<&[String]>,
235        args: &[String],
236        directory: &str,
237    ) -> Result<()> {
238        self.ensure_running(features, directory, config_environment_variables)?;
239
240        let mut command = docker_command(["exec"]);
241        if *IS_A_TTY {
242            command.arg("--tty");
243        }
244
245        command.args(["--env", "RUST_BACKTRACE=1"]);
246        command.args(["--env", &format!("CARGO_BUILD_TARGET_DIR={TARGET_PATH}")]);
247        for (key, value) in outer_env {
248            if let Some(value) = value {
249                command.env(key, value);
250            }
251            command.args(["--env", key]);
252        }
253        append_environment_variables(&mut command, config_environment_variables);
254
255        command.arg(self.container_name());
256        command.args(TEST_COMMAND);
257        command.args(args);
258
259        command.check_run()
260    }
261}
262
263#[derive(Debug)]
264pub(super) struct IntegrationTestRunner {
265    // The integration is None when compiling the runner image with the `all-integration-tests` feature.
266    integration: Option<String>,
267    needs_docker_socket: bool,
268    network: Option<String>,
269    volumes: Vec<String>,
270}
271
272impl IntegrationTestRunner {
273    pub(super) fn new(
274        integration: Option<String>,
275        config: &IntegrationRunnerConfig,
276        network: Option<String>,
277    ) -> Result<Self> {
278        Ok(Self {
279            integration,
280            needs_docker_socket: config.needs_docker_socket,
281            network,
282            volumes: config
283                .volumes
284                .iter()
285                .map(|(a, b)| format!("{a}:{b}"))
286                .collect(),
287        })
288    }
289
290    pub(super) fn ensure_network(&self) -> Result<()> {
291        if let Some(network_name) = &self.network {
292            let mut command = docker_command(["network", "ls", "--format", "{{.Name}}"]);
293
294            if command
295                .check_output()?
296                .lines()
297                .any(|network| network == network_name)
298            {
299                return Ok(());
300            }
301
302            docker_command(["network", "create", network_name]).wait("Creating network")
303        } else {
304            Ok(())
305        }
306    }
307}
308
309impl ContainerTestRunner for IntegrationTestRunner {
310    fn network_name(&self) -> Option<&str> {
311        self.network.as_deref()
312    }
313
314    fn container_name(&self) -> String {
315        if let Some(integration) = self.integration.as_ref() {
316            format!(
317                "vector-test-runner-{}-{}",
318                integration,
319                RustToolchainConfig::rust_version()
320            )
321        } else {
322            format!("vector-test-runner-{}", RustToolchainConfig::rust_version())
323        }
324    }
325
326    fn image_name(&self) -> String {
327        format!("{}:latest", self.container_name())
328    }
329
330    fn needs_docker_socket(&self) -> bool {
331        self.needs_docker_socket
332    }
333
334    fn volumes(&self) -> Vec<String> {
335        self.volumes.clone()
336    }
337}
338
339pub struct DockerTestRunner;
340
341impl ContainerTestRunner for DockerTestRunner {
342    fn network_name(&self) -> Option<&str> {
343        None
344    }
345
346    fn container_name(&self) -> String {
347        format!("vector-test-runner-{}", RustToolchainConfig::rust_version())
348    }
349
350    fn image_name(&self) -> String {
351        env::var("ENVIRONMENT_UPSTREAM").unwrap_or_else(|_| UPSTREAM_IMAGE.to_string())
352    }
353
354    fn needs_docker_socket(&self) -> bool {
355        false
356    }
357
358    fn volumes(&self) -> Vec<String> {
359        Vec::default()
360    }
361}
362
363pub struct LocalTestRunner;
364
365impl TestRunner for LocalTestRunner {
366    fn test(
367        &self,
368        outer_env: &Environment,
369        inner_env: &Environment,
370        _features: Option<&[String]>,
371        args: &[String],
372        _directory: &str,
373    ) -> Result<()> {
374        let mut command = Command::new(TEST_COMMAND[0]);
375        command.args(&TEST_COMMAND[1..]);
376        command.args(args);
377
378        for (key, value) in outer_env {
379            if let Some(value) = value {
380                command.env(key, value);
381            }
382        }
383        for (key, value) in inner_env {
384            if let Some(value) = value {
385                command.env(key, value);
386            }
387        }
388
389        command.check_run()
390    }
391}