1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
use async_stream::stream;
use bytes::Buf;
use futures::Stream;
use hyper::Body;
use indexmap::IndexMap;
use tokio::time;
use url::Url;
use vector_lib::configurable::configurable_component;

use crate::{
    config::{self, provider::ProviderConfig, ProxyConfig},
    http::HttpClient,
    signal,
    tls::{TlsConfig, TlsSettings},
};

use super::BuildResult;

/// Request settings.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct RequestConfig {
    /// HTTP headers to add to the request.
    #[serde(default)]
    pub headers: IndexMap<String, String>,
}

impl Default for RequestConfig {
    fn default() -> Self {
        Self {
            headers: IndexMap::new(),
        }
    }
}

/// Configuration for the `http` provider.
// `deny_unknown_fields` rejects unrecognised keys, and the container-level `default` fills any
// omitted field from this type's `Default` implementation.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten` (used on `tls_options` below) — confirm the combination behaves as intended.
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
    /// URL for the HTTP provider.
    // Optional at the type level only: `build` returns an error when this is unset.
    url: Option<Url>,

    // Extra request settings; only its `headers` map is read by this file.
    #[configurable(derived)]
    request: RequestConfig,

    /// How often to poll the provider, in seconds.
    // The `Default` implementation for this type sets this to 30.
    poll_interval_secs: u64,

    // Flattened, so the TLS keys sit alongside the provider's own keys instead of in a nested
    // table. `None` is handed to `TlsSettings::from_options` unchanged.
    #[serde(flatten)]
    tls_options: Option<TlsConfig>,

    // Merged on top of `ProxyConfig::from_env()` in `build`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,
}

impl Default for HttpConfig {
    fn default() -> Self {
        Self {
            url: None,
            request: RequestConfig::default(),
            poll_interval_secs: 30,
            tls_options: None,
            proxy: Default::default(),
        }
    }
}

/// Makes an HTTP request to the provided endpoint, returning the raw response body.
///
/// A fresh `HttpClient` is built from `tls_options` and `proxy` on every call, and each entry
/// of `headers` is added to the request (e.g. for authenticating against private endpoints).
///
/// # Errors
///
/// Returns a static description of the failure when:
/// - the TLS options or the client built from them are rejected,
/// - the request cannot be constructed (for example, an invalid header name or value),
/// - sending the request fails,
/// - the server answers with a non-success (non-2xx) status, or
/// - the response body cannot be read.
async fn http_request(
    url: &Url,
    tls_options: &Option<TlsConfig>,
    headers: &IndexMap<String, String>,
    proxy: &ProxyConfig,
) -> Result<bytes::Bytes, &'static str> {
    let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
    let http_client =
        HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;

    // Build HTTP request.
    let mut builder = http::request::Builder::new().uri(url.to_string());

    // Augment with headers. These may be required e.g. for authentication to
    // private endpoints.
    for (header, value) in headers.iter() {
        builder = builder.header(header.as_str(), value.as_str());
    }

    let request = builder
        .body(Body::empty())
        .map_err(|_| "Couldn't create HTTP request")?;

    info!(
        message = "Attempting to retrieve configuration.",
        url = ?url.as_str()
    );

    let response = http_client.send(request).await.map_err(|err| {
        let message = "HTTP error";
        error!(
            message = ?message,
            error = ?err,
            url = ?url.as_str());
        message
    })?;

    info!(message = "Response received.", url = ?url.as_str());

    // Only a successful response carries configuration. Without this check the body of an
    // error page or an empty redirect would be handed to the caller as if it were valid.
    let status = response.status();
    if !status.is_success() {
        let message = "Unexpected HTTP response status.";
        error!(
            message = ?message,
            status = %status,
            url = ?url.as_str());
        return Err(message);
    }

    hyper::body::to_bytes(response.into_body())
        .await
        .map_err(|err| {
            let message = "Error interpreting response.";
            let cause = err.into_cause();
            error!(
                    message = ?message,
                    error = ?cause);

            message
        })
}

