1#![allow(missing_docs)]
2use std::{collections::HashMap, env, path::PathBuf};
3
4use bollard::{
5 API_DEFAULT_VERSION, Docker,
6 errors::Error as DockerError,
7 models::HostConfig,
8 query_parameters::{
9 CreateContainerOptionsBuilder, CreateImageOptionsBuilder, ListImagesOptionsBuilder,
10 RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
11 },
12 secret::ContainerCreateBody,
13};
14use futures::StreamExt;
15use http::uri::Uri;
16use snafu::Snafu;
17use vector_lib::configurable::configurable_component;
18
19const DEFAULT_TIMEOUT: u64 = 120;
21
22#[derive(Debug, Snafu)]
23pub enum Error {
24 #[snafu(display("URL has no host."))]
25 NoHost,
26}
27
28#[configurable_component]
34#[derive(Clone, Debug)]
35#[serde(deny_unknown_fields)]
36pub struct DockerTlsConfig {
37 ca_file: PathBuf,
39
40 crt_file: PathBuf,
42
43 key_file: PathBuf,
45}
46
47pub fn docker(host: Option<String>, tls: Option<DockerTlsConfig>) -> crate::Result<Docker> {
48 let host = host.or_else(|| env::var("DOCKER_HOST").ok());
49
50 match host {
51 None => Docker::connect_with_defaults().map_err(Into::into),
52 Some(host) => {
53 let scheme = host
54 .parse::<Uri>()
55 .ok()
56 .and_then(|uri| uri.into_parts().scheme);
57
58 match scheme.as_ref().map(|scheme| scheme.as_str()) {
59 Some("http") | Some("tcp") => {
60 let host = get_authority(&host)?;
61 Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
62 .map_err(Into::into)
63 }
64 Some("https") => {
65 let host = get_authority(&host)?;
66 let tls = tls
67 .or_else(default_certs)
68 .ok_or(DockerError::NoHomePathError)?;
69 Docker::connect_with_ssl(
70 &host,
71 &tls.key_file,
72 &tls.crt_file,
73 &tls.ca_file,
74 DEFAULT_TIMEOUT,
75 API_DEFAULT_VERSION,
76 )
77 .map_err(Into::into)
78 }
79 Some("unix") | Some("npipe") | None => {
80 Docker::connect_with_socket(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
81 .map_err(Into::into)
82 }
83 Some(scheme) => Err(format!("Unknown scheme: {scheme}").into()),
84 }
85 }
86 }
87}
88
89fn default_certs() -> Option<DockerTlsConfig> {
91 let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
92 let base = match from_env {
93 Ok(path) => PathBuf::from(path),
94 Err(_) => dirs_next::home_dir()?.join(".docker"),
95 };
96 Some(DockerTlsConfig {
97 ca_file: base.join("ca.pem"),
98 key_file: base.join("key.pem"),
99 crt_file: base.join("cert.pem"),
100 })
101}
102
103fn get_authority(url: &str) -> Result<String, Error> {
104 url.parse::<Uri>()
105 .ok()
106 .and_then(|uri| uri.authority().map(<_>::to_string))
107 .ok_or(Error::NoHost)
108}
109
110async fn pull_image(docker: &Docker, image: &str, tag: &str) {
111 let mut filters = HashMap::new();
112 filters.insert(
113 String::from("reference"),
114 vec![format!("{}:{}", image, tag)],
115 );
116
117 let options = Some(ListImagesOptionsBuilder::new().filters(&filters).build());
118
119 let images = docker.list_images(options).await.unwrap();
120 if images.is_empty() {
121 let options = Some(
123 CreateImageOptionsBuilder::new()
124 .from_image(image)
125 .tag(tag)
126 .build(),
127 );
128
129 docker
130 .create_image(options, None, None)
131 .for_each(|item| async move {
132 let info = item.unwrap();
133 if let Some(error) = info.error {
134 panic!("{error:?}");
135 }
136 })
137 .await
138 }
139}
140
141async fn remove_container(docker: &Docker, id: &str) {
142 trace!("Stopping container.");
143
144 _ = docker
145 .stop_container(id, None::<StopContainerOptions>)
146 .await
147 .map_err(|e| error!(%e));
148
149 trace!("Removing container.");
150
151 _ = docker
153 .remove_container(id, None::<RemoveContainerOptions>)
154 .await
155 .map_err(|e| error!(%e));
156}
157
158pub struct Container {
159 image: &'static str,
160 tag: &'static str,
161 binds: Option<Vec<String>>,
162 cmd: Option<Vec<String>>,
163}
164
165impl Container {
166 pub const fn new(image: &'static str, tag: &'static str) -> Self {
167 Self {
168 image,
169 tag,
170 binds: None,
171 cmd: None,
172 }
173 }
174
175 pub fn bind(mut self, src: impl std::fmt::Display, dst: &str) -> Self {
176 let bind = format!("{src}:{dst}");
177 self.binds.get_or_insert_with(Vec::new).push(bind);
178 self
179 }
180
181 pub fn cmd(mut self, option: &str) -> Self {
182 self.cmd.get_or_insert_with(Vec::new).push(option.into());
183 self
184 }
185
186 pub async fn run<T>(self, doit: impl futures::Future<Output = T>) -> T {
187 let docker = docker(None, None).unwrap();
188
189 pull_image(&docker, self.image, self.tag).await;
190
191 let options = CreateContainerOptionsBuilder::new()
192 .name(&format!("vector_test_{}", uuid::Uuid::new_v4()))
193 .build();
194
195 let config = ContainerCreateBody {
196 image: Some(format!("{}:{}", &self.image, &self.tag)),
197 cmd: self.cmd,
198 host_config: Some(HostConfig {
199 network_mode: Some(String::from("host")),
200 extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]),
201 binds: self.binds,
202 ..Default::default()
203 }),
204 ..Default::default()
205 };
206
207 let container = docker
208 .create_container(Some(options), config)
209 .await
210 .unwrap();
211
212 docker
213 .start_container(&container.id, None::<StartContainerOptions>)
214 .await
215 .unwrap();
216
217 let result = doit.await;
218
219 remove_container(&docker, &container.id).await;
220
221 result
222 }
223}