// vector/sources/util/http_client.rs

//! Common logic for sources that are HTTP clients.
//!
//! Specific HTTP client sources will:
//!   - Call `build_url()` to build the URL(s) to call.
//!   - Implement a context builder (the `HttpClientBuilder` trait) and a context struct which:
//!       - Contains the data the source needs in order to process the HTTP responses into events.
//!       - Implements the `HttpClientContext` trait.
//!   - Call `call()`, supplying the generic inputs for calling and the source-specific
//!     context builder.
10
11use bytes::Bytes;
12use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
13use http::{response::Parts, Uri};
14use hyper::{Body, Request};
15use std::time::Duration;
16use std::{collections::HashMap, future::ready};
17use tokio_stream::wrappers::IntervalStream;
18use vector_lib::json_size::JsonSize;
19
20use crate::http::{QueryParameterValue, QueryParameters};
21use crate::{
22    http::{Auth, HttpClient},
23    internal_events::{
24        EndpointBytesReceived, HttpClientEventsReceived, HttpClientHttpError,
25        HttpClientHttpResponseError, StreamClosedError,
26    },
27    sources::util::http::HttpMethod,
28    tls::TlsSettings,
29    SourceSender,
30};
31use vector_lib::shutdown::ShutdownSignal;
32use vector_lib::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
33
/// Contains the inputs generic to any HTTP client source.
///
/// Passed by value to [`call`], which consumes it to drive the periodic requests.
pub(crate) struct GenericHttpClientInputs {
    /// Array of URLs to call. Every URL is requested once per interval tick.
    pub urls: Vec<Uri>,
    /// Interval between calls.
    pub interval: Duration,
    /// Timeout for the HTTP request.
    pub timeout: Duration,
    /// Map of header name to the values to apply to the HTTP request; a header listed with
    /// several values is added once per value.
    pub headers: HashMap<String, Vec<String>>,
    /// Content type of the HTTP request, determined by the source.
    ///
    /// Sent as the `Accept` header unless the user configured one in `headers`.
    pub content_type: String,
    /// Optional authentication applied to every request.
    pub auth: Option<Auth>,
    /// TLS settings used to build the underlying HTTP client.
    pub tls: TlsSettings,
    /// Proxy configuration used to build the underlying HTTP client.
    pub proxy: ProxyConfig,
    /// Signal that stops the interval from scheduling further requests when it resolves.
    pub shutdown: ShutdownSignal,
}
51
/// Interval between calls to the HTTP endpoint used when the user configures none: 15 seconds.
pub(crate) const fn default_interval() -> Duration {
    Duration::new(15, 0)
}
56
/// Timeout applied to each HTTP request when the user configures none: 5 seconds.
pub(crate) const fn default_timeout() -> Duration {
    Duration::new(5, 0)
}
61
/// Builds the context, allowing the source-specific implementation to leverage data from the
/// config and the current HTTP request.
///
/// [`call`] invokes the builder once per URL on every interval tick, so each request gets its
/// own freshly built context.
pub(crate) trait HttpClientBuilder {
    /// The per-request context produced by this builder.
    type Context: HttpClientContext;

    /// Called before the HTTP request to `url` is made, to build out the context for it.
    fn build(&self, url: &Uri) -> Self::Context;
}
70
/// Methods that allow context-specific behavior during the scraping procedure.
///
/// A context is produced per request by an [`HttpClientBuilder`].
pub(crate) trait HttpClientContext {
    /// Called after the HTTP request succeeds with a 200 ('OK') status, and returns the
    /// decoded/parsed Event array.
    ///
    /// Returning `None` produces no events for this response.
    fn on_response(&mut self, url: &Uri, header: &Parts, body: &Bytes) -> Option<Vec<Event>>;

    /// (Optional) Called if the HTTP response is not 200 ('OK').
    ///
    /// The default implementation does nothing.
    fn on_http_response_error(&self, _uri: &Uri, _header: &Parts) {}

    /// (Optional) Process the base URL before each request.
    ///
    /// Allows for dynamic query parameters that update at runtime.
    /// Returns a new URL if parameters need to be updated, or `None` to use the original URL.
    fn process_url(&self, _url: &Uri) -> Option<Uri> {
        None
    }

