vector/sources/util/http_client.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
//! Common logic for sources that are HTTP clients.
//!
//! Specific HTTP client sources will:
//! - Call build_url() to build the URL(s) to call.
//! - Implement a specific context struct which:
//! - Contains the data that source needs in order to process the HTTP responses into internal_events
//! - Implements the HttpClient trait
//! - Call call() supplying the generic inputs for calling and the source-specific
//! context.
use bytes::Bytes;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
use http::{response::Parts, Uri};
use hyper::{Body, Request};
use std::time::Duration;
use std::{collections::HashMap, future::ready};
use tokio_stream::wrappers::IntervalStream;
use vector_lib::json_size::JsonSize;
use crate::{
http::{Auth, HttpClient},
internal_events::{
EndpointBytesReceived, HttpClientEventsReceived, HttpClientHttpError,
HttpClientHttpResponseError, StreamClosedError,
},
sources::util::http::HttpMethod,
tls::TlsSettings,
SourceSender,
};
use vector_lib::shutdown::ShutdownSignal;
use vector_lib::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
/// Contains the inputs generic to any http client.
pub(crate) struct GenericHttpClientInputs {
/// Array of URLs to call.
pub urls: Vec<Uri>,
/// Interval between calls.
pub interval: Duration,
/// Timeout for the HTTP request.
pub timeout: Duration,
/// Map of Header+Value to apply to HTTP request.
pub headers: HashMap<String, Vec<String>>,
/// Content type of the HTTP request, determined by the source.
pub content_type: String,
pub auth: Option<Auth>,
pub tls: TlsSettings,
pub proxy: ProxyConfig,
pub shutdown: ShutdownSignal,
}
/// The default interval to call the HTTP endpoint if none is configured.
pub(crate) const fn default_interval() -> Duration {
Duration::from_secs(15)
}
/// The default timeout for the HTTP request if none is configured.
pub(crate) const fn default_timeout() -> Duration {
Duration::from_secs(5)
}
/// Builds the context, allowing the source-specific implementation to leverage data from the
/// config and the current HTTP request.
pub(crate) trait HttpClientBuilder {
type Context: HttpClientContext;
/// Called before the HTTP request is made to build out the context.
fn build(&self, url: &Uri) -> Self::Context;
}
/// Methods that allow context-specific behavior during the scraping procedure.
pub(crate) trait HttpClientContext {
/// Called after the HTTP request succeeds and returns the decoded/parsed Event array.
fn on_response(&mut self, url: &Uri, header: &Parts, body: &Bytes) -> Option<Vec<Event>>;
/// (Optional) Called if the HTTP response is not 200 ('OK').
fn on_http_response_error(&self, _uri: &Uri, _header: &Parts) {}
// This function can be defined to enrich events with additional HTTP
// metadata. This function should be used rather than internal enrichment so
// that accurate byte count metrics can be emitted.
fn enrich_events(&mut self, _events: &mut Vec<Event>) {}
}
/// Builds a url for the HTTP requests.
pub(crate) fn build_url(uri: &Uri, query: &HashMap<String, Vec<String>>) -> Uri {
let mut serializer = url::form_urlencoded::Serializer::new(String::new());
if let Some(query) = uri.query() {
serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes()));
};
for (k, l) in query {
for v in l {
serializer.append_pair(k, v);
}
}
let mut builder = Uri::builder();
if let Some(scheme) = uri.scheme() {
builder = builder.scheme(scheme.clone());
};
if let Some(authority) = uri.authority() {
builder = builder.authority(authority.clone());
};
builder = builder.path_and_query(match serializer.finish() {
query if !query.is_empty() => format!("{}?{}", uri.path(), query),
_ => uri.path().to_string(),
});
builder
.build()
.expect("Failed to build URI from parsed arguments")
}
/// Warns if the scrape timeout is greater than the scrape interval.
pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
if timeout > interval {
warn!(
interval_secs = %interval.as_secs_f64(),
timeout_secs = %timeout.as_secs_f64(),
message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
);
}
}
/// Calls one or more urls at an interval.
/// - The HTTP request is built per the options in provided generic inputs.
/// - The HTTP response is decoded/parsed into events by the specific context.
/// - The events are then sent to the output stream.
pub(crate) async fn call<
B: HttpClientBuilder<Context = C> + Send + Clone,
C: HttpClientContext + Send,
>(
inputs: GenericHttpClientInputs,
context_builder: B,
mut out: SourceSender,
http_method: HttpMethod,
) -> Result<(), ()> {
// Building the HttpClient should not fail as it is just setting up the client with the
// proxy and tls settings.
let client =
HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
.take_until(inputs.shutdown)
.map(move |_| stream::iter(inputs.urls.clone()))
.flatten()
.map(move |url| {
let client = client.clone();
let endpoint = url.to_string();
let context_builder = context_builder.clone();
let mut context = context_builder.build(&url);
let mut builder = match http_method {
HttpMethod::Head => Request::head(&url),
HttpMethod::Get => Request::get(&url),
HttpMethod::Post => Request::post(&url),
HttpMethod::Put => Request::put(&url),
HttpMethod::Patch => Request::patch(&url),
HttpMethod::Delete => Request::delete(&url),
HttpMethod::Options => Request::options(&url),
};
// add user specified headers
for (header, values) in &inputs.headers {
for value in values {
builder = builder.header(header, value);
}
}
// set ACCEPT header if not user specified
if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) {
builder = builder.header(http::header::ACCEPT, &inputs.content_type);
}
// building an empty request should be infallible
let mut request = builder.body(Body::empty()).expect("error creating request");
if let Some(auth) = &inputs.auth {
auth.apply(&mut request);
}
tokio::time::timeout(inputs.timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.timeout.as_secs_f64()
)
.into()),
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
emit!(EndpointBytesReceived {
byte_size: body.len(),
protocol: "http",
endpoint: endpoint.as_str(),
});
Ok((header, body))
})
.into_stream()
.filter_map(move |response| {
ready(match response {
Ok((header, body)) if header.status == hyper::StatusCode::OK => {
context.on_response(&url, &header, &body).map(|mut events| {
let byte_size = if events.is_empty() {
// We need to explicitly set the byte size
// to 0 since
// `estimated_json_encoded_size_of` returns
// at least 1 for an empty collection. For
// the purposes of the
// HttpClientEventsReceived event, we should
// emit 0 when there aren't any usable
// metrics.
JsonSize::zero()
} else {
events.estimated_json_encoded_size_of()
};
emit!(HttpClientEventsReceived {
byte_size,
count: events.len(),
url: url.to_string()
});
// We'll enrich after receiving the events so
// that the byte sizes are accurate.
context.enrich_events(&mut events);
stream::iter(events)
})
}
Ok((header, _)) => {
context.on_http_response_error(&url, &header);
emit!(HttpClientHttpResponseError {
code: header.status,
url: url.to_string(),
});
None
}
Err(error) => {
emit!(HttpClientHttpError {
error,
url: url.to_string()
});
None
}
})
})
.flatten()
.boxed()
})
.flatten_unordered(None)
.boxed();
match out.send_event_stream(&mut stream).await {
Ok(()) => {
debug!("Finished sending.");
Ok(())
}
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { count });
Err(())
}
}
}