1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use hyper::Body;
5use indexmap::IndexMap;
6use tokio::time;
7use url::Url;
8use vector_lib::configurable::configurable_component;
9
10use crate::{
11 config::{self, provider::ProviderConfig, Format, ProxyConfig},
12 http::HttpClient,
13 signal,
14 tls::{TlsConfig, TlsSettings},
15};
16
17use super::BuildResult;
18
/// Request settings for the `http` provider.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct RequestConfig {
    /// HTTP headers, as name/value pairs, added to each request made to the configured endpoint.
    #[serde(default)]
    pub headers: IndexMap<String, String>,
}
27
28impl Default for RequestConfig {
29 fn default() -> Self {
30 Self {
31 headers: IndexMap::new(),
32 }
33 }
34}
35
/// Configuration for the `http` provider.
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
    /// URL of the endpoint the configuration is retrieved from. Building the provider fails when it is unset.
    url: Option<Url>,

    /// Request settings, such as the headers added to each request.
    #[configurable(derived)]
    request: RequestConfig,

    /// How often, in seconds, the endpoint is polled for configuration. Defaults to 30.
    poll_interval_secs: u64,

    /// TLS options used to build the HTTP client. Its fields are flattened into this struct.
    #[serde(flatten)]
    tls_options: Option<TlsConfig>,

    /// Proxy settings, merged with the proxy settings read from the environment.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Format used to load the retrieved configuration.
    #[configurable(derived)]
    config_format: Format,
}
61
62impl Default for HttpConfig {
63 fn default() -> Self {
64 Self {
65 url: None,
66 request: RequestConfig::default(),
67 poll_interval_secs: 30,
68 tls_options: None,
69 proxy: Default::default(),
70 config_format: Format::default(),
71 }
72 }
73}
74
75async fn http_request(
77 url: &Url,
78 tls_options: Option<&TlsConfig>,
79 headers: &IndexMap<String, String>,
80 proxy: &ProxyConfig,
81) -> Result<bytes::Bytes, &'static str> {
82 let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
83 let http_client =
84 HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
85
86 let mut builder = http::request::Builder::new().uri(url.to_string());
88
89 for (header, value) in headers.iter() {
92 builder = builder.header(header.as_str(), value.as_str());
93 }
94
95 let request = builder
96 .body(Body::empty())
97 .map_err(|_| "Couldn't create HTTP request")?;
98
99 info!(
100 message = "Attempting to retrieve configuration.",
101 url = ?url.as_str()
102 );
103
104 let response = http_client.send(request).await.map_err(|err| {
105 let message = "HTTP error";
106 error!(
107 message = ?message,
108 error = ?err,
109 url = ?url.as_str());
110 message
111 })?;
112
113 info!(message = "Response received.", url = ?url.as_str());
114
115 hyper::body::to_bytes(response.into_body())
116 .await
117 .map_err(|err| {
118 let message = "Error interpreting response.";
119 let cause = err.into_cause();
120 error!(
121 message = ?message,
122 error = ?cause);
123
124 message
125 })
126}
127
128async fn http_request_to_config_builder(
130 url: &Url,
131 tls_options: Option<&TlsConfig>,
132 headers: &IndexMap<String, String>,
133 proxy: &ProxyConfig,
134 config_format: &Format,
135) -> BuildResult {
136 let config_str = http_request(url, tls_options, headers, proxy)
137 .await
138 .map_err(|e| vec![e.to_owned()])?;
139
140 config::load(config_str.chunk(), *config_format)
141}
142
143fn poll_http(
145 poll_interval_secs: u64,
146 url: Url,
147 tls_options: Option<TlsConfig>,
148 headers: IndexMap<String, String>,
149 proxy: ProxyConfig,
150 config_format: Format,
151) -> impl Stream<Item = signal::SignalTo> {
152 let duration = time::Duration::from_secs(poll_interval_secs);
153 let mut interval = time::interval_at(time::Instant::now() + duration, duration);
154
155 stream! {
156 loop {
157 interval.tick().await;
158
159 match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
160 Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
161 Err(_) => {},
162 };
163
164 info!(
165 message = "HTTP provider is waiting.",
166 poll_interval_secs = ?poll_interval_secs,
167 url = ?url.as_str());
168 }
169 }
170}
171
172impl ProviderConfig for HttpConfig {
173 async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
174 let url = self
175 .url
176 .take()
177 .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
178
179 let tls_options = self.tls_options.take();
180 let poll_interval_secs = self.poll_interval_secs;
181 let request = self.request.clone();
182 let config_format = self.config_format;
183
184 let proxy = ProxyConfig::from_env().merge(&self.proxy);
185 let config_builder = http_request_to_config_builder(
186 &url,
187 tls_options.as_ref(),
188 &request.headers,
189 &proxy,
190 &config_format,
191 )
192 .await?;
193
194 signal_handler.add(poll_http(
196 poll_interval_secs,
197 url,
198 tls_options,
199 request.headers.clone(),
200 proxy.clone(),
201 config_format,
202 ));
203
204 Ok(config_builder)
205 }
206}
207
// NOTE(review): presumably implements example-config generation for `HttpConfig`
// based on its `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(HttpConfig);