    /// (Optional) Enriches events with additional HTTP metadata.
    ///
    /// This is called after the received byte-size metrics have been computed, so it should be
    /// used rather than enriching the events returned from `on_response` directly; that keeps
    /// the emitted byte counts accurate. The default implementation leaves the events unchanged.
    fn enrich_events(&mut self, _events: &mut Vec<Event>) {}
}
91
92/// Builds a url for the HTTP requests.
93pub(crate) fn build_url(uri: &Uri, query: &QueryParameters) -> Uri {
94    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
95    if let Some(query) = uri.query() {
96        serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes()));
97    };
98    for (k, query_value) in query {
99        match query_value {
100            QueryParameterValue::SingleParam(param) => {
101                serializer.append_pair(k, param.value());
102            }
103            QueryParameterValue::MultiParams(params) => {
104                for v in params {
105                    serializer.append_pair(k, v.value());
106                }
107            }
108        };
109    }
110    let mut builder = Uri::builder();
111    if let Some(scheme) = uri.scheme() {
112        builder = builder.scheme(scheme.clone());
113    };
114    if let Some(authority) = uri.authority() {
115        builder = builder.authority(authority.clone());
116    };
117    builder = builder.path_and_query(match serializer.finish() {
118        query if !query.is_empty() => format!("{}?{}", uri.path(), query),
119        _ => uri.path().to_string(),
120    });
121    builder
122        .build()
123        .expect("Failed to build URI from parsed arguments")
124}
125
126/// Warns if the scrape timeout is greater than the scrape interval.
127pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
128    if timeout > interval {
129        warn!(
130            interval_secs = %interval.as_secs_f64(),
131            timeout_secs = %timeout.as_secs_f64(),
132            message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
133        );
134    }
135}
136
137/// Calls one or more urls at an interval.
138///   - The HTTP request is built per the options in provided generic inputs.
139///   - The HTTP response is decoded/parsed into events by the specific context.
140///   - The events are then sent to the output stream.
141pub(crate) async fn call<
142    B: HttpClientBuilder<Context = C> + Send + Clone,
143    C: HttpClientContext + Send,
144>(
145    inputs: GenericHttpClientInputs,
146    context_builder: B,
147    mut out: SourceSender,
148    http_method: HttpMethod,
149) -> Result<(), ()> {
150    // Building the HttpClient should not fail as it is just setting up the client with the
151    // proxy and tls settings.
152    let client =
153        HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
154    let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
155        .take_until(inputs.shutdown)
156        .map(move |_| stream::iter(inputs.urls.clone()))
157        .flatten()
158        .map(move |base_url| {
159            let client = client.clone();
160            let endpoint = base_url.to_string();
161
162            let context_builder = context_builder.clone();
163            let mut context = context_builder.build(&base_url);
164
165            // Check if we need to process the URL dynamically (for updating VRL expressions)
166            let url = context.process_url(&base_url).unwrap_or(base_url);
167
168            let mut builder = match http_method {
169                HttpMethod::Head => Request::head(&url),
170                HttpMethod::Get => Request::get(&url),
171                HttpMethod::Post => Request::post(&url),
172                HttpMethod::Put => Request::put(&url),
173                HttpMethod::Patch => Request::patch(&url),
174                HttpMethod::Delete => Request::delete(&url),
175                HttpMethod::Options => Request::options(&url),
176            };
177
178            // add user specified headers
179            for (header, values) in &inputs.headers {
180                for value in values {
181                    builder = builder.header(header, value);
182                }
183            }
184
185            // set ACCEPT header if not user specified
186            if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) {
187                builder = builder.header(http::header::ACCEPT, &inputs.content_type);
188            }
189
190            // building an empty request should be infallible
191            let mut request = builder.body(Body::empty()).expect("error creating request");
192
193            if let Some(auth) = &inputs.auth {
194                auth.apply(&mut request);
195            }
196
197            tokio::time::timeout(inputs.timeout, client.send(request))
198                .then(move |result| async move {
199                    match result {
200                        Ok(Ok(response)) => Ok(response),
201                        Ok(Err(error)) => Err(error.into()),
202                        Err(_) => Err(format!(
203                            "Timeout error: request exceeded {}s",
204                            inputs.timeout.as_secs_f64()
205                        )
206                        .into()),
207                    }
208                })
209                .and_then(|response| async move {
210                    let (header, body) = response.into_parts();
211                    let body = hyper::body::to_bytes(body).await?;
212                    emit!(EndpointBytesReceived {
213                        byte_size: body.len(),
214                        protocol: "http",
215                        endpoint: endpoint.as_str(),
216                    });
217                    Ok((header, body))
218                })
219                .into_stream()
220                .filter_map(move |response| {
221                    ready(match response {
222                        Ok((header, body)) if header.status == hyper::StatusCode::OK => {
223                            context.on_response(&url, &header, &body).map(|mut events| {
224                                let byte_size = if events.is_empty() {
225                                    // We need to explicitly set the byte size
226                                    // to 0 since
227                                    // `estimated_json_encoded_size_of` returns
228                                    // at least 1 for an empty collection. For
229                                    // the purposes of the
230                                    // HttpClientEventsReceived event, we should
231                                    // emit 0 when there aren't any usable
232                                    // metrics.
233                                    JsonSize::zero()
234                                } else {
235                                    events.estimated_json_encoded_size_of()
236                                };
237
238                                emit!(HttpClientEventsReceived {
239                                    byte_size,
240                                    count: events.len(),
241                                    url: url.to_string()
242                                });
243
244                                // We'll enrich after receiving the events so
245                                // that the byte sizes are accurate.
246                                context.enrich_events(&mut events);
247
248                                stream::iter(events)
249                            })
250                        }
251                        Ok((header, _)) => {
252                            context.on_http_response_error(&url, &header);
253                            emit!(HttpClientHttpResponseError {
254                                code: header.status,
255                                url: url.to_string(),
256                            });
257                            None
258                        }
259                        Err(error) => {
260                            emit!(HttpClientHttpError {
261                                error,
262                                url: url.to_string()
263                            });
264                            None
265                        }
266                    })
267                })
268                .flatten()
269                .boxed()
270        })
271        .flatten_unordered(None)
272        .boxed();
273
274    match out.send_event_stream(&mut stream).await {
275        Ok(()) => {
276            debug!("Finished sending.");
277            Ok(())
278        }
279        Err(_) => {
280            let (count, _) = stream.size_hint();
281            emit!(StreamClosedError { count });
282            Err(())
283        }
284    }
285}