1use std::{collections::HashSet, env, path::PathBuf, process::Command};
2
3use anyhow::Result;
4
5use super::config::{IntegrationRunnerConfig, RustToolchainConfig};
6use crate::{
7 app::{self, CommandExt as _},
8 environment::{Environment, append_environment_variables},
9 testing::{
10 build::prepare_build_command,
11 docker::{DOCKER_SOCKET, docker_command},
12 },
13 util::{ChainArgs as _, IS_A_TTY},
14};
15
16const MOUNT_PATH: &str = "/home/vector";
17const TARGET_PATH: &str = "/home/target";
18const VOLUME_TARGET: &str = "vector_target";
19const VOLUME_CARGO_GIT: &str = "vector_cargo_git";
20const VOLUME_CARGO_REGISTRY: &str = "vector_cargo_registry";
21const RUNNER_HOSTNAME: &str = "runner";
22const TEST_COMMAND: &[&str] = &[
23 "cargo",
24 "nextest",
25 "run",
26 "--no-fail-fast",
27 "--no-default-features",
28];
29const UPSTREAM_IMAGE: &str = "docker.io/timberio/vector-dev:latest";
31
32pub enum RunnerState {
33 Running,
34 Restarting,
35 Created,
36 Exited,
37 Paused,
38 Dead,
39 Missing,
40 Unknown,
41}
42
43pub fn get_agent_test_runner(container: bool) -> Result<Box<dyn TestRunner>> {
44 if container {
45 Ok(Box::new(DockerTestRunner))
46 } else {
47 Ok(Box::new(LocalTestRunner))
48 }
49}
50
51pub trait TestRunner {
52 fn test(
53 &self,
54 outer_env: &Environment,
55 inner_env: &Environment,
56 features: Option<&[String]>,
57 args: &[String],
58 directory: &str,
59 ) -> Result<()>;
60}
61
62pub trait ContainerTestRunner: TestRunner {
63 fn container_name(&self) -> String;
64
65 fn image_name(&self) -> String;
66
67 fn network_name(&self) -> Option<&str>;
68
69 fn needs_docker_socket(&self) -> bool;
70
71 fn volumes(&self) -> Vec<String>;
72
73 fn state(&self) -> Result<RunnerState> {
74 let mut command = docker_command(["ps", "-a", "--format", "{{.Names}} {{.State}}"]);
75 let container_name = self.container_name();
76
77 for line in command.check_output()?.lines() {
78 if let Some((name, state)) = line.split_once(' ')
79 && name == container_name
80 {
81 return Ok(if state == "created" {
82 RunnerState::Created
83 } else if state == "dead" {
84 RunnerState::Dead
85 } else if state == "exited" || state.starts_with("Exited ") {
86 RunnerState::Exited
87 } else if state == "paused" {
88 RunnerState::Paused
89 } else if state == "restarting" {
90 RunnerState::Restarting
91 } else if state == "running" || state.starts_with("Up ") {
92 RunnerState::Running
93 } else {
94 RunnerState::Unknown
95 });
96 }
97 }
98
99 Ok(RunnerState::Missing)
100 }
101
102 fn ensure_running(
103 &self,
104 features: Option<&[String]>,
105 directory: &str,
106 config_environment_variables: &Environment,
107 ) -> Result<()> {
108 match self.state()? {
109 RunnerState::Running | RunnerState::Restarting => (),
110 RunnerState::Created | RunnerState::Exited => self.start()?,
111 RunnerState::Paused => self.unpause()?,
112 RunnerState::Dead | RunnerState::Unknown => {
113 self.remove()?;
114 self.create()?;
115 self.start()?;
116 }
117 RunnerState::Missing => {
118 self.build(features, directory, config_environment_variables)?;
119 self.ensure_volumes()?;
120 self.create()?;
121 self.start()?;
122 }
123 }
124
125 Ok(())
126 }
127
128 fn ensure_volumes(&self) -> Result<()> {
129 let mut command = docker_command(["volume", "ls", "--format", "{{.Name}}"]);
130
131 let mut volumes = HashSet::new();
132 volumes.insert(VOLUME_TARGET);
133 volumes.insert(VOLUME_CARGO_GIT);
134 volumes.insert(VOLUME_CARGO_REGISTRY);
135 for volume in command.check_output()?.lines() {
136 volumes.take(volume);
137 }
138
139 for volume in &volumes {
140 docker_command(["volume", "create", volume])
141 .wait(format!("Creating volume {volume}"))?;
142 }
143
144 Ok(())
145 }
146
147 fn build(
148 &self,
149 features: Option<&[String]>,
150 directory: &str,
151 config_env_vars: &Environment,
152 ) -> Result<()> {
153 let dockerfile: PathBuf = [app::path(), "scripts", directory, "Dockerfile"]
154 .iter()
155 .collect();
156
157 let mut command =
158 prepare_build_command(&self.image_name(), &dockerfile, features, config_env_vars);
159 waiting!("Building image {}", self.image_name());
160 command.check_run()
161 }
162
163 fn start(&self) -> Result<()> {
164 docker_command(["start", &self.container_name()])
165 .wait(format!("Starting container {}", self.container_name()))
166 }
167
168 fn remove(&self) -> Result<()> {
169 if matches!(self.state()?, RunnerState::Missing) {
170 Ok(())
171 } else {
172 docker_command(["rm", "--force", "--volumes", &self.container_name()])
173 .wait(format!("Removing container {}", self.container_name()))
174 }
175 }
176
177 fn unpause(&self) -> Result<()> {
178 docker_command(["unpause", &self.container_name()])
179 .wait(format!("Unpausing container {}", self.container_name()))
180 }
181
182 fn create(&self) -> Result<()> {
183 let network_name = self.network_name().unwrap_or("host");
184
185 let docker_socket = format!("{}:/var/run/docker.sock", DOCKER_SOCKET.display());
186 let docker_args = if self.needs_docker_socket() {
187 vec!["--volume", &docker_socket]
188 } else {
189 vec![]
190 };
191
192 let volumes = self.volumes();
193 let volumes: Vec<_> = volumes
194 .iter()
195 .flat_map(|volume| ["--volume", volume])
196 .collect();
197
198 docker_command(
199 [
200 "create",
201 "--name",
202 &self.container_name(),
203 "--network",
204 network_name,
205 "--hostname",
206 RUNNER_HOSTNAME,
207 "--workdir",
208 MOUNT_PATH,
209 "--volume",
210 &format!("{}:{MOUNT_PATH}", app::path()),
211 "--volume",
212 &format!("{VOLUME_TARGET}:{TARGET_PATH}"),
213 "--volume",
214 &format!("{VOLUME_CARGO_GIT}:/usr/local/cargo/git"),
215 "--volume",
216 &format!("{VOLUME_CARGO_REGISTRY}:/usr/local/cargo/registry"),
217 ]
218 .chain_args(volumes)
219 .chain_args(docker_args)
220 .chain_args([&self.image_name(), "/bin/sleep", "infinity"]),
221 )
222 .wait(format!("Creating container {}", self.container_name()))
223 }
224}
225
226impl<T> TestRunner for T
227where
228 T: ContainerTestRunner,
229{
230 fn test(
231 &self,
232 outer_env: &Environment,
233 config_environment_variables: &Environment,
234 features: Option<&[String]>,
235 args: &[String],
236 directory: &str,
237 ) -> Result<()> {
238 self.ensure_running(features, directory, config_environment_variables)?;
239
240 let mut command = docker_command(["exec"]);
241 if *IS_A_TTY {
242 command.arg("--tty");
243 }
244
245 command.args(["--env", "RUST_BACKTRACE=1"]);
246 command.args(["--env", &format!("CARGO_BUILD_TARGET_DIR={TARGET_PATH}")]);
247 for (key, value) in outer_env {
248 if let Some(value) = value {
249 command.env(key, value);
250 }
251 command.args(["--env", key]);
252 }
253 append_environment_variables(&mut command, config_environment_variables);
254
255 command.arg(self.container_name());
256 command.args(TEST_COMMAND);
257 command.args(args);
258
259 command.check_run()
260 }
261}
262
263#[derive(Debug)]
264pub(super) struct IntegrationTestRunner {
265 integration: Option<String>,
267 needs_docker_socket: bool,
268 network: Option<String>,
269 volumes: Vec<String>,
270}
271
272impl IntegrationTestRunner {
273 pub(super) fn new(
274 integration: Option<String>,
275 config: &IntegrationRunnerConfig,
276 network: Option<String>,
277 ) -> Result<Self> {
278 Ok(Self {
279 integration,
280 needs_docker_socket: config.needs_docker_socket,
281 network,
282 volumes: config
283 .volumes
284 .iter()
285 .map(|(a, b)| format!("{a}:{b}"))
286 .collect(),
287 })
288 }
289
290 pub(super) fn ensure_network(&self) -> Result<()> {
291 if let Some(network_name) = &self.network {
292 let mut command = docker_command(["network", "ls", "--format", "{{.Name}}"]);
293
294 if command
295 .check_output()?
296 .lines()
297 .any(|network| network == network_name)
298 {
299 return Ok(());
300 }
301
302 docker_command(["network", "create", network_name]).wait("Creating network")
303 } else {
304 Ok(())
305 }
306 }
307}
308
309impl ContainerTestRunner for IntegrationTestRunner {
310 fn network_name(&self) -> Option<&str> {
311 self.network.as_deref()
312 }
313
314 fn container_name(&self) -> String {
315 if let Some(integration) = self.integration.as_ref() {
316 format!(
317 "vector-test-runner-{}-{}",
318 integration,
319 RustToolchainConfig::rust_version()
320 )
321 } else {
322 format!("vector-test-runner-{}", RustToolchainConfig::rust_version())
323 }
324 }
325
326 fn image_name(&self) -> String {
327 format!("{}:latest", self.container_name())
328 }
329
330 fn needs_docker_socket(&self) -> bool {
331 self.needs_docker_socket
332 }
333
334 fn volumes(&self) -> Vec<String> {
335 self.volumes.clone()
336 }
337}
338
339pub struct DockerTestRunner;
340
341impl ContainerTestRunner for DockerTestRunner {
342 fn network_name(&self) -> Option<&str> {
343 None
344 }
345
346 fn container_name(&self) -> String {
347 format!("vector-test-runner-{}", RustToolchainConfig::rust_version())
348 }
349
350 fn image_name(&self) -> String {
351 env::var("ENVIRONMENT_UPSTREAM").unwrap_or_else(|_| UPSTREAM_IMAGE.to_string())
352 }
353
354 fn needs_docker_socket(&self) -> bool {
355 false
356 }
357
358 fn volumes(&self) -> Vec<String> {
359 Vec::default()
360 }
361}
362
363pub struct LocalTestRunner;
364
365impl TestRunner for LocalTestRunner {
366 fn test(
367 &self,
368 outer_env: &Environment,
369 inner_env: &Environment,
370 _features: Option<&[String]>,
371 args: &[String],
372 _directory: &str,
373 ) -> Result<()> {
374 let mut command = Command::new(TEST_COMMAND[0]);
375 command.args(&TEST_COMMAND[1..]);
376 command.args(args);
377
378 for (key, value) in outer_env {
379 if let Some(value) = value {
380 command.env(key, value);
381 }
382 }
383 for (key, value) in inner_env {
384 if let Some(value) = value {
385 command.env(key, value);
386 }
387 }
388
389 command.check_run()
390 }
391}