1use std::{collections::BTreeMap, fs, path::Path, path::PathBuf, process::Command};
2
3use anyhow::{bail, Context, Result};
4use tempfile::{Builder, NamedTempFile};
5
6use super::config::{
7 ComposeConfig, ComposeTestConfig, Environment, RustToolchainConfig, E2E_TESTS_DIR,
8 INTEGRATION_TESTS_DIR,
9};
10use super::runner::{ContainerTestRunner as _, IntegrationTestRunner, TestRunner as _};
11use super::state::EnvsDir;
12use crate::app::CommandExt as _;
13use crate::testing::build::ALL_INTEGRATIONS_FEATURE_FLAG;
14use crate::testing::docker::{CONTAINER_TOOL, DOCKER_SOCKET};
15
16const NETWORK_ENV_VAR: &str = "VECTOR_NETWORK";
17const E2E_FEATURE_FLAG: &str = "all-e2e-tests";
18
19#[derive(Clone, Copy, PartialEq, Eq)]
20pub(crate) enum ComposeTestKind {
21 E2E,
22 Integration,
23}
24
25#[derive(Clone, Copy)]
26pub(crate) struct ComposeTestLocalConfig {
27 pub(crate) kind: ComposeTestKind,
28 pub(crate) directory: &'static str,
29 pub(crate) feature_flag: &'static str,
30}
31
32impl ComposeTestLocalConfig {
33 pub(crate) fn integration() -> Self {
36 Self {
37 kind: ComposeTestKind::Integration,
38 directory: INTEGRATION_TESTS_DIR,
39 feature_flag: ALL_INTEGRATIONS_FEATURE_FLAG,
40 }
41 }
42
43 pub(crate) fn e2e() -> Self {
46 Self {
47 kind: ComposeTestKind::E2E,
48 directory: E2E_TESTS_DIR,
49 feature_flag: E2E_FEATURE_FLAG,
50 }
51 }
52}
53
54pub(crate) struct ComposeTest {
55 local_config: ComposeTestLocalConfig,
56 test_name: String,
57 environment: String,
58 config: ComposeTestConfig,
59 envs_dir: EnvsDir,
60 runner: IntegrationTestRunner,
61 compose: Option<Compose>,
62 env_config: Environment,
63 build_all: bool,
64 retries: u8,
65}
66
67impl ComposeTest {
68 pub(crate) fn generate(
69 local_config: ComposeTestLocalConfig,
70 test_name: impl Into<String>,
71 environment: impl Into<String>,
72 build_all: bool,
73 retries: u8,
74 ) -> Result<ComposeTest> {
75 let test_name: String = test_name.into();
76 let environment = environment.into();
77 let (test_dir, config) = ComposeTestConfig::load(local_config.directory, &test_name)?;
78 let envs_dir = EnvsDir::new(&test_name);
79 let Some(mut env_config) = config.environments().get(&environment).cloned() else {
80 bail!("Could not find environment named {environment:?}");
81 };
82
83 let network_name = format!("vector-integration-tests-{test_name}");
84 let compose = Compose::new(test_dir, env_config.clone(), network_name.clone())?;
85
86 let runner_name = (!build_all).then(|| test_name.clone());
88
89 let runner = IntegrationTestRunner::new(
90 runner_name,
91 &config.runner,
92 compose.is_some().then_some(network_name),
93 )?;
94
95 env_config.insert("VECTOR_IMAGE".to_string(), Some(runner.image_name()));
96
97 Ok(ComposeTest {
98 local_config,
99 test_name,
100 environment,
101 config,
102 envs_dir,
103 runner,
104 compose,
105 env_config,
106 build_all,
107 retries,
108 })
109 }
110
111 pub(crate) fn test(&self, extra_args: Vec<String>) -> Result<()> {
112 let active = self.envs_dir.check_active(&self.environment)?;
113 self.config.check_required()?;
114
115 if !active {
116 self.start()?;
117 }
118
119 let mut env_vars = self.config.env.clone();
120 for (key, value) in config_env(&self.env_config) {
122 env_vars.insert(key, Some(value));
123 }
124
125 env_vars.insert("TEST_LOG".to_string(), Some("info".into()));
126 let mut args = self.config.args.clone().unwrap_or_default();
127
128 args.push("--features".to_string());
129
130 args.push(if self.build_all {
131 self.local_config.feature_flag.to_string()
132 } else {
133 self.config.features.join(",")
134 });
135
136 match self.config.test {
138 Some(ref test_arg) => {
139 args.push("--test".to_string());
140 args.push(test_arg.to_string());
141 }
142 None => args.push("--lib".to_string()),
143 }
144
145 if let Some(ref filter) = self.config.test_filter {
147 args.push(filter.to_string());
148 }
149 args.extend(extra_args);
150
151 if !args.contains(&"--test-threads".to_string()) {
153 args.push("--no-capture".to_string());
154 }
155
156 if self.retries > 0 {
157 args.push("--retries".to_string());
158 args.push(self.retries.to_string());
159 }
160
161 self.runner.test(
162 &env_vars,
163 &self.config.runner.env,
164 Some(&self.config.features),
165 &args,
166 self.local_config.directory,
167 )?;
168
169 if !active {
170 self.runner.remove()?;
171 self.stop()?;
172 }
173 Ok(())
174 }
175
176 pub(crate) fn start(&self) -> Result<()> {
177 if self.local_config.kind == ComposeTestKind::E2E {
181 self.runner
182 .build(Some(&self.config.features), self.local_config.directory)?;
183 }
184
185 self.config.check_required()?;
186 if let Some(compose) = &self.compose {
187 self.runner.ensure_network()?;
188
189 if self.envs_dir.check_active(&self.environment)? {
190 bail!("environment is already up");
191 }
192
193 compose.start(&self.env_config)?;
194
195 self.envs_dir.save(&self.environment, &self.env_config)
196 } else {
197 Ok(())
198 }
199 }
200
201 pub(crate) fn stop(&self) -> Result<()> {
202 if let Some(compose) = &self.compose {
203 if self.envs_dir.load()?.is_none() {
205 bail!("No environment for {} is up.", self.test_name);
206 }
207
208 self.runner.remove()?;
209 compose.stop()?;
210 self.envs_dir.remove()?;
211 }
212
213 Ok(())
214 }
215}
216
217struct Compose {
218 original_path: PathBuf,
219 test_dir: PathBuf,
220 env: Environment,
221 #[cfg_attr(target_family = "windows", allow(dead_code))]
222 config: ComposeConfig,
223 network: String,
224 temp_file: NamedTempFile,
225}
226
227impl Compose {
228 fn new(test_dir: PathBuf, env: Environment, network: String) -> Result<Option<Self>> {
229 let original_path: PathBuf = [&test_dir, Path::new("compose.yaml")].iter().collect();
230
231 match original_path.try_exists() {
232 Err(error) => {
233 Err(error).with_context(|| format!("Could not lookup {}", original_path.display()))
234 }
235 Ok(false) => Ok(None),
236 Ok(true) => {
237 let mut config = ComposeConfig::parse(&original_path)?;
238 config.networks.insert(
240 "default".to_string(),
241 BTreeMap::from_iter([
242 ("name".to_string(), network.clone()),
243 ("external".to_string(), "true".to_string()),
244 ]),
245 );
246
247 let temp_file = Builder::new()
251 .prefix("compose-temp-")
252 .suffix(".yaml")
253 .tempfile_in(&test_dir)
254 .with_context(|| "Failed to create temporary compose file")?;
255
256 fs::write(
257 temp_file.path(),
258 serde_yaml::to_string(&config)
259 .with_context(|| "Failed to serialize modified compose.yaml")?,
260 )?;
261
262 Ok(Some(Self {
263 original_path,
264 test_dir,
265 env,
266 config,
267 network,
268 temp_file,
269 }))
270 }
271 }
272 }
273
274 fn start(&self, config: &Environment) -> Result<()> {
275 self.prepare()?;
276 self.run("Starting", &["up", "--detach"], Some(config))
277 }
278
279 fn stop(&self) -> Result<()> {
280 self.run(
282 "Stopping",
283 &["down", "--timeout", "0", "--volumes", "--remove-orphans"],
284 None,
285 )
286 }
287
288 fn run(&self, action: &str, args: &[&'static str], config: Option<&Environment>) -> Result<()> {
289 let mut command = Command::new(CONTAINER_TOOL.clone());
290 command.arg("compose");
291 command.arg("--file");
298 if self.temp_file.path().exists() {
299 command.arg(self.temp_file.path());
300 } else {
301 command.arg(&self.original_path);
302 }
303
304 command.args(args);
305
306 command.current_dir(&self.test_dir);
307
308 command.env("DOCKER_SOCKET", &*DOCKER_SOCKET);
309 command.env(NETWORK_ENV_VAR, &self.network);
310
311 command.env("RUST_VERSION", RustToolchainConfig::rust_version());
313
314 for (key, value) in &self.env {
315 if let Some(value) = value {
316 command.env(key, value);
317 }
318 }
319 if let Some(config) = config {
320 command.envs(config_env(config));
321 }
322
323 waiting!("{action} service environment");
324 command.check_run()
325 }
326
327 fn prepare(&self) -> Result<()> {
328 #[cfg(unix)]
329 unix::prepare_compose_volumes(&self.config, &self.test_dir)?;
330 Ok(())
331 }
332}
333
334fn config_env(config: &Environment) -> impl Iterator<Item = (String, String)> + '_ {
335 config.iter().filter_map(|(var, value)| {
336 value.as_ref().map(|value| {
337 (
338 format!("CONFIG_{}", var.replace('-', "_").to_uppercase()),
339 value.to_string(),
340 )
341 })
342 })
343}
344
345#[cfg(unix)]
346mod unix {
347 use std::fs::{self, Metadata, Permissions};
348 use std::os::unix::fs::PermissionsExt as _;
349 use std::path::{Path, PathBuf};
350
351 use super::super::config::ComposeConfig;
352 use crate::testing::config::VolumeMount;
353 use anyhow::{Context, Result};
354
355 const ALL_READ: u32 = 0o444;
357 const ALL_READ_DIR: u32 = 0o555;
359
360 pub fn prepare_compose_volumes(config: &ComposeConfig, test_dir: &Path) -> Result<()> {
362 for service in config.services.values() {
363 if let Some(volumes) = &service.volumes {
364 for volume in volumes {
365 let source = match volume {
366 VolumeMount::Short(s) => {
367 s.split_once(':').map(|(s, _)| s).ok_or_else(|| {
368 anyhow::anyhow!("Invalid short volume mount format: {s}")
369 })?
370 }
371 VolumeMount::Long { source, .. } => source,
372 };
373
374 if !config.volumes.contains_key(source)
375 && !source.starts_with('/')
376 && !source.starts_with('$')
377 {
378 let path: PathBuf = [test_dir, Path::new(source)].iter().collect();
379 add_read_permission(&path)?;
380 }
381 }
382 }
383 }
384 Ok(())
385 }
386
387 fn add_read_permission(path: &Path) -> Result<()> {
389 let metadata = path
390 .metadata()
391 .with_context(|| format!("Could not get permissions on {}", path.display()))?;
392
393 if metadata.is_file() {
394 add_permission(path, &metadata, ALL_READ)
395 } else {
396 if metadata.is_dir() {
397 add_permission(path, &metadata, ALL_READ_DIR)?;
398 for entry in fs::read_dir(path)
399 .with_context(|| format!("Could not read directory {}", path.display()))?
400 {
401 let entry = entry.with_context(|| {
402 format!("Could not read directory entry in {}", path.display())
403 })?;
404 add_read_permission(&entry.path())?;
405 }
406 }
407 Ok(())
408 }
409 }
410
411 fn add_permission(path: &Path, metadata: &Metadata, bits: u32) -> Result<()> {
412 let perms = metadata.permissions();
413 let new_perms = Permissions::from_mode(perms.mode() | bits);
414 if new_perms != perms {
415 fs::set_permissions(path, new_perms)
416 .with_context(|| format!("Could not set permissions on {}", path.display()))?;
417 }
418 Ok(())
419 }
420}