vector/sources/util/
http_client.rs

//! Common logic for sources that are HTTP clients.
//!
//! Specific HTTP client sources will:
//!   - Call `build_url()` to build the URL(s) to call.
//!   - Implement a specific context struct which:
//!       - Contains the data that source needs in order to process the HTTP responses into events
//!       - Implements the `HttpClientContext` trait
//!   - Call `call()` supplying the generic inputs for calling and a builder
//!     (`HttpClientBuilder`) for the source-specific context.
10
11// Okta source only imports defaults but doesn't use the rest of the client
12#![cfg_attr(feature = "sources-okta", allow(dead_code))]
13
14use std::{collections::HashMap, future::ready, time::Duration};
15
16use bytes::Bytes;
17use futures_util::{FutureExt, StreamExt, TryFutureExt, stream};
18use http::{Uri, response::Parts};
19use hyper::{Body, Request};
20use tokio_stream::wrappers::IntervalStream;
21use vector_lib::{
22    EstimatedJsonEncodedSizeOf, config::proxy::ProxyConfig, event::Event, json_size::JsonSize,
23    shutdown::ShutdownSignal,
24};
25
26use crate::{
27    SourceSender,
28    http::{Auth, HttpClient, QueryParameterValue, QueryParameters},
29    internal_events::{
30        EndpointBytesReceived, HttpClientEventsReceived, HttpClientHttpError,
31        HttpClientHttpResponseError, StreamClosedError,
32    },
33    sources::util::http::HttpMethod,
34    tls::TlsSettings,
35};
36
/// Contains the inputs generic to any http client.
///
/// One instance is consumed by `call()`, which uses it to build the HTTP client and every
/// request issued on each interval tick.
pub(crate) struct GenericHttpClientInputs {
    /// Array of URLs to call. Every URL is requested once per interval tick.
    pub urls: Vec<Uri>,
    /// Interval between calls.
    pub interval: Duration,
    /// Timeout for the HTTP request.
    pub timeout: Duration,
    /// Map of Header+Value to apply to HTTP request. Each value in the list is added as a
    /// separate header entry under the same name.
    pub headers: HashMap<String, Vec<String>>,
    /// Content type determined by the source. It is sent as the `Accept` header of the
    /// request when the configured headers do not already provide one.
    pub content_type: String,
    /// Optional authentication, applied to each request after it has been built.
    pub auth: Option<Auth>,
    /// TLS settings used when constructing the HTTP client.
    pub tls: TlsSettings,
    /// Proxy configuration used when constructing the HTTP client.
    pub proxy: ProxyConfig,
    /// Shutdown signal; the interval stream is only taken until this signal fires.
    pub shutdown: ShutdownSignal,
}
54
/// Fallback interval between calls to the HTTP endpoint, used when the
/// configuration does not specify one (15 seconds).
pub(crate) const fn default_interval() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
59
/// Fallback timeout for a single HTTP request, used when the configuration
/// does not specify one (5 seconds).
pub(crate) const fn default_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
64
/// Builds the context, allowing the source-specific implementation to leverage data from the
/// config and the current HTTP request.
///
/// `call()` clones the builder and invokes [`build`](Self::build) for every URL on every
/// interval tick, so each request gets its own fresh context.
pub(crate) trait HttpClientBuilder {
    /// The source-specific context type produced for each request.
    type Context: HttpClientContext;

    /// Called before the HTTP request is made to build out the context.
    ///
    /// `url` is the base URL that is about to be requested (before any rewriting done by
    /// `HttpClientContext::process_url`).
    fn build(&self, url: &Uri) -> Self::Context;
}
73
/// Methods that allow context-specific behavior during the scraping procedure.
pub(crate) trait HttpClientContext {
    /// Called after the HTTP request succeeds (status 200) and returns the decoded/parsed
    /// Event array.
    ///
    /// `url` is the URL that was actually requested, `header` holds the response head, and
    /// `body` is the fully collected response body. Returning `None` means nothing is
    /// emitted for this response.
    fn on_response(&mut self, url: &Uri, header: &Parts, body: &Bytes) -> Option<Vec<Event>>;