/// Calls `http_request` and deserializes the response body, as TOML, into a `ConfigBuilder`.
///
/// # Errors
///
/// A failed request is reported as a single-element error list holding the message returned
/// by `http_request`; a failure to load the body is reported with whatever errors
/// `config::load` produces.
async fn http_request_to_config_builder(
    url: &Url,
    tls_options: &Option<TlsConfig>,
    headers: &IndexMap<String, String>,
    proxy: &ProxyConfig,
) -> BuildResult {
    let config_str = http_request(url, tls_options, headers, proxy)
        .await
        .map_err(|e| vec![e.to_owned()])?;

    // The loader's result is this function's result, so it is returned as-is. Applying `?`
    // here would unwrap it first and force the loader's success type to be an entire
    // `BuildResult` instead of the builder itself.
    // NOTE(review): `config::load` is defined outside this file — confirm its return type
    // matches `BuildResult`.
    config::load(config_str.chunk(), crate::config::format::Format::Toml)
}

/// Polls the HTTP endpoint every `poll_interval_secs`, returning a stream that yields a
/// `ReloadFromConfigBuilder` signal for each configuration that is fetched and loaded
/// successfully.
///
/// The first poll happens one full interval after this function is called, not immediately.
/// A poll that fails is logged and skipped; the stream keeps running and never ends on its own.
///
/// A `poll_interval_secs` of `0` is treated as `1`.
fn poll_http(
    poll_interval_secs: u64,
    url: Url,
    tls_options: Option<TlsConfig>,
    headers: IndexMap<String, String>,
    proxy: ProxyConfig,
) -> impl Stream<Item = signal::SignalTo> {
    // `tokio::time::interval_at` panics when given a zero period, so a configured interval of
    // zero is raised to one second rather than crashing.
    let poll_interval_secs = poll_interval_secs.max(1);
    let duration = time::Duration::from_secs(poll_interval_secs);
    let mut interval = time::interval_at(time::Instant::now() + duration, duration);

    stream! {
        loop {
            interval.tick().await;

            match http_request_to_config_builder(&url, &tls_options, &headers, &proxy).await {
                Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
                Err(errors) => {
                    // Request failures are already logged where they occur, but failures to
                    // load the fetched body are not, so report every failed poll here.
                    error!(
                        message = "Failed to load configuration from HTTP provider.",
                        errors = ?errors,
                        url = ?url.as_str());
                }
            };

            info!(
                message = "HTTP provider is waiting.",
                poll_interval_secs = ?poll_interval_secs,
                url = ?url.as_str());
        }
    }
}

impl ProviderConfig for HttpConfig {
    /// Fetches the initial configuration over HTTP and registers a poller with
    /// `signal_handler` that re-fetches it every `poll_interval_secs`.
    ///
    /// The URL and TLS options are moved out of `self` (leaving `None` behind), so they are no
    /// longer available on this value afterwards, even when the initial request fails.
    ///
    /// # Errors
    ///
    /// Returns an error list when no URL is configured, or when the initial request or the
    /// loading of its response fails. In either case no poller is registered.
    async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
        let url = self
            .url
            .take()
            .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;

        let tls_options = self.tls_options.take();
        let poll_interval_secs = self.poll_interval_secs;
        // One copy of the headers is enough: it is borrowed for the initial request and then
        // moved into the poller.
        let headers = self.request.headers.clone();

        // Environment-derived proxy settings merged with the explicitly configured ones.
        let proxy = ProxyConfig::from_env().merge(&self.proxy);
        let config_builder =
            http_request_to_config_builder(&url, &tls_options, &headers, &proxy).await?;

        // Poll for changes to remote configuration.
        signal_handler.add(poll_http(
            poll_interval_secs,
            url,
            tls_options,
            headers,
            proxy,
        ));

        Ok(config_builder)
    }
}

// Invokes the project's `impl_generate_config_from_default!` macro for `HttpConfig`. Judging by
// its name it presumably builds the generated example config from `HttpConfig::default()` —
// confirm against the macro definition.
impl_generate_config_from_default!(HttpConfig);