1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use http_body::{Body as _, Collected};
5use hyper::Body;
6use indexmap::IndexMap;
7use tokio::time;
8use url::Url;
9use vector_lib::configurable::configurable_component;
10
11use super::BuildResult;
12use crate::{
13 config::{self, Format, ProxyConfig, provider::ProviderConfig},
14 http::HttpClient,
15 signal,
16 tls::{TlsConfig, TlsSettings},
17};
18
/// Request settings for the `http` provider.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct RequestConfig {
    /// HTTP headers to add to every request sent to the provider URL.
    #[serde(default)]
    pub headers: IndexMap<String, String>,
}
27
28impl Default for RequestConfig {
29 fn default() -> Self {
30 Self {
31 headers: IndexMap::new(),
32 }
33 }
34}
35
/// Configuration for the `http` provider.
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
    /// URL the configuration is fetched from.
    ///
    /// Optional at the type level, but building the provider fails when it is unset.
    url: Option<Url>,

    #[configurable(derived)]
    request: RequestConfig,

    /// How often to poll the URL for a new configuration, in seconds.
    poll_interval_secs: u64,

    #[serde(flatten)]
    tls_options: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Format the fetched configuration is expected to be in.
    #[configurable(derived)]
    config_format: Format,
}
61
62impl Default for HttpConfig {
63 fn default() -> Self {
64 Self {
65 url: None,
66 request: RequestConfig::default(),
67 poll_interval_secs: 30,
68 tls_options: None,
69 proxy: Default::default(),
70 config_format: Format::default(),
71 }
72 }
73}
74
75async fn http_request(
77 url: &Url,
78 tls_options: Option<&TlsConfig>,
79 headers: &IndexMap<String, String>,
80 proxy: &ProxyConfig,
81) -> Result<bytes::Bytes, &'static str> {
82 let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
83 let http_client =
84 HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
85
86 let mut builder = http::request::Builder::new().uri(url.to_string());
88
89 for (header, value) in headers.iter() {
92 builder = builder.header(header.as_str(), value.as_str());
93 }
94
95 let request = builder
96 .body(Body::empty())
97 .map_err(|_| "Couldn't create HTTP request")?;
98
99 info!(
100 message = "Attempting to retrieve configuration.",
101 url = ?url.as_str()
102 );
103
104 let response = http_client.send(request).await.map_err(|err| {
105 let message = "HTTP error";
106 error!(
107 message = ?message,
108 error = ?err,
109 url = ?url.as_str());
110 message
111 })?;
112
113 info!(message = "Response received.", url = ?url.as_str());
114
115 response
116 .into_body()
117 .collect()
118 .await
119 .map(Collected::to_bytes)
120 .map_err(|err| {
121 let message = "Error interpreting response.";
122 let cause = err.into_cause();
123 error!(
124 message = ?message,
125 error = ?cause);
126
127 message
128 })
129}
130
131async fn http_request_to_config_builder(
133 url: &Url,
134 tls_options: Option<&TlsConfig>,
135 headers: &IndexMap<String, String>,
136 proxy: &ProxyConfig,
137 config_format: &Format,
138) -> BuildResult {
139 let config_str = http_request(url, tls_options, headers, proxy)
140 .await
141 .map_err(|e| vec![e.to_owned()])?;
142
143 config::load(config_str.chunk(), *config_format)
144}
145
146fn poll_http(
148 poll_interval_secs: u64,
149 url: Url,
150 tls_options: Option<TlsConfig>,
151 headers: IndexMap<String, String>,
152 proxy: ProxyConfig,
153 config_format: Format,
154) -> impl Stream<Item = signal::SignalTo> {
155 let duration = time::Duration::from_secs(poll_interval_secs);
156 let mut interval = time::interval_at(time::Instant::now() + duration, duration);
157
158 stream! {
159 loop {
160 interval.tick().await;
161
162 match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
163 Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
164 Err(_) => {},
165 };
166
167 info!(
168 message = "HTTP provider is waiting.",
169 poll_interval_secs = ?poll_interval_secs,
170 url = ?url.as_str());
171 }
172 }
173}
174
175impl ProviderConfig for HttpConfig {
176 async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
177 let url = self
178 .url
179 .take()
180 .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
181
182 let tls_options = self.tls_options.take();
183 let poll_interval_secs = self.poll_interval_secs;
184 let request = self.request.clone();
185 let config_format = self.config_format;
186
187 let proxy = ProxyConfig::from_env().merge(&self.proxy);
188 let config_builder = http_request_to_config_builder(
189 &url,
190 tls_options.as_ref(),
191 &request.headers,
192 &proxy,
193 &config_format,
194 )
195 .await?;
196
197 signal_handler.add(poll_http(
199 poll_interval_secs,
200 url,
201 tls_options,
202 request.headers.clone(),
203 proxy.clone(),
204 config_format,
205 ));
206
207 Ok(config_builder)
208 }
209}
210
// NOTE(review): this macro is defined outside this file; presumably it wires the
// provider's example-config generation to `HttpConfig::default()` — confirm against
// the macro definition.
impl_generate_config_from_default!(HttpConfig);