vector/providers/
http.rs

1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use hyper::Body;
5use indexmap::IndexMap;
6use tokio::time;
7use url::Url;
8use vector_lib::configurable::configurable_component;
9
10use super::BuildResult;
11use crate::{
12    config::{self, Format, ProxyConfig, provider::ProviderConfig},
13    http::HttpClient,
14    signal,
15    tls::{TlsConfig, TlsSettings},
16};
17
18/// Request settings.
19#[configurable_component]
20#[derive(Clone, Debug)]
21pub struct RequestConfig {
22    /// HTTP headers to add to the request.
23    #[serde(default)]
24    pub headers: IndexMap<String, String>,
25}
26
27impl Default for RequestConfig {
28    fn default() -> Self {
29        Self {
30            headers: IndexMap::new(),
31        }
32    }
33}
34
/// Configuration for the `http` provider.
// The struct-level `#[serde(default)]` fills any field the user omits from
// `HttpConfig::default()`; `deny_unknown_fields` rejects unrecognized keys.
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
    /// URL for the HTTP provider.
    // Optional at the type level, but `build` fails if it is not set.
    url: Option<Url>,

    #[configurable(derived)]
    request: RequestConfig,

    /// How often to poll the provider, in seconds.
    // Defaults to 30 (see the `Default` impl).
    poll_interval_secs: u64,

    // TLS options are flattened into this struct's own keys instead of being
    // nested under a `tls_options` key.
    // NOTE(review): serde documents `flatten` as unsupported in combination
    // with `deny_unknown_fields` — confirm this pairing behaves as intended.
    #[serde(flatten)]
    tls_options: Option<TlsConfig>,

    // Omitted from serialized output when equal to the default proxy settings.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Which config format is expected to be loaded.
    #[configurable(derived)]
    config_format: Format,
}
60
61impl Default for HttpConfig {
62    fn default() -> Self {
63        Self {
64            url: None,
65            request: RequestConfig::default(),
66            poll_interval_secs: 30,
67            tls_options: None,
68            proxy: Default::default(),
69            config_format: Format::default(),
70        }
71    }
72}
73
74/// Makes an HTTP request to the provided endpoint, returning the String body.
75async fn http_request(
76    url: &Url,
77    tls_options: Option<&TlsConfig>,
78    headers: &IndexMap<String, String>,
79    proxy: &ProxyConfig,
80) -> Result<bytes::Bytes, &'static str> {
81    let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
82    let http_client =
83        HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
84
85    // Build HTTP request.
86    let mut builder = http::request::Builder::new().uri(url.to_string());
87
88    // Augment with headers. These may be required e.g. for authentication to
89    // private endpoints.
90    for (header, value) in headers.iter() {
91        builder = builder.header(header.as_str(), value.as_str());
92    }
93
94    let request = builder
95        .body(Body::empty())
96        .map_err(|_| "Couldn't create HTTP request")?;
97
98    info!(
99        message = "Attempting to retrieve configuration.",
100        url = ?url.as_str()
101    );
102
103    let response = http_client.send(request).await.map_err(|err| {
104        let message = "HTTP error";
105        error!(
106            message = ?message,
107            error = ?err,
108            url = ?url.as_str());
109        message
110    })?;
111
112    info!(message = "Response received.", url = ?url.as_str());
113
114    hyper::body::to_bytes(response.into_body())
115        .await
116        .map_err(|err| {
117            let message = "Error interpreting response.";
118            let cause = err.into_cause();
119            error!(
120                    message = ?message,
121                    error = ?cause);
122
123            message
124        })
125}
126
127/// Calls `http_request`, serializing the result to a `ConfigBuilder`.
128async fn http_request_to_config_builder(
129    url: &Url,
130    tls_options: Option<&TlsConfig>,
131    headers: &IndexMap<String, String>,
132    proxy: &ProxyConfig,
133    config_format: &Format,
134) -> BuildResult {
135    let config_str = http_request(url, tls_options, headers, proxy)
136        .await
137        .map_err(|e| vec![e.to_owned()])?;
138
139    config::load(config_str.chunk(), *config_format)
140}
141
142/// Polls the HTTP endpoint after/every `poll_interval_secs`, returning a stream of `ConfigBuilder`.
143fn poll_http(
144    poll_interval_secs: u64,
145    url: Url,
146    tls_options: Option<TlsConfig>,
147    headers: IndexMap<String, String>,
148    proxy: ProxyConfig,
149    config_format: Format,
150) -> impl Stream<Item = signal::SignalTo> {
151    let duration = time::Duration::from_secs(poll_interval_secs);
152    let mut interval = time::interval_at(time::Instant::now() + duration, duration);
153
154    stream! {
155        loop {
156            interval.tick().await;
157
158            match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
159                Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
160                Err(_) => {},
161            };
162
163            info!(
164                message = "HTTP provider is waiting.",
165                poll_interval_secs = ?poll_interval_secs,
166                url = ?url.as_str());
167        }
168    }
169}
170
171impl ProviderConfig for HttpConfig {
172    async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
173        let url = self
174            .url
175            .take()
176            .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
177
178        let tls_options = self.tls_options.take();
179        let poll_interval_secs = self.poll_interval_secs;
180        let request = self.request.clone();
181        let config_format = self.config_format;
182
183        let proxy = ProxyConfig::from_env().merge(&self.proxy);
184        let config_builder = http_request_to_config_builder(
185            &url,
186            tls_options.as_ref(),
187            &request.headers,
188            &proxy,
189            &config_format,
190        )
191        .await?;
192
193        // Poll for changes to remote configuration.
194        signal_handler.add(poll_http(
195            poll_interval_secs,
196            url,
197            tls_options,
198            request.headers.clone(),
199            proxy.clone(),
200            config_format,
201        ));
202
203        Ok(config_builder)
204    }
205}
206
// NOTE(review): macro defined elsewhere in the crate; judging by its name it presumably
// implements config generation for `HttpConfig` in terms of its `Default` impl — confirm.
impl_generate_config_from_default!(HttpConfig);