vector/
docker.rs

1#![allow(missing_docs)]
2use std::{collections::HashMap, env, path::PathBuf};
3
4use bollard::{
5    errors::Error as DockerError,
6    models::HostConfig,
7    query_parameters::{
8        CreateContainerOptionsBuilder, CreateImageOptionsBuilder, ListImagesOptionsBuilder,
9        RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
10    },
11    secret::ContainerCreateBody,
12    Docker, API_DEFAULT_VERSION,
13};
14use futures::StreamExt;
15use http::uri::Uri;
16use snafu::Snafu;
17use vector_lib::configurable::configurable_component;
18
19// From bollard source.
20const DEFAULT_TIMEOUT: u64 = 120;
21
22#[derive(Debug, Snafu)]
23pub enum Error {
24    #[snafu(display("URL has no host."))]
25    NoHost,
26}
27
28/// Configuration of TLS when connecting to the Docker daemon.
29///
30/// Only relevant when connecting to Docker with an HTTPS URL.
31///
32/// If not configured, the environment variable `DOCKER_CERT_PATH` is used. If `DOCKER_CERT_PATH` is absent, then` DOCKER_CONFIG` is used. If both environment variables are absent, the certificates in `~/.docker/` are read.
33#[configurable_component]
34#[derive(Clone, Debug)]
35#[serde(deny_unknown_fields)]
36pub struct DockerTlsConfig {
37    /// Path to the CA certificate file.
38    ca_file: PathBuf,
39
40    /// Path to the TLS certificate file.
41    crt_file: PathBuf,
42
43    /// Path to the TLS key file.
44    key_file: PathBuf,
45}
46
47pub fn docker(host: Option<String>, tls: Option<DockerTlsConfig>) -> crate::Result<Docker> {
48    let host = host.or_else(|| env::var("DOCKER_HOST").ok());
49
50    match host {
51        None => Docker::connect_with_defaults().map_err(Into::into),
52        Some(host) => {
53            let scheme = host
54                .parse::<Uri>()
55                .ok()
56                .and_then(|uri| uri.into_parts().scheme);
57
58            match scheme.as_ref().map(|scheme| scheme.as_str()) {
59                Some("http") | Some("tcp") => {
60                    let host = get_authority(&host)?;
61                    Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
62                        .map_err(Into::into)
63                }
64                Some("https") => {
65                    let host = get_authority(&host)?;
66                    let tls = tls
67                        .or_else(default_certs)
68                        .ok_or(DockerError::NoHomePathError)?;
69                    Docker::connect_with_ssl(
70                        &host,
71                        &tls.key_file,
72                        &tls.crt_file,
73                        &tls.ca_file,
74                        DEFAULT_TIMEOUT,
75                        API_DEFAULT_VERSION,
76                    )
77                    .map_err(Into::into)
78                }
79                Some("unix") | Some("npipe") | None => {
80                    Docker::connect_with_defaults().map_err(Into::into)
81                }
82                Some(scheme) => Err(format!("Unknown scheme: {scheme}").into()),
83            }
84        }
85    }
86}
87
88// From bollard source, unfortunately they don't export this function.
89fn default_certs() -> Option<DockerTlsConfig> {
90    let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
91    let base = match from_env {
92        Ok(path) => PathBuf::from(path),
93        Err(_) => dirs_next::home_dir()?.join(".docker"),
94    };
95    Some(DockerTlsConfig {
96        ca_file: base.join("ca.pem"),
97        key_file: base.join("key.pem"),
98        crt_file: base.join("cert.pem"),
99    })
100}
101
102fn get_authority(url: &str) -> Result<String, Error> {
103    url.parse::<Uri>()
104        .ok()
105        .and_then(|uri| uri.authority().map(<_>::to_string))
106        .ok_or(Error::NoHost)
107}
108
109async fn pull_image(docker: &Docker, image: &str, tag: &str) {
110    let mut filters = HashMap::new();
111    filters.insert(
112        String::from("reference"),
113        vec![format!("{}:{}", image, tag)],
114    );
115
116    let options = Some(ListImagesOptionsBuilder::new().filters(&filters).build());
117
118    let images = docker.list_images(options).await.unwrap();
119    if images.is_empty() {
120        // If not found, pull it
121        let options = Some(
122            CreateImageOptionsBuilder::new()
123                .from_image(image)
124                .tag(tag)
125                .build(),
126        );
127
128        docker
129            .create_image(options, None, None)
130            .for_each(|item| async move {
131                let info = item.unwrap();
132                if let Some(error) = info.error {
133                    panic!("{error:?}");
134                }
135            })
136            .await
137    }
138}
139
140async fn remove_container(docker: &Docker, id: &str) {
141    trace!("Stopping container.");
142
143    _ = docker
144        .stop_container(id, None::<StopContainerOptions>)
145        .await
146        .map_err(|e| error!(%e));
147
148    trace!("Removing container.");
149
150    // Don't panic, as this is unrelated to the test
151    _ = docker
152        .remove_container(id, None::<RemoveContainerOptions>)
153        .await
154        .map_err(|e| error!(%e));
155}
156
157pub struct Container {
158    image: &'static str,
159    tag: &'static str,
160    binds: Option<Vec<String>>,
161    cmd: Option<Vec<String>>,
162}
163
164impl Container {
165    pub const fn new(image: &'static str, tag: &'static str) -> Self {
166        Self {
167            image,
168            tag,
169            binds: None,
170            cmd: None,
171        }
172    }
173
174    pub fn bind(mut self, src: impl std::fmt::Display, dst: &str) -> Self {
175        let bind = format!("{src}:{dst}");
176        self.binds.get_or_insert_with(Vec::new).push(bind);
177        self
178    }
179
180    pub fn cmd(mut self, option: &str) -> Self {
181        self.cmd.get_or_insert_with(Vec::new).push(option.into());
182        self
183    }
184
185    pub async fn run<T>(self, doit: impl futures::Future<Output = T>) -> T {
186        let docker = docker(None, None).unwrap();
187
188        pull_image(&docker, self.image, self.tag).await;
189
190        let options = CreateContainerOptionsBuilder::new()
191            .name(&format!("vector_test_{}", uuid::Uuid::new_v4()))
192            .build();
193
194        let config = ContainerCreateBody {
195            image: Some(format!("{}:{}", &self.image, &self.tag)),
196            cmd: self.cmd,
197            host_config: Some(HostConfig {
198                network_mode: Some(String::from("host")),
199                extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]),
200                binds: self.binds,
201                ..Default::default()
202            }),
203            ..Default::default()
204        };
205
206        let container = docker
207            .create_container(Some(options), config)
208            .await
209            .unwrap();
210
211        docker
212            .start_container(&container.id, None::<StartContainerOptions>)
213            .await
214            .unwrap();
215
216        let result = doit.await;
217
218        remove_container(&docker, &container.id).await;
219
220        result
221    }
222}