use async_stream::stream;
use bytes::Buf;
use futures::Stream;
use hyper::Body;
use indexmap::IndexMap;
use tokio::time;
use url::Url;
use vector_lib::configurable::configurable_component;
use crate::{
config::{self, provider::ProviderConfig, ProxyConfig},
http::HttpClient,
signal,
tls::{TlsConfig, TlsSettings},
};
use super::BuildResult;
#[configurable_component]
#[derive(Clone, Debug)]
pub struct RequestConfig {
#[serde(default)]
pub headers: IndexMap<String, String>,
}
impl Default for RequestConfig {
fn default() -> Self {
Self {
headers: IndexMap::new(),
}
}
}
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
url: Option<Url>,
#[configurable(derived)]
request: RequestConfig,
poll_interval_secs: u64,
#[serde(flatten)]
tls_options: Option<TlsConfig>,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
proxy: ProxyConfig,
}
impl Default for HttpConfig {
fn default() -> Self {
Self {
url: None,
request: RequestConfig::default(),
poll_interval_secs: 30,
tls_options: None,
proxy: Default::default(),
}
}
}
async fn http_request(
url: &Url,
tls_options: &Option<TlsConfig>,
headers: &IndexMap<String, String>,
proxy: &ProxyConfig,
) -> Result<bytes::Bytes, &'static str> {
let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
let http_client =
HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
let mut builder = http::request::Builder::new().uri(url.to_string());
for (header, value) in headers.iter() {
builder = builder.header(header.as_str(), value.as_str());
}
let request = builder
.body(Body::empty())
.map_err(|_| "Couldn't create HTTP request")?;
info!(
message = "Attempting to retrieve configuration.",
url = ?url.as_str()
);
let response = http_client.send(request).await.map_err(|err| {
let message = "HTTP error";
error!(
message = ?message,
error = ?err,
url = ?url.as_str());
message
})?;
info!(message = "Response received.", url = ?url.as_str());
hyper::body::to_bytes(response.into_body())
.await
.map_err(|err| {
let message = "Error interpreting response.";
let cause = err.into_cause();
error!(
message = ?message,
error = ?cause);
message
})
}
async fn http_request_to_config_builder(
url: &Url,
tls_options: &Option<TlsConfig>,
headers: &IndexMap<String, String>,
proxy: &ProxyConfig,
) -> BuildResult {
let config_str = http_request(url, tls_options, headers, proxy)
.await
.map_err(|e| vec![e.to_owned()])?;
config::load(config_str.chunk(), crate::config::format::Format::Toml)?
}
fn poll_http(
poll_interval_secs: u64,
url: Url,
tls_options: Option<TlsConfig>,
headers: IndexMap<String, String>,
proxy: ProxyConfig,
) -> impl Stream<Item = signal::SignalTo> {
let duration = time::Duration::from_secs(poll_interval_secs);
let mut interval = time::interval_at(time::Instant::now() + duration, duration);
stream! {
loop {
interval.tick().await;
match http_request_to_config_builder(&url, &tls_options, &headers, &proxy).await {
Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
Err(_) => {},
};
info!(
message = "HTTP provider is waiting.",
poll_interval_secs = ?poll_interval_secs,
url = ?url.as_str());
}
}
}
impl ProviderConfig for HttpConfig {
async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
let url = self
.url
.take()
.ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
let tls_options = self.tls_options.take();
let poll_interval_secs = self.poll_interval_secs;
let request = self.request.clone();
let proxy = ProxyConfig::from_env().merge(&self.proxy);
let config_builder =
http_request_to_config_builder(&url, &tls_options, &request.headers, &proxy).await?;
signal_handler.add(poll_http(
poll_interval_secs,
url,
tls_options,
request.headers.clone(),
proxy.clone(),
));
Ok(config_builder)
}
}
impl_generate_config_from_default!(HttpConfig);