1use std::{
2 collections::BTreeMap,
3 fs,
4 path::{Path, PathBuf},
5 process::Command,
6};
7
8use anyhow::{Context, Result, bail};
9use tempfile::{Builder, NamedTempFile};
10
11use super::{
12 config::{
13 ComposeConfig, ComposeTestConfig, E2E_TESTS_DIR, INTEGRATION_TESTS_DIR, RustToolchainConfig,
14 },
15 runner::{ContainerTestRunner as _, IntegrationTestRunner, TestRunner as _},
16 state::EnvsDir,
17};
18use crate::{
19 app::CommandExt as _,
20 environment::{Environment, extract_present, rename_environment_keys},
21 testing::{
22 build::ALL_INTEGRATIONS_FEATURE_FLAG,
23 docker::{CONTAINER_TOOL, DOCKER_SOCKET},
24 },
25};
26
/// Cargo feature that enables every end-to-end test; used as the feature flag
/// of the e2e [`ComposeTestLocalConfig`].
const E2E_FEATURE_FLAG: &str = "all-e2e-tests";

/// Name of the environment variable through which the network name is handed
/// to the container tool's `compose` subcommand.
const NETWORK_ENV_VAR: &str = "VECTOR_NETWORK";
29
/// Distinguishes the two flavours of compose-based tests handled by this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ComposeTestKind {
    /// End-to-end tests. Starting such a test builds the runner first.
    E2E,
    /// Integration tests.
    Integration,
}
35
36#[derive(Clone, Copy, Debug)]
37pub(crate) struct ComposeTestLocalConfig {
38 pub(crate) kind: ComposeTestKind,
39 pub(crate) directory: &'static str,
40 pub(crate) feature_flag: &'static str,
41}
42
43impl ComposeTestLocalConfig {
44 pub(crate) fn integration() -> Self {
47 Self {
48 kind: ComposeTestKind::Integration,
49 directory: INTEGRATION_TESTS_DIR,
50 feature_flag: ALL_INTEGRATIONS_FEATURE_FLAG,
51 }
52 }
53
54 pub(crate) fn e2e() -> Self {
57 Self {
58 kind: ComposeTestKind::E2E,
59 directory: E2E_TESTS_DIR,
60 feature_flag: E2E_FEATURE_FLAG,
61 }
62 }
63}
64
65#[derive(Debug)]
66pub(crate) struct ComposeTest {
67 local_config: ComposeTestLocalConfig,
68 test_name: String,
69 environment: String,
70 config: ComposeTestConfig,
71 envs_dir: EnvsDir,
72 runner: IntegrationTestRunner,
73 compose: Option<Compose>,
74 env_config: Environment,
75 build_all: bool,
76 retries: u8,
77}
78
79impl ComposeTest {
80 pub(crate) fn generate(
81 local_config: ComposeTestLocalConfig,
82 test_name: impl Into<String>,
83 environment: impl Into<String>,
84 build_all: bool,
85 retries: u8,
86 ) -> Result<ComposeTest> {
87 let test_name: String = test_name.into();
88 let environment = environment.into();
89 let (test_dir, config) = ComposeTestConfig::load(local_config.directory, &test_name)?;
90 let envs_dir = EnvsDir::new(&test_name);
91 let Some(mut env_config) = config.environments().get(&environment).cloned() else {
92 bail!("Could not find environment named {environment:?}");
93 };
94
95 let network_name = format!("vector-integration-tests-{test_name}");
96 let compose = Compose::new(test_dir, env_config.clone(), network_name.clone())?;
97
98 let runner_name = (!build_all).then(|| test_name.clone());
100
101 let runner = IntegrationTestRunner::new(
102 runner_name,
103 &config.runner,
104 compose.is_some().then_some(network_name),
105 )?;
106
107 env_config.insert("VECTOR_IMAGE".to_string(), Some(runner.image_name()));
108
109 let compose_test = ComposeTest {
110 local_config,
111 test_name,
112 environment,
113 config,
114 envs_dir,
115 runner,
116 compose,
117 env_config: rename_environment_keys(&env_config),
118 build_all,
119 retries,
120 };
121 trace!("Generated {compose_test:#?}");
122 Ok(compose_test)
123 }
124
125 pub(crate) fn test(&self, extra_args: Vec<String>) -> Result<()> {
126 let active = self.envs_dir.check_active(&self.environment)?;
127 self.config.check_required()?;
128
129 if !active {
130 self.start()?;
131 }
132
133 let mut env_vars = self.config.env.clone();
134 for (key, value) in self.env_config.clone() {
136 env_vars.insert(key, value);
137 }
138
139 env_vars.insert("TEST_LOG".to_string(), Some("info".into()));
140 let mut args = self.config.args.clone().unwrap_or_default();
141
142 args.push("--features".to_string());
143
144 args.push(if self.build_all {
145 self.local_config.feature_flag.to_string()
146 } else {
147 self.config.features.join(",")
148 });
149
150 match self.config.test {
152 Some(ref test_arg) => {
153 args.push("--test".to_string());
154 args.push(test_arg.to_string());
155 }
156 None => args.push("--lib".to_string()),
157 }
158
159 if let Some(ref filter) = self.config.test_filter {
161 args.push(filter.to_string());
162 }
163 args.extend(extra_args);
164
165 if !args.contains(&"--test-threads".to_string()) {
167 args.push("--no-capture".to_string());
168 }
169
170 if self.retries > 0 {
171 args.push("--retries".to_string());
172 args.push(self.retries.to_string());
173 }
174
175 self.runner.test(
176 &env_vars,
177 &self.config.runner.env,
178 Some(&self.config.features),
179 &args,
180 self.local_config.directory,
181 )?;
182
183 if !active {
184 self.runner.remove()?;
185 self.stop()?;
186 }
187 Ok(())
188 }
189
190 pub(crate) fn start(&self) -> Result<()> {
191 if self.local_config.kind == ComposeTestKind::E2E {
195 self.runner.build(
196 Some(&self.config.features),
197 self.local_config.directory,
198 &self.env_config,
199 )?;
200 }
201
202 self.config.check_required()?;
203 if let Some(compose) = &self.compose {
204 self.runner.ensure_network()?;
205
206 if self.envs_dir.check_active(&self.environment)? {
207 bail!("environment is already up");
208 }
209
210 compose.start(&self.env_config)?;
211
212 self.envs_dir.save(&self.environment, &self.env_config)
213 } else {
214 Ok(())
215 }
216 }
217
218 pub(crate) fn stop(&self) -> Result<()> {
219 if let Some(compose) = &self.compose {
220 if self.envs_dir.load()?.is_none() {
222 bail!("No environment for {} is up.", self.test_name);
223 }
224
225 self.runner.remove()?;
226 compose.stop()?;
227 self.envs_dir.remove()?;
228 }
229
230 Ok(())
231 }
232}
233
234#[derive(Debug)]
235struct Compose {
236 original_path: PathBuf,
237 test_dir: PathBuf,
238 env: Environment,
239 #[cfg_attr(target_family = "windows", allow(dead_code))]
240 config: ComposeConfig,
241 network: String,
242 temp_file: NamedTempFile,
243}
244
245impl Compose {
246 fn new(test_dir: PathBuf, env: Environment, network: String) -> Result<Option<Self>> {
247 let original_path: PathBuf = [&test_dir, Path::new("compose.yaml")].iter().collect();
248
249 match original_path.try_exists() {
250 Err(error) => {
251 Err(error).with_context(|| format!("Could not lookup {}", original_path.display()))
252 }
253 Ok(false) => Ok(None),
254 Ok(true) => {
255 let mut config = ComposeConfig::parse(&original_path)?;
256 config.networks.insert(
258 "default".to_string(),
259 BTreeMap::from_iter([
260 ("name".to_string(), network.clone()),
261 ("external".to_string(), "true".to_string()),
262 ]),
263 );
264
265 let temp_file = Builder::new()
269 .prefix("compose-temp-")
270 .suffix(".yaml")
271 .tempfile_in(&test_dir)
272 .with_context(|| "Failed to create temporary compose file")?;
273
274 fs::write(
275 temp_file.path(),
276 serde_yaml::to_string(&config)
277 .with_context(|| "Failed to serialize modified compose.yaml")?,
278 )?;
279
280 Ok(Some(Self {
281 original_path,
282 test_dir,
283 env,
284 config,
285 network,
286 temp_file,
287 }))
288 }
289 }
290 }
291
292 fn start(&self, environment: &Environment) -> Result<()> {
293 #[cfg(unix)]
294 unix::prepare_compose_volumes(&self.config, &self.test_dir, environment)?;
295
296 self.run("Starting", &["up", "--detach"], Some(environment))
297 }
298
299 fn stop(&self) -> Result<()> {
300 self.run(
302 "Stopping",
303 &["down", "--timeout", "0", "--volumes", "--remove-orphans"],
304 None,
305 )
306 }
307
308 fn run(
309 &self,
310 action: &str,
311 args: &[&'static str],
312 environment: Option<&Environment>,
313 ) -> Result<()> {
314 let mut command = Command::new(CONTAINER_TOOL.clone());
315 command.arg("compose");
316 command.arg("--file");
323 if self.temp_file.path().exists() {
324 command.arg(self.temp_file.path());
325 } else {
326 command.arg(&self.original_path);
327 }
328
329 command.args(args);
330
331 command.current_dir(&self.test_dir);
332
333 command.env("DOCKER_SOCKET", &*DOCKER_SOCKET);
334 command.env(NETWORK_ENV_VAR, &self.network);
335
336 command.env("RUST_VERSION", RustToolchainConfig::rust_version());
338
339 for (key, value) in &self.env {
340 if let Some(value) = value {
341 command.env(key, value);
342 }
343 }
344 if let Some(environment) = environment {
345 command.envs(extract_present(environment));
346 }
347
348 waiting!("{action} service environment");
349 command.check_run()
350 }
351}
352
353#[cfg(unix)]
354mod unix {
355 use std::{
356 fs::{self, Metadata, Permissions},
357 os::unix::fs::PermissionsExt as _,
358 path::{Path, PathBuf},
359 };
360
361 use anyhow::{Context, Result};
362
363 use super::super::config::ComposeConfig;
364 use crate::{
365 environment::{Environment, resolve_placeholders},
366 testing::config::VolumeMount,
367 };
368
369 const ALL_READ: u32 = 0o444;
371 const ALL_READ_DIR: u32 = 0o555;
373
374 pub fn prepare_compose_volumes(
376 config: &ComposeConfig,
377 test_dir: &Path,
378 environment: &Environment,
379 ) -> Result<()> {
380 for service in config.services.values() {
381 if let Some(volumes) = &service.volumes {
382 for volume in volumes {
383 let source = match volume {
384 VolumeMount::Short(s) => {
385 s.split_once(':').map(|(s, _)| s).ok_or_else(|| {
386 anyhow::anyhow!("Invalid short volume mount format: {s}")
387 })?
388 }
389 VolumeMount::Long { source, .. } => source,
390 };
391 let source = resolve_placeholders(source, environment);
392 if !config.volumes.contains_key(&source)
393 && !source.starts_with('/')
394 && !source.starts_with('$')
395 {
396 let path: PathBuf = [test_dir, Path::new(&source)].iter().collect();
397 add_read_permission(&path)?;
398 }
399 }
400 }
401 }
402 Ok(())
403 }
404
405 fn add_read_permission(path: &Path) -> Result<()> {
407 let metadata = path
408 .metadata()
409 .with_context(|| format!("Could not get permissions on {}", path.display()))?;
410
411 if metadata.is_file() {
412 add_permission(path, &metadata, ALL_READ)
413 } else {
414 if metadata.is_dir() {
415 add_permission(path, &metadata, ALL_READ_DIR)?;
416 for entry in fs::read_dir(path)
417 .with_context(|| format!("Could not read directory {}", path.display()))?
418 {
419 let entry = entry.with_context(|| {
420 format!("Could not read directory entry in {}", path.display())
421 })?;
422 add_read_permission(&entry.path())?;
423 }
424 }
425 Ok(())
426 }
427 }
428
429 fn add_permission(path: &Path, metadata: &Metadata, bits: u32) -> Result<()> {
430 let perms = metadata.permissions();
431 let new_perms = Permissions::from_mode(perms.mode() | bits);
432 if new_perms != perms {
433 fs::set_permissions(path, new_perms)
434 .with_context(|| format!("Could not set permissions on {}", path.display()))?;
435 }
436 Ok(())
437 }
438}