vector/providers/
http.rs

1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use http_body::{Body as _, Collected};
5use hyper::Body;
6use indexmap::IndexMap;
7use tokio::time;
8use url::Url;
9use vector_lib::configurable::configurable_component;
10
11use super::BuildResult;
12use crate::{
13    config::{self, Format, ProxyConfig, provider::ProviderConfig},
14    http::HttpClient,
15    signal,
16    tls::{TlsConfig, TlsSettings},
17};
18
19/// Request settings.
20#[configurable_component]
21#[derive(Clone, Debug)]
22pub struct RequestConfig {
23    /// HTTP headers to add to the request.
24    #[serde(default)]
25    pub headers: IndexMap<String, String>,
26}
27
28impl Default for RequestConfig {
29    fn default() -> Self {
30        Self {
31            headers: IndexMap::new(),
32        }
33    }
34}
35
36/// Configuration for the `http` provider.
37#[configurable_component(provider("http"))]
38#[derive(Clone, Debug)]
39#[serde(deny_unknown_fields, default)]
40pub struct HttpConfig {
41    /// URL for the HTTP provider.
42    url: Option<Url>,
43
44    #[configurable(derived)]
45    request: RequestConfig,
46
47    /// How often to poll the provider, in seconds.
48    poll_interval_secs: u64,
49
50    #[serde(flatten)]
51    tls_options: Option<TlsConfig>,
52
53    #[configurable(derived)]
54    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
55    proxy: ProxyConfig,
56
57    /// Which config format expected to be loaded
58    #[configurable(derived)]
59    config_format: Format,
60}
61
62impl Default for HttpConfig {
63    fn default() -> Self {
64        Self {
65            url: None,
66            request: RequestConfig::default(),
67            poll_interval_secs: 30,
68            tls_options: None,
69            proxy: Default::default(),
70            config_format: Format::default(),
71        }
72    }
73}
74
75/// Makes an HTTP request to the provided endpoint, returning the String body.
76async fn http_request(
77    url: &Url,
78    tls_options: Option<&TlsConfig>,
79    headers: &IndexMap<String, String>,
80    proxy: &ProxyConfig,
81) -> Result<bytes::Bytes, &'static str> {
82    let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
83    let http_client =
84        HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
85
86    // Build HTTP request.
87    let mut builder = http::request::Builder::new().uri(url.to_string());
88
89    // Augment with headers. These may be required e.g. for authentication to
90    // private endpoints.
91    for (header, value) in headers.iter() {
92        builder = builder.header(header.as_str(), value.as_str());
93    }
94
95    let request = builder
96        .body(Body::empty())
97        .map_err(|_| "Couldn't create HTTP request")?;
98
99    info!(
100        message = "Attempting to retrieve configuration.",
101        url = ?url.as_str()
102    );
103
104    let response = http_client.send(request).await.map_err(|err| {
105        let message = "HTTP error";
106        error!(
107            message = ?message,
108            error = ?err,
109            url = ?url.as_str());
110        message
111    })?;
112
113    info!(message = "Response received.", url = ?url.as_str());
114
115    response
116        .into_body()
117        .collect()
118        .await
119        .map(Collected::to_bytes)
120        .map_err(|err| {
121            let message = "Error interpreting response.";
122            let cause = err.into_cause();
123            error!(
124                    message = ?message,
125                    error = ?cause);
126
127            message
128        })
129}
130
131/// Calls `http_request`, serializing the result to a `ConfigBuilder`.
132async fn http_request_to_config_builder(
133    url: &Url,
134    tls_options: Option<&TlsConfig>,
135    headers: &IndexMap<String, String>,
136    proxy: &ProxyConfig,
137    config_format: &Format,
138) -> BuildResult {
139    let config_str = http_request(url, tls_options, headers, proxy)
140        .await
141        .map_err(|e| vec![e.to_owned()])?;
142
143    config::load(config_str.chunk(), *config_format)
144}
145
146/// Polls the HTTP endpoint after/every `poll_interval_secs`, returning a stream of `ConfigBuilder`.
147fn poll_http(
148    poll_interval_secs: u64,
149    url: Url,
150    tls_options: Option<TlsConfig>,
151    headers: IndexMap<String, String>,
152    proxy: ProxyConfig,
153    config_format: Format,
154) -> impl Stream<Item = signal::SignalTo> {
155    let duration = time::Duration::from_secs(poll_interval_secs);
156    let mut interval = time::interval_at(time::Instant::now() + duration, duration);
157
158    stream! {
159        loop {
160            interval.tick().await;
161
162            match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
163                Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
164                Err(_) => {},
165            };
166
167            info!(
168                message = "HTTP provider is waiting.",
169                poll_interval_secs = ?poll_interval_secs,
170                url = ?url.as_str());
171        }
172    }
173}
174
175impl ProviderConfig for HttpConfig {
176    async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
177        let url = self
178            .url
179            .take()
180            .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
181
182        let tls_options = self.tls_options.take();
183        let poll_interval_secs = self.poll_interval_secs;
184        let request = self.request.clone();
185        let config_format = self.config_format;
186
187        let proxy = ProxyConfig::from_env().merge(&self.proxy);
188        let config_builder = http_request_to_config_builder(
189            &url,
190            tls_options.as_ref(),
191            &request.headers,
192            &proxy,
193            &config_format,
194        )
195        .await?;
196
197        // Poll for changes to remote configuration.
198        signal_handler.add(poll_http(
199            poll_interval_secs,
200            url,
201            tls_options,
202            request.headers.clone(),
203            proxy.clone(),
204            config_format,
205        ));
206
207        Ok(config_builder)
208    }
209}
210
211impl_generate_config_from_default!(HttpConfig);