vector/
docker.rs

1#![allow(missing_docs)]
2use std::{collections::HashMap, env, path::PathBuf};
3
4use bollard::{
5    API_DEFAULT_VERSION, Docker,
6    errors::Error as DockerError,
7    models::HostConfig,
8    query_parameters::{
9        CreateContainerOptionsBuilder, CreateImageOptionsBuilder, ListImagesOptionsBuilder,
10        RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
11    },
12    secret::ContainerCreateBody,
13};
14use futures::StreamExt;
15use http::uri::Uri;
16use snafu::Snafu;
17use vector_lib::configurable::configurable_component;
18
19// From bollard source.
20const DEFAULT_TIMEOUT: u64 = 120;
21
22#[derive(Debug, Snafu)]
23pub enum Error {
24    #[snafu(display("URL has no host."))]
25    NoHost,
26}
27
28/// Configuration of TLS when connecting to the Docker daemon.
29///
30/// Only relevant when connecting to Docker with an HTTPS URL.
31///
32/// If not configured, the environment variable `DOCKER_CERT_PATH` is used. If `DOCKER_CERT_PATH` is absent, then` DOCKER_CONFIG` is used. If both environment variables are absent, the certificates in `~/.docker/` are read.
33#[configurable_component]
34#[derive(Clone, Debug)]
35#[serde(deny_unknown_fields)]
36pub struct DockerTlsConfig {
37    /// Path to the CA certificate file.
38    ca_file: PathBuf,
39
40    /// Path to the TLS certificate file.
41    crt_file: PathBuf,
42
43    /// Path to the TLS key file.
44    key_file: PathBuf,
45}
46
47pub fn docker(host: Option<String>, tls: Option<DockerTlsConfig>) -> crate::Result<Docker> {
48    let host = host.or_else(|| env::var("DOCKER_HOST").ok());
49
50    match host {
51        None => Docker::connect_with_defaults().map_err(Into::into),
52        Some(host) => {
53            let scheme = host
54                .parse::<Uri>()
55                .ok()
56                .and_then(|uri| uri.into_parts().scheme);
57
58            match scheme.as_ref().map(|scheme| scheme.as_str()) {
59                Some("http") | Some("tcp") => {
60                    let host = get_authority(&host)?;
61                    Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
62                        .map_err(Into::into)
63                }
64                Some("https") => {
65                    let host = get_authority(&host)?;
66                    let tls = tls
67                        .or_else(default_certs)
68                        .ok_or(DockerError::NoHomePathError)?;
69                    Docker::connect_with_ssl(
70                        &host,
71                        &tls.key_file,
72                        &tls.crt_file,
73                        &tls.ca_file,
74                        DEFAULT_TIMEOUT,
75                        API_DEFAULT_VERSION,
76                    )
77                    .map_err(Into::into)
78                }
79                Some("unix") | Some("npipe") | None => {
80                    Docker::connect_with_socket(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
81                        .map_err(Into::into)
82                }
83                Some(scheme) => Err(format!("Unknown scheme: {scheme}").into()),
84            }
85        }
86    }
87}
88
89// From bollard source, unfortunately they don't export this function.
90fn default_certs() -> Option<DockerTlsConfig> {
91    let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
92    let base = match from_env {
93        Ok(path) => PathBuf::from(path),
94        Err(_) => dirs_next::home_dir()?.join(".docker"),
95    };
96    Some(DockerTlsConfig {
97        ca_file: base.join("ca.pem"),
98        key_file: base.join("key.pem"),
99        crt_file: base.join("cert.pem"),
100    })
101}
102
103fn get_authority(url: &str) -> Result<String, Error> {
104    url.parse::<Uri>()
105        .ok()
106        .and_then(|uri| uri.authority().map(<_>::to_string))
107        .ok_or(Error::NoHost)
108}
109
110async fn pull_image(docker: &Docker, image: &str, tag: &str) {
111    let mut filters = HashMap::new();
112    filters.insert(
113        String::from("reference"),
114        vec![format!("{}:{}", image, tag)],
115    );
116
117    let options = Some(ListImagesOptionsBuilder::new().filters(&filters).build());
118
119    let images = docker.list_images(options).await.unwrap();
120    if images.is_empty() {
121        // If not found, pull it
122        let options = Some(
123            CreateImageOptionsBuilder::new()
124                .from_image(image)
125                .tag(tag)
126                .build(),
127        );
128
129        docker
130            .create_image(options, None, None)
131            .for_each(|item| async move {
132                let info = item.unwrap();
133                if let Some(error) = info.error {
134                    panic!("{error:?}");
135                }
136            })
137            .await
138    }
139}
140
141async fn remove_container(docker: &Docker, id: &str) {
142    trace!("Stopping container.");
143
144    _ = docker
145        .stop_container(id, None::<StopContainerOptions>)
146        .await
147        .map_err(|e| error!(%e));
148
149    trace!("Removing container.");
150
151    // Don't panic, as this is unrelated to the test
152    _ = docker
153        .remove_container(id, None::<RemoveContainerOptions>)
154        .await
155        .map_err(|e| error!(%e));
156}
157
158pub struct Container {
159    image: &'static str,
160    tag: &'static str,
161    binds: Option<Vec<String>>,
162    cmd: Option<Vec<String>>,
163}
164
165impl Container {
166    pub const fn new(image: &'static str, tag: &'static str) -> Self {
167        Self {
168            image,
169            tag,
170            binds: None,
171            cmd: None,
172        }
173    }
174
175    pub fn bind(mut self, src: impl std::fmt::Display, dst: &str) -> Self {
176        let bind = format!("{src}:{dst}");
177        self.binds.get_or_insert_with(Vec::new).push(bind);
178        self
179    }
180
181    pub fn cmd(mut self, option: &str) -> Self {
182        self.cmd.get_or_insert_with(Vec::new).push(option.into());
183        self
184    }
185
186    pub async fn run<T>(self, doit: impl futures::Future<Output = T>) -> T {
187        let docker = docker(None, None).unwrap();
188
189        pull_image(&docker, self.image, self.tag).await;
190
191        let options = CreateContainerOptionsBuilder::new()
192            .name(&format!("vector_test_{}", uuid::Uuid::new_v4()))
193            .build();
194
195        let config = ContainerCreateBody {
196            image: Some(format!("{}:{}", &self.image, &self.tag)),
197            cmd: self.cmd,
198            host_config: Some(HostConfig {
199                network_mode: Some(String::from("host")),
200                extra_hosts: Some(vec!["host.docker.internal:host-gateway".into()]),
201                binds: self.binds,
202                ..Default::default()
203            }),
204            ..Default::default()
205        };
206
207        let container = docker
208            .create_container(Some(options), config)
209            .await
210            .unwrap();
211
212        docker
213            .start_container(&container.id, None::<StartContainerOptions>)
214            .await
215            .unwrap();
216
217        let result = doit.await;
218
219        remove_container(&docker, &container.id).await;
220
221        result
222    }
223}