1#![allow(missing_docs)]
2use std::{collections::HashMap, env, path::PathBuf};
3
4use bollard::{
5 errors::Error as DockerError,
6 models::HostConfig,
7 query_parameters::{
8 CreateContainerOptionsBuilder, CreateImageOptionsBuilder, ListImagesOptionsBuilder,
9 RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
10 },
11 secret::ContainerCreateBody,
12 Docker, API_DEFAULT_VERSION,
13};
14use futures::StreamExt;
15use http::uri::Uri;
16use snafu::Snafu;
17use vector_lib::configurable::configurable_component;
18
/// Timeout handed to bollard's `connect_with_http` / `connect_with_ssl` constructors.
// NOTE(review): the unit is defined by bollard (presumably seconds) — confirm against its docs.
const DEFAULT_TIMEOUT: u64 = 120;
21
/// Errors raised while interpreting a Docker host URL.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The URL could not be parsed, or it carries no authority (host) component.
    #[snafu(display("URL has no host."))]
    NoHost,
}
27
/// Configuration of TLS when connecting to the Docker daemon.
///
/// Only relevant when connecting to Docker with an HTTPS URL.
///
/// If not configured, the environment variable `DOCKER_CERT_PATH` is used. If
/// `DOCKER_CERT_PATH` is absent, then `DOCKER_CONFIG` is used. If both environment
/// variables are absent, the certificates in `~/.docker/` are read.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DockerTlsConfig {
    /// Path to the CA certificate file.
    ca_file: PathBuf,

    /// Path to the TLS certificate file.
    crt_file: PathBuf,

    /// Path to the TLS key file.
    key_file: PathBuf,
}
46
47pub fn docker(host: Option<String>, tls: Option<DockerTlsConfig>) -> crate::Result<Docker> {
48 let host = host.or_else(|| env::var("DOCKER_HOST").ok());
49
50 match host {
51 None => Docker::connect_with_defaults().map_err(Into::into),
52 Some(host) => {
53 let scheme = host
54 .parse::<Uri>()
55 .ok()
56 .and_then(|uri| uri.into_parts().scheme);
57
58 match scheme.as_ref().map(|scheme| scheme.as_str()) {
59 Some("http") | Some("tcp") => {
60 let host = get_authority(&host)?;
61 Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
62 .map_err(Into::into)
63 }
64 Some("https") => {
65 let host = get_authority(&host)?;
66 let tls = tls
67 .or_else(default_certs)
68 .ok_or(DockerError::NoHomePathError)?;
69 Docker::connect_with_ssl(
70 &host,
71 &tls.key_file,
72 &tls.crt_file,
73 &tls.ca_file,
74 DEFAULT_TIMEOUT,
75 API_DEFAULT_VERSION,
76 )
77 .map_err(Into::into)
78 }
79 Some("unix") | Some("npipe") | None => {
80 Docker::connect_with_defaults().map_err(Into::into)
81 }
82 Some(scheme) => Err(format!("Unknown scheme: {scheme}").into()),
83 }
84 }
85 }
86}
87
88fn default_certs() -> Option<DockerTlsConfig> {
90 let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
91 let base = match from_env {
92 Ok(path) => PathBuf::from(path),
93 Err(_) => dirs_next::home_dir()?.join(".docker"),
94 };
95 Some(DockerTlsConfig {
96 ca_file: base.join("ca.pem"),
97 key_file: base.join("key.pem"),
98 crt_file: base.join("cert.pem"),
99 })
100}
101
102fn get_authority(url: &str) -> Result<String, Error> {
103 url.parse::<Uri>()
104 .ok()
105 .and_then(|uri| uri.authority().map(<_>::to_string))
106 .ok_or(Error::NoHost)
107}
108
109async fn pull_image(docker: &Docker, image: &str, tag: &str) {
110 let mut filters = HashMap::new();
111 filters.insert(
112 String::from("reference"),
113 vec![format!("{}:{}", image, tag)],
114 );
115
116 let options = Some(ListImagesOptionsBuilder::new().filters(&filters).build());
117
118 let images = docker.list_images(options).await.unwrap();
119 if images.is_empty() {
120 let options = Some(
122 CreateImageOptionsBuilder::new()
123 .from_image(image)
124 .tag(tag)
125 .build(),
126 );
127
128 docker
129 .create_image(options, None, None)
130 .for_each(|item| async move {
131 let info = item.unwrap();
132 if let Some(error) = info.error {
133 panic!("{error:?}");
134 }
135 })
136 .await
137 }
138}
139
140async fn remove_container(docker: &Docker, id: &str) {
141 trace!("Stopping container.");
142
143 _ = docker
144 .stop_container(id, None::<StopContainerOptions>)
145 .await
146 .map_err(|e| error!(%e));
147
148 trace!("Removing container.");
149
150 _ = docker
152 .remove_container(id, None::<RemoveContainerOptions>)
153 .await
154 .map_err(|e| error!(%e));
155}
156
/// Description of a throwaway Docker container: the image to run plus optional bind
/// mounts and command arguments.
pub struct Container {
    /// Image name, without the tag.
    image: &'static str,
    /// Image tag; combined with `image` as `image:tag`.
    tag: &'static str,
    /// Bind specifications in `src:dst` form; `None` until `bind` is called.
    binds: Option<Vec<String>>,
    /// Command arguments passed to the container; `None` until `cmd` is called.
    cmd: Option<Vec<String>>,
}
163
164impl Container {
165 pub const fn new(image: &'static str, tag: &'static str) -> Self {
166 Self {
167 image,
168 tag,
169 binds: None,
170 cmd: None,
171 }
172 }
173
174 pub fn bind(mut self, src: impl std::fmt::Display, dst: &str) -> Self {
175 let bind = format!("{src}:{dst}");
176 self.binds.get_or_insert_with(Vec::new).push(bind);
177 self
178 }
179
180 pub fn cmd(mut self, option: &str) -> Self {
181 self.cmd.get_or_insert_with(Vec::new).push(option.into());
182 self
183 }
184
185 pub async fn run<T>(self, doit: impl futures::Future<Output = T>) -> T {
186 let docker = docker(None, None).unwrap();
187
188 pull_image(&docker, self.image, self.tag).await;
189
190 let options = CreateContainerOptionsBuilder::new()
191 .name(&format!("vector_test_{}", uuid::Uuid::new_v4()))
192 .build();
193
194 let config = ContainerCreateBody {
195 image: Some(format!("{}:{}", &self.image, &self.tag)),
196 cmd: self.cmd,
197 host_config: Some(HostConfig {
198 network_mode: Some(String::from("host")),
199 extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]),
200 binds: self.binds,
201 ..Default::default()
202 }),
203 ..Default::default()
204 };
205
206 let container = docker
207 .create_container(Some(options), config)
208 .await
209 .unwrap();
210
211 docker
212 .start_container(&container.id, None::<StartContainerOptions>)
213 .await
214 .unwrap();
215
216 let result = doit.await;
217
218 remove_container(&docker, &container.id).await;
219
220 result
221 }
222}