1use anyhow::Result;
2use std::{collections::HashSet, process::Command};
3
4use super::config::{IntegrationRunnerConfig, RustToolchainConfig};
5use crate::{
6 app::{self, CommandExt as _},
7 testing::{
8 build::prepare_build_command,
9 docker::{DOCKER_SOCKET, docker_command},
10 test_runner_dockerfile,
11 },
12 utils::{
13 IS_A_TTY,
14 command::ChainArgs as _,
15 environment::{Environment, append_environment_variables},
16 },
17};
18
/// Path inside the runner container where the repository (`app::path()`) is
/// bind-mounted; also used as the container's working directory.
const MOUNT_PATH: &str = "/home/vector";
/// Path inside the runner container where [`VOLUME_TARGET`] is mounted; passed
/// to the test command as `CARGO_BUILD_TARGET_DIR`.
const TARGET_PATH: &str = "/home/target";
/// Named Docker volume mounted at [`TARGET_PATH`].
const VOLUME_TARGET: &str = "vector_target";
/// Named Docker volume mounted at `/usr/local/cargo/git` in the container.
const VOLUME_CARGO_GIT: &str = "vector_cargo_git";
/// Named Docker volume mounted at `/usr/local/cargo/registry` in the container.
const VOLUME_CARGO_REGISTRY: &str = "vector_cargo_registry";
/// Hostname assigned to the runner container via `--hostname`.
const RUNNER_HOSTNAME: &str = "runner";
/// Base command line used when coverage is disabled; caller-supplied arguments
/// are appended after it.
const TEST_COMMAND: &[&str] = &[
    "cargo",
    "nextest",
    "run",
    "--no-fail-fast",
    "--no-default-features",
];
/// Base command line used when coverage is enabled; caller-supplied arguments
/// and the lcov output options are appended after it.
const COVERAGE_COMMAND: &[&str] = &[
    "cargo",
    "llvm-cov",
    "nextest",
    "--workspace",
    "--no-fail-fast",
    "--no-default-features",
];
/// Directory inside the runner container that coverage reports are written to.
const COVERAGE_OUTPUT_DIR: &str = "/coverage";
/// Directory that receives coverage reports on the host. It is joined onto
/// `app::path()` for container runs and used as a relative path for local runs.
pub(crate) const LOCAL_COVERAGE_OUTPUT_DIR: &str = "target/coverage";
44
/// Lifecycle state of a runner container, derived from the state column of
/// `docker ps -a`.
///
/// The derives make the state printable in diagnostics and directly comparable
/// and copyable; all variants are field-less, so they are free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerState {
    /// The container is running.
    Running,
    /// The container is restarting.
    Restarting,
    /// The container has been created but not started.
    Created,
    /// The container has exited.
    Exited,
    /// The container is paused.
    Paused,
    /// The container is reported as dead.
    Dead,
    /// No container with the expected name is listed.
    Missing,
    /// The container is listed with a state string that is not recognized.
    Unknown,
}
55
/// Something that can run the project's test suite.
pub trait TestRunner {
    /// Runs the tests and returns an error if the invocation fails.
    ///
    /// * `outer_env` - variables applied to the process this runner spawns.
    ///   Container-backed runners additionally forward each key into the
    ///   container by name; entries whose value is `None` set nothing locally.
    /// * `inner_env` - variables for the test process itself.
    /// * `features` - forwarded to the image build by container-backed
    ///   runners; the local runner ignores it.
    /// * `args` - extra arguments appended to the base cargo command.
    /// * `build` - forwarded to the image build by container-backed runners;
    ///   the local runner ignores it.
    /// * `coverage` - when `true`, the coverage command is used and an lcov
    ///   report is written.
    /// * `coverage_env` - optional suffix used in the coverage report's file
    ///   name (see `coverage_filename`).
    // Implementations take every knob individually, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    fn test(
        &self,
        outer_env: &Environment,
        inner_env: &Environment,
        features: Option<&[String]>,
        args: &[String],
        build: bool,
        coverage: bool,
        coverage_env: Option<&str>,
    ) -> Result<()>;
}
69
/// Returns the file name of the lcov report: `lcov-{env}.info` when an
/// environment name is given, `lcov.info` otherwise.
pub(crate) fn coverage_filename(coverage_env: Option<&str>) -> String {
    coverage_env.map_or_else(
        || String::from("lcov.info"),
        |env| format!("lcov-{env}.info"),
    )
}
77
78pub trait ContainerTestRunner: TestRunner {
79 fn container_name(&self) -> String;
80
81 fn image_name(&self) -> String;
82
83 fn network_name(&self) -> Option<&str>;
84
85 fn needs_docker_socket(&self) -> bool;
86
87 fn volumes(&self) -> Vec<String>;
88
89 fn state(&self) -> Result<RunnerState> {
90 let mut command = docker_command(["ps", "-a", "--format", "{{.Names}} {{.State}}"]);
91 let container_name = self.container_name();
92
93 for line in command.check_output()?.lines() {
94 if let Some((name, state)) = line.split_once(' ')
95 && name == container_name
96 {
97 return Ok(if state == "created" {
98 RunnerState::Created
99 } else if state == "dead" {
100 RunnerState::Dead
101 } else if state == "exited" || state.starts_with("Exited ") {
102 RunnerState::Exited
103 } else if state == "paused" {
104 RunnerState::Paused
105 } else if state == "restarting" {
106 RunnerState::Restarting
107 } else if state == "running" || state.starts_with("Up ") {
108 RunnerState::Running
109 } else {
110 RunnerState::Unknown
111 });
112 }
113 }
114
115 Ok(RunnerState::Missing)
116 }
117
118 fn ensure_running(
119 &self,
120 features: Option<&[String]>,
121 config_environment_variables: &Environment,
122 build: bool,
123 ) -> Result<()> {
124 match self.state()? {
125 RunnerState::Running | RunnerState::Restarting => (),
126 RunnerState::Created | RunnerState::Exited => self.start()?,
127 RunnerState::Paused => self.unpause()?,
128 RunnerState::Dead | RunnerState::Unknown => {
129 self.remove()?;
130 self.create()?;
131 self.start()?;
132 }
133 RunnerState::Missing => {
134 self.build(features, config_environment_variables, build)?;
135 self.create()?;
136 self.start()?;
137 }
138 }
139
140 Ok(())
141 }
142
143 fn ensure_volumes(&self) -> Result<()> {
144 let mut command = docker_command(["volume", "ls", "--format", "{{.Name}}"]);
145
146 let mut volumes = HashSet::new();
147 volumes.insert(VOLUME_TARGET);
148 volumes.insert(VOLUME_CARGO_GIT);
149 volumes.insert(VOLUME_CARGO_REGISTRY);
150 for volume in command.check_output()?.lines() {
151 volumes.take(volume);
152 }
153
154 for volume in &volumes {
155 docker_command(["volume", "create", volume])
156 .wait(format!("Creating volume {volume}"))?;
157 }
158
159 Ok(())
160 }
161
162 fn build(
163 &self,
164 features: Option<&[String]>,
165 config_env_vars: &Environment,
166 build: bool,
167 ) -> Result<()> {
168 let image_name = self.image_name();
169
170 let dockerfile = test_runner_dockerfile();
171 let mut command =
172 prepare_build_command(&image_name, &dockerfile, features, config_env_vars, build);
173 waiting!("Building image {}", image_name);
174 command.check_run()
175 }
176
177 fn start(&self) -> Result<()> {
178 self.ensure_volumes()?;
179 docker_command(["start", &self.container_name()])
180 .wait(format!("Starting container {}", self.container_name()))
181 }
182
183 fn remove(&self) -> Result<()> {
184 if matches!(self.state()?, RunnerState::Missing) {
185 Ok(())
186 } else {
187 docker_command(["rm", "--force", "--volumes", &self.container_name()])
188 .wait(format!("Removing container {}", self.container_name()))
189 }
190 }
191
192 fn unpause(&self) -> Result<()> {
193 docker_command(["unpause", &self.container_name()])
194 .wait(format!("Unpausing container {}", self.container_name()))
195 }
196
197 fn create(&self) -> Result<()> {
198 let network_name = self.network_name().unwrap_or("host");
199
200 let docker_socket = format!("{}:/var/run/docker.sock", DOCKER_SOCKET.display());
201 let docker_args = if self.needs_docker_socket() {
202 vec!["--volume", &docker_socket]
203 } else {
204 vec![]
205 };
206
207 let volumes = self.volumes();
208 let volumes: Vec<_> = volumes
209 .iter()
210 .flat_map(|volume| ["--volume", volume])
211 .collect();
212
213 self.ensure_volumes()?;
214 docker_command(
215 [
216 "create",
217 "--name",
218 &self.container_name(),
219 "--network",
220 network_name,
221 "--hostname",
222 RUNNER_HOSTNAME,
223 "--workdir",
224 MOUNT_PATH,
225 "--volume",
226 &format!("{}:{MOUNT_PATH}", app::path()),
227 "--volume",
228 &format!("{VOLUME_TARGET}:{TARGET_PATH}"),
229 "--volume",
230 &format!("{VOLUME_CARGO_GIT}:/usr/local/cargo/git"),
231 "--volume",
232 &format!("{VOLUME_CARGO_REGISTRY}:/usr/local/cargo/registry"),
233 ]
234 .chain_args(volumes)
235 .chain_args(docker_args)
236 .chain_args([&self.image_name(), "/bin/sleep", "infinity"]),
237 )
238 .wait(format!("Creating container {}", self.container_name()))
239 }
240}
241
242impl<T> TestRunner for T
243where
244 T: ContainerTestRunner,
245{
246 fn test(
247 &self,
248 outer_env: &Environment,
249 config_environment_variables: &Environment,
250 features: Option<&[String]>,
251 args: &[String],
252 build: bool,
253 coverage: bool,
254 coverage_env: Option<&str>,
255 ) -> Result<()> {
256 self.ensure_running(features, config_environment_variables, build)?;
257
258 let mut command = docker_command(["exec"]);
259 if *IS_A_TTY {
260 command.arg("--tty");
261 }
262
263 command.args(["--env", "RUST_BACKTRACE=1"]);
264 command.args(["--env", &format!("CARGO_BUILD_TARGET_DIR={TARGET_PATH}")]);
265 for (key, value) in outer_env {
266 if let Some(value) = value {
267 command.env(key, value);
268 }
269 command.args(["--env", key]);
270 }
271 append_environment_variables(&mut command, config_environment_variables);
272
273 command.arg(self.container_name());
274 command.args(if coverage {
275 COVERAGE_COMMAND
276 } else {
277 TEST_COMMAND
278 });
279 command.args(args);
280 if coverage {
281 let filename = coverage_filename(coverage_env);
282 command.args([
283 "--lcov",
284 "--output-path",
285 &format!("{COVERAGE_OUTPUT_DIR}/{filename}"),
286 ]);
287 }
288
289 command.check_run()
290 }
291}
292
/// Container-backed test runner used for integration tests.
#[derive(Debug)]
pub(super) struct IntegrationTestRunner {
    /// Integration name that is embedded in the container name; `None` means
    /// this is the shared runner (see `is_shared_runner`).
    integration: Option<String>,
    /// Whether the Docker socket is mounted into the container.
    needs_docker_socket: bool,
    /// Docker network the container is attached to, if any.
    network: Option<String>,
    /// `source:target` volume specifications passed to `docker create`.
    volumes: Vec<String>,
}
301
302impl IntegrationTestRunner {
303 pub(super) fn new(
304 integration: Option<String>,
305 config: &IntegrationRunnerConfig,
306 network: Option<String>,
307 ) -> Result<Self> {
308 let mut volumes: Vec<String> = config
309 .volumes
310 .iter()
311 .map(|(a, b)| format!("{a}:{b}"))
312 .collect();
313
314 volumes.push(format!("{VOLUME_TARGET}:/output"));
315
316 let coverage_dir = std::path::Path::new(app::path()).join(LOCAL_COVERAGE_OUTPUT_DIR);
319 std::fs::create_dir_all(&coverage_dir)?;
320 volumes.push(format!("{}:{COVERAGE_OUTPUT_DIR}", coverage_dir.display()));
321
322 Ok(Self {
323 integration,
324 needs_docker_socket: config.needs_docker_socket,
325 network,
326 volumes,
327 })
328 }
329
330 pub(super) fn is_shared_runner(&self) -> bool {
332 self.integration.is_none()
333 }
334
335 pub(super) fn ensure_network(&self) -> Result<()> {
336 if let Some(network_name) = &self.network {
337 let mut command = docker_command(["network", "ls", "--format", "{{.Name}}"]);
338
339 if command
340 .check_output()?
341 .lines()
342 .any(|network| network == network_name)
343 {
344 return Ok(());
345 }
346
347 docker_command(["network", "create", network_name]).wait("Creating network")
348 } else {
349 Ok(())
350 }
351 }
352
353 pub(super) fn ensure_external_volumes(&self) -> Result<()> {
354 let mut command = docker_command(["volume", "ls", "--format", "{{.Name}}"]);
356 let existing_volumes: HashSet<String> =
357 command.check_output()?.lines().map(String::from).collect();
358
359 for volume_spec in &self.volumes {
361 if let Some((volume_name, _)) = volume_spec.split_once(':') {
362 if !volume_name.starts_with('/') && !existing_volumes.contains(volume_name) {
364 docker_command(["volume", "create", volume_name])
365 .wait(format!("Creating volume {volume_name}"))?;
366 }
367 }
368 }
369
370 Ok(())
371 }
372}
373
374impl ContainerTestRunner for IntegrationTestRunner {
375 fn network_name(&self) -> Option<&str> {
376 self.network.as_deref()
377 }
378
379 fn container_name(&self) -> String {
380 if let Some(integration) = self.integration.as_ref() {
381 format!(
382 "vector-test-runner-{}-{}",
383 integration,
384 RustToolchainConfig::rust_version()
385 )
386 } else {
387 format!("vector-test-runner-{}", RustToolchainConfig::rust_version())
388 }
389 }
390
391 fn image_name(&self) -> String {
392 format!("{}:latest", self.container_name())
393 }
394
395 fn needs_docker_socket(&self) -> bool {
396 self.needs_docker_socket
397 }
398
399 fn volumes(&self) -> Vec<String> {
400 self.volumes.clone()
401 }
402}
403
/// Runs the test suite directly on the host with the local cargo toolchain,
/// without any container.
///
/// The type carries no state, so it is trivially printable, copyable and
/// default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalTestRunner;
405
406impl TestRunner for LocalTestRunner {
407 fn test(
408 &self,
409 outer_env: &Environment,
410 inner_env: &Environment,
411 _features: Option<&[String]>,
412 args: &[String],
413 _build: bool,
414 coverage: bool,
415 coverage_env: Option<&str>,
416 ) -> Result<()> {
417 let test_cmd = if coverage {
418 COVERAGE_COMMAND
419 } else {
420 TEST_COMMAND
421 };
422 let mut command = Command::new(test_cmd[0]);
423 command.args(&test_cmd[1..]);
424 command.args(args);
425 if coverage {
426 std::fs::create_dir_all(LOCAL_COVERAGE_OUTPUT_DIR)?;
427 let filename = coverage_filename(coverage_env);
428 command.args([
429 "--lcov",
430 "--output-path",
431 &format!("{LOCAL_COVERAGE_OUTPUT_DIR}/{filename}"),
432 ]);
433 }
434
435 for (key, value) in outer_env {
436 if let Some(value) = value {
437 command.env(key, value);
438 }
439 }
440 for (key, value) in inner_env {
441 if let Some(value) = value {
442 command.env(key, value);
443 }
444 }
445
446 command.check_run()
447 }
448}