1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use http_body::{Body as _, Collected};
5use hyper::Body;
6use indexmap::IndexMap;
7use tokio::time;
8use url::Url;
9use vector_lib::configurable::configurable_component;
10
11use super::BuildResult;
12use crate::{
13 config::{self, Format, ProxyConfig, interpolate, provider::ProviderConfig},
14 http::HttpClient,
15 signal,
16 tls::{TlsConfig, TlsSettings},
17};
18
/// Request settings for the `http` provider.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct RequestConfig {
    /// HTTP headers to add to the request.
    ///
    /// Every entry is attached verbatim (name and value) to the request that
    /// fetches the configuration.
    #[configurable(metadata(docs::additional_props_description = "An HTTP header."))]
    #[serde(default)]
    pub headers: IndexMap<String, String>,
}
28
29impl Default for RequestConfig {
30 fn default() -> Self {
31 Self {
32 headers: IndexMap::new(),
33 }
34 }
35}
36
/// Configuration for the `http` provider.
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
    /// URL the configuration is fetched from.
    ///
    /// Optional at the type level, but building the provider fails when it is
    /// not set.
    url: Option<Url>,

    #[configurable(derived)]
    request: RequestConfig,

    /// How often to poll the URL for a new configuration, in seconds.
    poll_interval_secs: u64,

    // NOTE(review): serde's documentation states that `deny_unknown_fields`
    // is not supported in combination with `flatten`; confirm that unknown
    // keys are still rejected (and TLS keys still accepted) as intended.
    #[serde(flatten)]
    tls_options: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Format the fetched configuration is expected to be in.
    #[configurable(derived)]
    config_format: Format,

    /// Whether to interpolate environment variables into the fetched
    /// configuration before it is loaded.
    interpolate_env: bool,
}
65
66impl Default for HttpConfig {
67 fn default() -> Self {
68 Self {
69 url: None,
70 request: RequestConfig::default(),
71 poll_interval_secs: 30,
72 tls_options: None,
73 proxy: Default::default(),
74 config_format: Format::default(),
75 interpolate_env: false,
76 }
77 }
78}
79
80async fn http_request(
82 url: &Url,
83 tls_options: Option<&TlsConfig>,
84 headers: &IndexMap<String, String>,
85 proxy: &ProxyConfig,
86) -> Result<bytes::Bytes, &'static str> {
87 let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
88 let http_client =
89 HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
90
91 let mut builder = http::request::Builder::new().uri(url.to_string());
93
94 for (header, value) in headers.iter() {
97 builder = builder.header(header.as_str(), value.as_str());
98 }
99
100 let request = builder
101 .body(Body::empty())
102 .map_err(|_| "Couldn't create HTTP request")?;
103
104 info!(
105 message = "Attempting to retrieve configuration.",
106 url = ?url.as_str()
107 );
108
109 let response = http_client.send(request).await.map_err(|err| {
110 let message = "HTTP error";
111 error!(
112 message = ?message,
113 error = ?err,
114 url = ?url.as_str());
115 message
116 })?;
117
118 info!(message = "Response received.", url = ?url.as_str());
119
120 response
121 .into_body()
122 .collect()
123 .await
124 .map(Collected::to_bytes)
125 .map_err(|err| {
126 let message = "Error interpreting response.";
127 let cause = err.into_cause();
128 error!(
129 message = ?message,
130 error = ?cause);
131
132 message
133 })
134}
135
136async fn http_request_to_config_builder(
138 url: &Url,
139 tls_options: Option<&TlsConfig>,
140 headers: &IndexMap<String, String>,
141 proxy: &ProxyConfig,
142 config_format: &Format,
143 interpolate_env: bool,
144) -> BuildResult {
145 let config_str = http_request(url, tls_options, headers, proxy)
146 .await
147 .map_err(|e| vec![e.to_owned()])?;
148
149 if !interpolate_env {
150 return config::load(config_str.chunk(), *config_format);
151 }
152
153 let env_vars = std::env::vars_os()
154 .map(|(k, v)| {
155 (
156 k.as_os_str().to_string_lossy().to_string(),
157 v.as_os_str().to_string_lossy().to_string(),
158 )
159 })
160 .collect::<std::collections::HashMap<String, String>>();
161
162 let config_str = interpolate(
163 std::str::from_utf8(&config_str).map_err(|e| vec![e.to_string()])?,
164 &env_vars,
165 )?;
166
167 config::load(config_str.as_bytes().chunk(), *config_format)
168}
169
170fn poll_http(
172 poll_interval_secs: u64,
173 url: Url,
174 tls_options: Option<TlsConfig>,
175 headers: IndexMap<String, String>,
176 proxy: ProxyConfig,
177 config_format: Format,
178 interpolate_env: bool,
179) -> impl Stream<Item = signal::SignalTo> {
180 let duration = time::Duration::from_secs(poll_interval_secs);
181 let mut interval = time::interval_at(time::Instant::now() + duration, duration);
182
183 stream! {
184 loop {
185 interval.tick().await;
186
187 match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format, interpolate_env).await {
188 Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
189 Err(_) => {},
190 };
191
192 info!(
193 message = "HTTP provider is waiting.",
194 poll_interval_secs = ?poll_interval_secs,
195 url = ?url.as_str());
196 }
197 }
198}
199
200impl ProviderConfig for HttpConfig {
201 async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
202 let url = self
203 .url
204 .take()
205 .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
206
207 let tls_options = self.tls_options.take();
208 let poll_interval_secs = self.poll_interval_secs;
209 let request = self.request.clone();
210 let config_format = self.config_format;
211
212 let proxy = ProxyConfig::from_env().merge(&self.proxy);
213 let config_builder = http_request_to_config_builder(
214 &url,
215 tls_options.as_ref(),
216 &request.headers,
217 &proxy,
218 &config_format,
219 self.interpolate_env,
220 )
221 .await?;
222
223 signal_handler.add(poll_http(
225 poll_interval_secs,
226 url,
227 tls_options,
228 request.headers.clone(),
229 proxy.clone(),
230 config_format,
231 self.interpolate_env,
232 ));
233
234 Ok(config_builder)
235 }
236}
237
// Invokes the crate's `impl_generate_config_from_default!` macro for
// `HttpConfig`. NOTE(review): judging by its name this derives config
// generation from the `Default` impl; confirm against the macro definition.
impl_generate_config_from_default!(HttpConfig);