1use std::{
2 path::{Path, PathBuf},
3 process::Command,
4};
5
6use anyhow::{Context, Result, bail};
7
8use super::{
9 config::{
10 ComposeConfig, ComposeTestConfig, E2E_TESTS_DIR, INTEGRATION_TESTS_DIR, RustToolchainConfig,
11 },
12 runner::{ContainerTestRunner as _, IntegrationTestRunner, TestRunner as _},
13};
14use crate::{
15 app::CommandExt as _,
16 testing::{
17 build::ALL_INTEGRATIONS_FEATURE_FLAG,
18 docker::{CONTAINER_TOOL, DOCKER_SOCKET},
19 },
20 utils::environment::{Environment, extract_present, rename_environment_keys},
21};
22
/// Name of the environment variable that carries the compose network name to every
/// compose invocation issued by `Compose::run`.
const NETWORK_ENV_VAR: &str = "VECTOR_NETWORK";
/// Feature flag recorded in the end-to-end local configuration (see
/// `ComposeTestLocalConfig::e2e`).
const E2E_FEATURE_FLAG: &str = "all-e2e-tests";
25
26fn docker_image_exists(image_name: &str) -> Result<bool> {
28 use crate::testing::docker::docker_command;
29 let output =
30 docker_command(["images", "--format", "{{.Repository}}:{{.Tag}}"]).check_output()?;
31 Ok(output.lines().any(|line| line == image_name))
32}
33
/// Distinguishes the two flavors of compose-driven tests handled by `ComposeTest`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ComposeTestKind {
    /// End-to-end tests; `ComposeTest::start` builds the runner image before bringing the
    /// compose environment up for this kind.
    E2E,
    /// Integration tests.
    Integration,
}
39
/// Static, per-kind settings shared by every test of a given `ComposeTestKind`.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ComposeTestLocalConfig {
    /// Which flavor of test this configuration describes.
    pub(crate) kind: ComposeTestKind,
    /// Directory handed to `ComposeTestConfig::load`; also embedded in the compose project name.
    pub(crate) directory: &'static str,
    /// Feature flag passed after `--features` when the shared runner image is in use.
    pub(crate) feature_flag: &'static str,
}
46
47impl ComposeTestLocalConfig {
48 pub(crate) fn integration() -> Self {
51 Self {
52 kind: ComposeTestKind::Integration,
53 directory: INTEGRATION_TESTS_DIR,
54 feature_flag: ALL_INTEGRATIONS_FEATURE_FLAG,
55 }
56 }
57
58 pub(crate) fn e2e() -> Self {
61 Self {
62 kind: ComposeTestKind::E2E,
63 directory: E2E_TESTS_DIR,
64 feature_flag: E2E_FEATURE_FLAG,
65 }
66 }
67}
68
/// A single test (integration or end-to-end) bound to one named environment, together with
/// the runner and the optional compose environment it needs.
#[derive(Debug)]
pub(crate) struct ComposeTest {
    /// Per-kind settings (kind, directory, feature flag).
    local_config: ComposeTestLocalConfig,
    /// Name of the test, as passed to `ComposeTestConfig::load`.
    test_name: String,
    /// Name of the environment selected from the test configuration.
    environment: String,
    /// Configuration loaded for `test_name`.
    config: ComposeTestConfig,
    /// Runner used to build, execute and remove the test container.
    runner: IntegrationTestRunner,
    /// Compose environment; `None` when the test directory has no `compose.yaml`.
    compose: Option<Compose>,
    /// Variables of the selected environment plus `VECTOR_IMAGE`, after
    /// `rename_environment_keys` has been applied.
    env_config: Environment,
    /// Retry count; forwarded as `--retries <n>` when greater than zero.
    retries: u8,
    /// Whether coverage is requested; forwarded to the runner.
    coverage: bool,
}
81
82impl ComposeTest {
83 pub(crate) fn generate(
84 local_config: ComposeTestLocalConfig,
85 test_name: impl Into<String>,
86 environment: impl Into<String>,
87 retries: u8,
88 coverage: bool,
89 ) -> Result<ComposeTest> {
90 let test_name: String = test_name.into();
91 let environment = environment.into();
92 let (test_dir, config) = ComposeTestConfig::load(local_config.directory, &test_name)?;
93 let Some(mut env_config) = config.environments().get(&environment).cloned() else {
94 bail!("Could not find environment named {environment:?}");
95 };
96
97 let network_name = format!("vector-integration-tests-{test_name}");
98 let compose = Compose::new(test_dir, env_config.clone(), network_name.clone())?;
99
100 let shared_image_name = format!(
104 "vector-test-runner-{}:latest",
105 RustToolchainConfig::rust_version()
106 );
107 let runner_name = if docker_image_exists(&shared_image_name).unwrap_or(false) {
108 info!("Using shared runner image: {shared_image_name}");
109 None
110 } else {
111 info!("Shared runner image not found, will build image for: {test_name}");
112 Some(test_name.clone())
113 };
114
115 let runner = IntegrationTestRunner::new(
116 runner_name,
117 &config.runner,
118 compose.is_some().then_some(network_name),
119 )?;
120
121 env_config.insert("VECTOR_IMAGE".to_string(), Some(runner.image_name()));
122
123 let compose_test = ComposeTest {
124 local_config,
125 test_name,
126 environment,
127 config,
128 runner,
129 compose,
130 env_config: rename_environment_keys(&env_config),
131 retries,
132 coverage,
133 };
134 trace!("Generated {compose_test:#?}");
135 Ok(compose_test)
136 }
137
138 fn project_name(&self) -> String {
139 let sanitized_env = self.environment.replace('.', "-");
142 format!(
143 "vector-{}-{}-{}",
144 self.local_config.directory, self.test_name, sanitized_env
145 )
146 }
147
148 fn is_running(&self) -> Result<bool> {
149 let Some(compose) = &self.compose else {
150 return Ok(false);
151 };
152
153 let output = Command::new(CONTAINER_TOOL.clone())
154 .args([
155 "compose",
156 "--project-name",
157 &self.project_name(),
158 "ps",
159 "--format",
160 "json",
161 "--status",
162 "running",
163 ])
164 .current_dir(&compose.test_dir)
165 .envs(
166 compose
167 .env
168 .iter()
169 .filter_map(|(k, v)| v.as_ref().map(|val| (k, val))),
170 )
171 .output()
172 .with_context(|| "Failed to check if compose environment is running")?;
173
174 Ok(!output.stdout.is_empty() && output.stdout != b"[]\n" && output.stdout != b"[]")
176 }
177
178 pub(crate) fn test(&self, extra_args: Vec<String>) -> Result<()> {
179 let was_running = self.is_running()?;
180 self.config.check_required()?;
181
182 if !was_running {
183 self.start()?;
184 }
185
186 let mut env_vars = self.config.env.clone();
187 for (key, value) in self.env_config.clone() {
189 env_vars.insert(key, value);
190 }
191
192 env_vars.insert("VECTOR_LOG".to_string(), Some("info".into()));
193 let mut args = self.config.args.clone().unwrap_or_default();
194
195 args.push("--features".to_string());
196
197 args.push(if self.runner.is_shared_runner() {
200 format!("{},vendored", self.local_config.feature_flag)
201 } else {
202 format!("{},vendored", self.config.features.join(","))
203 });
204
205 match self.config.test {
207 Some(ref test_arg) => {
208 args.push("--test".to_string());
209 args.push(test_arg.clone());
210 }
211 None => args.push("--lib".to_string()),
212 }
213
214 if let Some(ref filter) = self.config.test_filter {
216 args.push(filter.clone());
217 }
218 args.extend(extra_args);
219
220 if !args.contains(&"--test-threads".to_string()) {
222 args.push("--no-capture".to_string());
223 }
224
225 if self.retries > 0 {
226 args.push("--retries".to_string());
227 args.push(self.retries.to_string());
228 }
229
230 self.runner.test(
231 &env_vars,
232 &self.config.runner.env,
233 Some(&self.config.features),
234 &args,
235 self.local_config.kind == ComposeTestKind::E2E,
236 self.coverage,
237 if self.coverage {
238 Some(self.environment.as_str())
239 } else {
240 None
241 },
242 )?;
243
244 Ok(())
245 }
246
247 pub(crate) fn start(&self) -> Result<()> {
248 if self.local_config.kind == ComposeTestKind::E2E {
252 self.runner.build(
253 Some(&self.config.features),
254 &self.env_config,
255 true, )?;
257 }
258
259 self.config.check_required()?;
260 if let Some(compose) = &self.compose {
261 self.runner.ensure_network()?;
262 self.runner.ensure_external_volumes()?;
263
264 if self.is_running()? {
265 bail!("environment is already up");
266 }
267
268 let project_name = self.project_name();
269 compose.start(&self.env_config, &project_name)?;
270 }
271 Ok(())
272 }
273
274 pub(crate) fn stop(&self) -> Result<()> {
275 if let Some(compose) = &self.compose {
276 if !self.is_running()? {
277 bail!("No environment for {} is up.", self.test_name);
278 }
279
280 let project_name = self.project_name();
281 compose.stop(&self.env_config, &project_name)?;
282 }
283
284 self.runner.remove()?;
285
286 Ok(())
287 }
288}
289
/// A compose environment backed by the `compose.yaml` of a test directory.
#[derive(Debug)]
struct Compose {
    /// Path of the compose file, passed to the container tool via `--file`.
    yaml_path: PathBuf,
    /// Test directory; used as the working directory of every compose invocation.
    test_dir: PathBuf,
    /// Variables exported to compose invocations (entries without a value are skipped).
    env: Environment,
    /// Parsed compose file. Only read by the unix-only volume preparation, hence the
    /// dead-code allowance on Windows.
    #[cfg_attr(target_family = "windows", allow(dead_code))]
    config: ComposeConfig,
    /// Network name, exported to compose invocations as `VECTOR_NETWORK`.
    network: String,
}
299
300impl Compose {
301 fn new(test_dir: PathBuf, env: Environment, network: String) -> Result<Option<Self>> {
302 let yaml_path: PathBuf = [&test_dir, Path::new("compose.yaml")].iter().collect();
303
304 match yaml_path.try_exists() {
305 Err(error) => {
306 Err(error).with_context(|| format!("Could not lookup {}", yaml_path.display()))
307 }
308 Ok(false) => Ok(None),
309 Ok(true) => {
310 let config = ComposeConfig::parse(&yaml_path)?;
312
313 Ok(Some(Self {
314 yaml_path,
315 test_dir,
316 env,
317 config,
318 network,
319 }))
320 }
321 }
322 }
323
324 fn start(&self, environment: &Environment, project_name: &str) -> Result<()> {
325 #[cfg(unix)]
326 unix::prepare_compose_volumes(&self.config, &self.test_dir, environment)?;
327
328 self.run(
329 "Starting",
330 &["up", "--detach"],
331 Some(environment),
332 project_name,
333 )
334 }
335
336 fn stop(&self, environment: &Environment, project_name: &str) -> Result<()> {
337 self.run(
338 "Stopping",
339 &["down", "--timeout", "0", "--volumes", "--remove-orphans"],
340 Some(environment),
341 project_name,
342 )
343 }
344
345 fn run(
346 &self,
347 action: &str,
348 args: &[&'static str],
349 environment: Option<&Environment>,
350 project_name: &str,
351 ) -> Result<()> {
352 let mut command = Command::new(CONTAINER_TOOL.clone());
353 command.arg("compose");
354 command.arg("--project-name");
355 command.arg(project_name);
356 command.arg("--file");
357 command.arg(&self.yaml_path);
358
359 command.args(args);
360
361 command.current_dir(&self.test_dir);
362
363 command.env("DOCKER_SOCKET", &*DOCKER_SOCKET);
364 command.env(NETWORK_ENV_VAR, &self.network);
365
366 command.env("RUST_VERSION", RustToolchainConfig::rust_version());
368
369 for (key, value) in &self.env {
370 if let Some(value) = value {
371 command.env(key, value);
372 }
373 }
374 if let Some(environment) = environment {
375 command.envs(extract_present(environment));
376 }
377
378 waiting!("{action} service environment");
379 command.check_run()
380 }
381}
382
383#[cfg(unix)]
384mod unix {
385 use std::{
386 fs::{self, Metadata, Permissions},
387 os::unix::fs::PermissionsExt as _,
388 path::{Path, PathBuf},
389 };
390
391 use anyhow::{Context, Result};
392
393 use super::super::config::ComposeConfig;
394 use crate::{
395 testing::config::VolumeMount,
396 utils::environment::{Environment, resolve_placeholders},
397 };
398
    /// Mode bits added to regular files: read for user, group and others.
    const ALL_READ: u32 = 0o444;
    /// Mode bits added to directories: read and execute for user, group and others.
    const ALL_READ_DIR: u32 = 0o555;
403
404 pub fn prepare_compose_volumes(
406 config: &ComposeConfig,
407 test_dir: &Path,
408 environment: &Environment,
409 ) -> Result<()> {
410 for service in config.services.values() {
411 if let Some(volumes) = &service.volumes {
412 for volume in volumes {
413 let source = match volume {
414 VolumeMount::Short(s) => {
415 s.split_once(':').map(|(s, _)| s).ok_or_else(|| {
416 anyhow::anyhow!("Invalid short volume mount format: {s}")
417 })?
418 }
419 VolumeMount::Long { source, .. } => source,
420 };
421 let source = resolve_placeholders(source, environment);
422 if !config.volumes.contains_key(&source)
423 && !source.starts_with('/')
424 && !source.starts_with('$')
425 {
426 let path: PathBuf = [test_dir, Path::new(&source)].iter().collect();
427 add_read_permission(&path)?;
428 }
429 }
430 }
431 }
432 Ok(())
433 }
434
435 fn add_read_permission(path: &Path) -> Result<()> {
437 let metadata = path
438 .metadata()
439 .with_context(|| format!("Could not get permissions on {}", path.display()))?;
440
441 if metadata.is_file() {
442 add_permission(path, &metadata, ALL_READ)
443 } else {
444 if metadata.is_dir() {
445 add_permission(path, &metadata, ALL_READ_DIR)?;
446 for entry in fs::read_dir(path)
447 .with_context(|| format!("Could not read directory {}", path.display()))?
448 {
449 let entry = entry.with_context(|| {
450 format!("Could not read directory entry in {}", path.display())
451 })?;
452 add_read_permission(&entry.path())?;
453 }
454 }
455 Ok(())
456 }
457 }
458
459 fn add_permission(path: &Path, metadata: &Metadata, bits: u32) -> Result<()> {
460 let perms = metadata.permissions();
461 let new_perms = Permissions::from_mode(perms.mode() | bits);
462 if new_perms != perms {
463 fs::set_permissions(path, new_perms)
464 .with_context(|| format!("Could not set permissions on {}", path.display()))?;
465 }
466 Ok(())
467 }
468}