vector/providers/
http.rs

1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use hyper::Body;
5use indexmap::IndexMap;
6use tokio::time;
7use url::Url;
8use vector_lib::configurable::configurable_component;
9
10use crate::{
11    config::{self, provider::ProviderConfig, Format, ProxyConfig},
12    http::HttpClient,
13    signal,
14    tls::{TlsConfig, TlsSettings},
15};
16
17use super::BuildResult;
18
19/// Request settings.
20#[configurable_component]
21#[derive(Clone, Debug)]
22pub struct RequestConfig {
23    /// HTTP headers to add to the request.
24    #[serde(default)]
25    pub headers: IndexMap<String, String>,
26}
27
28impl Default for RequestConfig {
29    fn default() -> Self {
30        Self {
31            headers: IndexMap::new(),
32        }
33    }
34}
35
36/// Configuration for the `http` provider.
37#[configurable_component(provider("http"))]
38#[derive(Clone, Debug)]
39#[serde(deny_unknown_fields, default)]
40pub struct HttpConfig {
41    /// URL for the HTTP provider.
42    url: Option<Url>,
43
44    #[configurable(derived)]
45    request: RequestConfig,
46
47    /// How often to poll the provider, in seconds.
48    poll_interval_secs: u64,
49
50    #[serde(flatten)]
51    tls_options: Option<TlsConfig>,
52
53    #[configurable(derived)]
54    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
55    proxy: ProxyConfig,
56
57    /// Which config format expected to be loaded
58    #[configurable(derived)]
59    config_format: Format,
60}
61
62impl Default for HttpConfig {
63    fn default() -> Self {
64        Self {
65            url: None,
66            request: RequestConfig::default(),
67            poll_interval_secs: 30,
68            tls_options: None,
69            proxy: Default::default(),
70            config_format: Format::default(),
71        }
72    }
73}
74
75/// Makes an HTTP request to the provided endpoint, returning the String body.
76async fn http_request(
77    url: &Url,
78    tls_options: Option<&TlsConfig>,
79    headers: &IndexMap<String, String>,
80    proxy: &ProxyConfig,
81) -> Result<bytes::Bytes, &'static str> {
82    let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
83    let http_client =
84        HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
85
86    // Build HTTP request.
87    let mut builder = http::request::Builder::new().uri(url.to_string());
88
89    // Augment with headers. These may be required e.g. for authentication to
90    // private endpoints.
91    for (header, value) in headers.iter() {
92        builder = builder.header(header.as_str(), value.as_str());
93    }
94
95    let request = builder
96        .body(Body::empty())
97        .map_err(|_| "Couldn't create HTTP request")?;
98
99    info!(
100        message = "Attempting to retrieve configuration.",
101        url = ?url.as_str()
102    );
103
104    let response = http_client.send(request).await.map_err(|err| {
105        let message = "HTTP error";
106        error!(
107            message = ?message,
108            error = ?err,
109            url = ?url.as_str());
110        message
111    })?;
112
113    info!(message = "Response received.", url = ?url.as_str());
114
115    hyper::body::to_bytes(response.into_body())
116        .await
117        .map_err(|err| {
118            let message = "Error interpreting response.";
119            let cause = err.into_cause();
120            error!(
121                    message = ?message,
122                    error = ?cause);
123
124            message
125        })
126}
127
128/// Calls `http_request`, serializing the result to a `ConfigBuilder`.
129async fn http_request_to_config_builder(
130    url: &Url,
131    tls_options: Option<&TlsConfig>,
132    headers: &IndexMap<String, String>,
133    proxy: &ProxyConfig,
134    config_format: &Format,
135) -> BuildResult {
136    let config_str = http_request(url, tls_options, headers, proxy)
137        .await
138        .map_err(|e| vec![e.to_owned()])?;
139
140    config::load(config_str.chunk(), *config_format)
141}
142
143/// Polls the HTTP endpoint after/every `poll_interval_secs`, returning a stream of `ConfigBuilder`.
144fn poll_http(
145    poll_interval_secs: u64,
146    url: Url,
147    tls_options: Option<TlsConfig>,
148    headers: IndexMap<String, String>,
149    proxy: ProxyConfig,
150    config_format: Format,
151) -> impl Stream<Item = signal::SignalTo> {
152    let duration = time::Duration::from_secs(poll_interval_secs);
153    let mut interval = time::interval_at(time::Instant::now() + duration, duration);
154
155    stream! {
156        loop {
157            interval.tick().await;
158
159            match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
160                Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
161                Err(_) => {},
162            };
163
164            info!(
165                message = "HTTP provider is waiting.",
166                poll_interval_secs = ?poll_interval_secs,
167                url = ?url.as_str());
168        }
169    }
170}
171
172impl ProviderConfig for HttpConfig {
173    async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
174        let url = self
175            .url
176            .take()
177            .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
178
179        let tls_options = self.tls_options.take();
180        let poll_interval_secs = self.poll_interval_secs;
181        let request = self.request.clone();
182        let config_format = self.config_format;
183
184        let proxy = ProxyConfig::from_env().merge(&self.proxy);
185        let config_builder = http_request_to_config_builder(
186            &url,
187            tls_options.as_ref(),
188            &request.headers,
189            &proxy,
190            &config_format,
191        )
192        .await?;
193
194        // Poll for changes to remote configuration.
195        signal_handler.add(poll_http(
196            poll_interval_secs,
197            url,
198            tls_options,
199            request.headers.clone(),
200            proxy.clone(),
201            config_format,
202        ));
203
204        Ok(config_builder)
205    }
206}
207
208impl_generate_config_from_default!(HttpConfig);