vector/providers/
http.rs

1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use http_body::{Body as _, Collected};
5use hyper::Body;
6use indexmap::IndexMap;
7use tokio::time;
8use url::Url;
9use vector_lib::configurable::configurable_component;
10
11use super::BuildResult;
12use crate::{
13    config::{self, Format, ProxyConfig, interpolate, provider::ProviderConfig},
14    http::HttpClient,
15    signal,
16    tls::{TlsConfig, TlsSettings},
17};
18
19/// Request settings.
20#[configurable_component]
21#[derive(Clone, Debug)]
22pub struct RequestConfig {
23    /// HTTP headers to add to the request.
24    #[configurable(metadata(docs::additional_props_description = "An HTTP header."))]
25    #[serde(default)]
26    pub headers: IndexMap<String, String>,
27}
28
29impl Default for RequestConfig {
30    fn default() -> Self {
31        Self {
32            headers: IndexMap::new(),
33        }
34    }
35}
36
37/// Configuration for the `http` provider.
38#[configurable_component(provider("http"))]
39#[derive(Clone, Debug)]
40#[serde(deny_unknown_fields, default)]
41pub struct HttpConfig {
42    /// URL for the HTTP provider.
43    url: Option<Url>,
44
45    #[configurable(derived)]
46    request: RequestConfig,
47
48    /// How often to poll the provider, in seconds.
49    poll_interval_secs: u64,
50
51    #[serde(flatten)]
52    tls_options: Option<TlsConfig>,
53
54    #[configurable(derived)]
55    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
56    proxy: ProxyConfig,
57
58    /// Which config format expected to be loaded
59    #[configurable(derived)]
60    config_format: Format,
61
62    /// Enable environment variable interpolation
63    interpolate_env: bool,
64}
65
66impl Default for HttpConfig {
67    fn default() -> Self {
68        Self {
69            url: None,
70            request: RequestConfig::default(),
71            poll_interval_secs: 30,
72            tls_options: None,
73            proxy: Default::default(),
74            config_format: Format::default(),
75            interpolate_env: false,
76        }
77    }
78}
79
80/// Makes an HTTP request to the provided endpoint, returning the String body.
81async fn http_request(
82    url: &Url,
83    tls_options: Option<&TlsConfig>,
84    headers: &IndexMap<String, String>,
85    proxy: &ProxyConfig,
86) -> Result<bytes::Bytes, &'static str> {
87    let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
88    let http_client =
89        HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
90
91    // Build HTTP request.
92    let mut builder = http::request::Builder::new().uri(url.to_string());
93
94    // Augment with headers. These may be required e.g. for authentication to
95    // private endpoints.
96    for (header, value) in headers.iter() {
97        builder = builder.header(header.as_str(), value.as_str());
98    }
99
100    let request = builder
101        .body(Body::empty())
102        .map_err(|_| "Couldn't create HTTP request")?;
103
104    info!(
105        message = "Attempting to retrieve configuration.",
106        url = ?url.as_str()
107    );
108
109    let response = http_client.send(request).await.map_err(|err| {
110        let message = "HTTP error";
111        error!(
112            message = ?message,
113            error = ?err,
114            url = ?url.as_str());
115        message
116    })?;
117
118    info!(message = "Response received.", url = ?url.as_str());
119
120    response
121        .into_body()
122        .collect()
123        .await
124        .map(Collected::to_bytes)
125        .map_err(|err| {
126            let message = "Error interpreting response.";
127            let cause = err.into_cause();
128            error!(
129                    message = ?message,
130                    error = ?cause);
131
132            message
133        })
134}
135
136/// Calls `http_request`, serializing the result to a `ConfigBuilder`.
137async fn http_request_to_config_builder(
138    url: &Url,
139    tls_options: Option<&TlsConfig>,
140    headers: &IndexMap<String, String>,
141    proxy: &ProxyConfig,
142    config_format: &Format,
143    interpolate_env: bool,
144) -> BuildResult {
145    let config_str = http_request(url, tls_options, headers, proxy)
146        .await
147        .map_err(|e| vec![e.to_owned()])?;
148
149    if !interpolate_env {
150        return config::load(config_str.chunk(), *config_format);
151    }
152
153    let env_vars = std::env::vars_os()
154        .map(|(k, v)| {
155            (
156                k.as_os_str().to_string_lossy().to_string(),
157                v.as_os_str().to_string_lossy().to_string(),
158            )
159        })
160        .collect::<std::collections::HashMap<String, String>>();
161
162    let config_str = interpolate(
163        std::str::from_utf8(&config_str).map_err(|e| vec![e.to_string()])?,
164        &env_vars,
165    )?;
166
167    config::load(config_str.as_bytes().chunk(), *config_format)
168}
169
170/// Polls the HTTP endpoint after/every `poll_interval_secs`, returning a stream of `ConfigBuilder`.
171fn poll_http(
172    poll_interval_secs: u64,
173    url: Url,
174    tls_options: Option<TlsConfig>,
175    headers: IndexMap<String, String>,
176    proxy: ProxyConfig,
177    config_format: Format,
178    interpolate_env: bool,
179) -> impl Stream<Item = signal::SignalTo> {
180    let duration = time::Duration::from_secs(poll_interval_secs);
181    let mut interval = time::interval_at(time::Instant::now() + duration, duration);
182
183    stream! {
184        loop {
185            interval.tick().await;
186
187            match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format, interpolate_env).await {
188                Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
189                Err(_) => {},
190            };
191
192            info!(
193                message = "HTTP provider is waiting.",
194                poll_interval_secs = ?poll_interval_secs,
195                url = ?url.as_str());
196        }
197    }
198}
199
200impl ProviderConfig for HttpConfig {
201    async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
202        let url = self
203            .url
204            .take()
205            .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
206
207        let tls_options = self.tls_options.take();
208        let poll_interval_secs = self.poll_interval_secs;
209        let request = self.request.clone();
210        let config_format = self.config_format;
211
212        let proxy = ProxyConfig::from_env().merge(&self.proxy);
213        let config_builder = http_request_to_config_builder(
214            &url,
215            tls_options.as_ref(),
216            &request.headers,
217            &proxy,
218            &config_format,
219            self.interpolate_env,
220        )
221        .await?;
222
223        // Poll for changes to remote configuration.
224        signal_handler.add(poll_http(
225            poll_interval_secs,
226            url,
227            tls_options,
228            request.headers.clone(),
229            proxy.clone(),
230            config_format,
231            self.interpolate_env,
232        ));
233
234        Ok(config_builder)
235    }
236}
237
238impl_generate_config_from_default!(HttpConfig);