1use async_stream::stream;
2use bytes::Buf;
3use futures::Stream;
4use hyper::Body;
5use indexmap::IndexMap;
6use tokio::time;
7use url::Url;
8use vector_lib::configurable::configurable_component;
9
10use super::BuildResult;
11use crate::{
12 config::{self, Format, ProxyConfig, provider::ProviderConfig},
13 http::HttpClient,
14 signal,
15 tls::{TlsConfig, TlsSettings},
16};
17
18#[configurable_component]
20#[derive(Clone, Debug)]
21pub struct RequestConfig {
22 #[serde(default)]
24 pub headers: IndexMap<String, String>,
25}
26
27impl Default for RequestConfig {
28 fn default() -> Self {
29 Self {
30 headers: IndexMap::new(),
31 }
32 }
33}
34
35#[configurable_component(provider("http"))]
37#[derive(Clone, Debug)]
38#[serde(deny_unknown_fields, default)]
39pub struct HttpConfig {
40 url: Option<Url>,
42
43 #[configurable(derived)]
44 request: RequestConfig,
45
46 poll_interval_secs: u64,
48
49 #[serde(flatten)]
50 tls_options: Option<TlsConfig>,
51
52 #[configurable(derived)]
53 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
54 proxy: ProxyConfig,
55
56 #[configurable(derived)]
58 config_format: Format,
59}
60
61impl Default for HttpConfig {
62 fn default() -> Self {
63 Self {
64 url: None,
65 request: RequestConfig::default(),
66 poll_interval_secs: 30,
67 tls_options: None,
68 proxy: Default::default(),
69 config_format: Format::default(),
70 }
71 }
72}
73
74async fn http_request(
76 url: &Url,
77 tls_options: Option<&TlsConfig>,
78 headers: &IndexMap<String, String>,
79 proxy: &ProxyConfig,
80) -> Result<bytes::Bytes, &'static str> {
81 let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
82 let http_client =
83 HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
84
85 let mut builder = http::request::Builder::new().uri(url.to_string());
87
88 for (header, value) in headers.iter() {
91 builder = builder.header(header.as_str(), value.as_str());
92 }
93
94 let request = builder
95 .body(Body::empty())
96 .map_err(|_| "Couldn't create HTTP request")?;
97
98 info!(
99 message = "Attempting to retrieve configuration.",
100 url = ?url.as_str()
101 );
102
103 let response = http_client.send(request).await.map_err(|err| {
104 let message = "HTTP error";
105 error!(
106 message = ?message,
107 error = ?err,
108 url = ?url.as_str());
109 message
110 })?;
111
112 info!(message = "Response received.", url = ?url.as_str());
113
114 hyper::body::to_bytes(response.into_body())
115 .await
116 .map_err(|err| {
117 let message = "Error interpreting response.";
118 let cause = err.into_cause();
119 error!(
120 message = ?message,
121 error = ?cause);
122
123 message
124 })
125}
126
127async fn http_request_to_config_builder(
129 url: &Url,
130 tls_options: Option<&TlsConfig>,
131 headers: &IndexMap<String, String>,
132 proxy: &ProxyConfig,
133 config_format: &Format,
134) -> BuildResult {
135 let config_str = http_request(url, tls_options, headers, proxy)
136 .await
137 .map_err(|e| vec![e.to_owned()])?;
138
139 config::load(config_str.chunk(), *config_format)
140}
141
142fn poll_http(
144 poll_interval_secs: u64,
145 url: Url,
146 tls_options: Option<TlsConfig>,
147 headers: IndexMap<String, String>,
148 proxy: ProxyConfig,
149 config_format: Format,
150) -> impl Stream<Item = signal::SignalTo> {
151 let duration = time::Duration::from_secs(poll_interval_secs);
152 let mut interval = time::interval_at(time::Instant::now() + duration, duration);
153
154 stream! {
155 loop {
156 interval.tick().await;
157
158 match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
159 Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
160 Err(_) => {},
161 };
162
163 info!(
164 message = "HTTP provider is waiting.",
165 poll_interval_secs = ?poll_interval_secs,
166 url = ?url.as_str());
167 }
168 }
169}
170
171impl ProviderConfig for HttpConfig {
172 async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
173 let url = self
174 .url
175 .take()
176 .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
177
178 let tls_options = self.tls_options.take();
179 let poll_interval_secs = self.poll_interval_secs;
180 let request = self.request.clone();
181 let config_format = self.config_format;
182
183 let proxy = ProxyConfig::from_env().merge(&self.proxy);
184 let config_builder = http_request_to_config_builder(
185 &url,
186 tls_options.as_ref(),
187 &request.headers,
188 &proxy,
189 &config_format,
190 )
191 .await?;
192
193 signal_handler.add(poll_http(
195 poll_interval_secs,
196 url,
197 tls_options,
198 request.headers.clone(),
199 proxy.clone(),
200 config_format,
201 ));
202
203 Ok(config_builder)
204 }
205}
206
207impl_generate_config_from_default!(HttpConfig);