    /// (Optional) Called if the HTTP response is not 200 ('OK').
    ///
    /// The default implementation does nothing.
    fn on_http_response_error(&self, _uri: &Uri, _header: &Parts) {}

    /// (Optional) Process the base URL before each request.
    /// Allows for dynamic query parameters that update at runtime.
    /// Returns a new URL if parameters need to be updated, or None to use the original URL.
    fn process_url(&self, _url: &Uri) -> Option<Uri> {
        None
    }

    /// (Optional) Get the request body to send with the HTTP request.
    /// Returns the body as a String if one should be sent, or None for an empty body.
    ///
    /// When a body is returned, `call()` also adds a `Content-Type: application/json`
    /// header unless the configured headers already provide a content type.
    fn get_request_body(&self) -> Option<String> {
        None
    }

    /// (Optional) Enrich events with additional HTTP metadata.
    ///
    /// This function should be used rather than internal enrichment so
    /// that accurate byte count metrics can be emitted: it runs after the
    /// received-events metric has been emitted for the batch.
    fn enrich_events(&mut self, _events: &mut Vec<Event>) {}
}
100
101/// Builds a url for the HTTP requests.
102pub(crate) fn build_url(uri: &Uri, query: &QueryParameters) -> Uri {
103    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
104    if let Some(query) = uri.query() {
105        serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes()));
106    };
107    for (k, query_value) in query {
108        match query_value {
109            QueryParameterValue::SingleParam(param) => {
110                serializer.append_pair(k, param.value());
111            }
112            QueryParameterValue::MultiParams(params) => {
113                for v in params {
114                    serializer.append_pair(k, v.value());
115                }
116            }
117        };
118    }
119    let mut builder = Uri::builder();
120    if let Some(scheme) = uri.scheme() {
121        builder = builder.scheme(scheme.clone());
122    };
123    if let Some(authority) = uri.authority() {
124        builder = builder.authority(authority.clone());
125    };
126    builder = builder.path_and_query(match serializer.finish() {
127        query if !query.is_empty() => format!("{}?{}", uri.path(), query),
128        _ => uri.path().to_string(),
129    });
130    builder
131        .build()
132        .expect("Failed to build URI from parsed arguments")
133}
134
135/// Warns if the scrape timeout is greater than the scrape interval.
136pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
137    if timeout > interval {
138        warn!(
139            interval_secs = %interval.as_secs_f64(),
140            timeout_secs = %timeout.as_secs_f64(),
141            message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
142        );
143    }
144}
145
146/// Calls one or more urls at an interval.
147///   - The HTTP request is built per the options in provided generic inputs.
148///   - The HTTP response is decoded/parsed into events by the specific context.
149///   - The events are then sent to the output stream.
150pub(crate) async fn call<
151    B: HttpClientBuilder<Context = C> + Send + Clone,
152    C: HttpClientContext + Send,
153>(
154    inputs: GenericHttpClientInputs,
155    context_builder: B,
156    mut out: SourceSender,
157    http_method: HttpMethod,
158) -> Result<(), ()> {
159    // Building the HttpClient should not fail as it is just setting up the client with the
160    // proxy and tls settings.
161    let client =
162        HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
163    let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
164        .take_until(inputs.shutdown)
165        .map(move |_| stream::iter(inputs.urls.clone()))
166        .flatten()
167        .map(move |base_url| {
168            let client = client.clone();
169            let endpoint = base_url.to_string();
170
171            let context_builder = context_builder.clone();
172            let mut context = context_builder.build(&base_url);
173
174            // Check if we need to process the URL dynamically (for updating VRL expressions)
175            let url = context.process_url(&base_url).unwrap_or(base_url);
176
177            let mut builder = match http_method {
178                HttpMethod::Head => Request::head(&url),
179                HttpMethod::Get => Request::get(&url),
180                HttpMethod::Post => Request::post(&url),
181                HttpMethod::Put => Request::put(&url),
182                HttpMethod::Patch => Request::patch(&url),
183                HttpMethod::Delete => Request::delete(&url),
184                HttpMethod::Options => Request::options(&url),
185            };
186
187            // add user specified headers
188            for (header, values) in &inputs.headers {
189                for value in values {
190                    builder = builder.header(header, value);
191                }
192            }
193
194            // set ACCEPT header if not user specified
195            if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) {
196                builder = builder.header(http::header::ACCEPT, &inputs.content_type);
197            }
198
199            // Get the request body from the context (if any)
200            let body = match context.get_request_body() {
201                Some(body_str) => {
202                    // Set Content-Type header if not already set
203                    if !inputs
204                        .headers
205                        .contains_key(http::header::CONTENT_TYPE.as_str())
206                    {
207                        builder = builder.header(http::header::CONTENT_TYPE, "application/json");
208                    }
209                    Body::from(body_str)
210                }
211                None => Body::empty(),
212            };
213
214            // building the request should be infallible
215            let mut request = builder.body(body).expect("error creating request");
216
217            if let Some(auth) = &inputs.auth {
218                auth.apply(&mut request);
219            }
220
221            tokio::time::timeout(inputs.timeout, client.send(request))
222                .then(move |result| async move {
223                    match result {
224                        Ok(Ok(response)) => Ok(response),
225                        Ok(Err(error)) => Err(error.into()),
226                        Err(_) => Err(format!(
227                            "Timeout error: request exceeded {}s",
228                            inputs.timeout.as_secs_f64()
229                        )
230                        .into()),
231                    }
232                })
233                .and_then(|response| async move {
234                    let (header, body) = response.into_parts();
235                    let body = http_body::Body::collect(body).await?.to_bytes();
236                    emit!(EndpointBytesReceived {
237                        byte_size: body.len(),
238                        protocol: "http",
239                        endpoint: endpoint.as_str(),
240                    });
241                    Ok((header, body))
242                })
243                .into_stream()
244                .filter_map(move |response| {
245                    ready(match response {
246                        Ok((header, body)) if header.status == hyper::StatusCode::OK => {
247                            context.on_response(&url, &header, &body).map(|mut events| {
248                                let byte_size = if events.is_empty() {
249                                    // We need to explicitly set the byte size
250                                    // to 0 since
251                                    // `estimated_json_encoded_size_of` returns
252                                    // at least 1 for an empty collection. For
253                                    // the purposes of the
254                                    // HttpClientEventsReceived event, we should
255                                    // emit 0 when there aren't any usable
256                                    // metrics.
257                                    JsonSize::zero()
258                                } else {
259                                    events.estimated_json_encoded_size_of()
260                                };
261
262                                emit!(HttpClientEventsReceived {
263                                    byte_size,
264                                    count: events.len(),
265                                    url: url.to_string()
266                                });
267
268                                // We'll enrich after receiving the events so
269                                // that the byte sizes are accurate.
270                                context.enrich_events(&mut events);
271
272                                stream::iter(events)
273                            })
274                        }
275                        Ok((header, _)) => {
276                            context.on_http_response_error(&url, &header);
277                            emit!(HttpClientHttpResponseError {
278                                code: header.status,
279                                url: url.to_string(),
280                            });
281                            None
282                        }
283                        Err(error) => {
284                            emit!(HttpClientHttpError {
285                                error,
286                                url: url.to_string()
287                            });
288                            None
289                        }
290                    })
291                })
292                .flatten()
293                .boxed()
294        })
295        .flatten_unordered(None)
296        .boxed();
297
298    match out.send_event_stream(&mut stream).await {
299        Ok(()) => {
300            debug!("Finished sending.");
301            Ok(())
302        }
303        Err(_) => {
304            let (count, _) = stream.size_hint();
305            emit!(StreamClosedError { count });
306            Err(())
307        }
308    }
309}