1use bytes::Bytes;
12use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
13use http::{response::Parts, Uri};
14use hyper::{Body, Request};
15use std::time::Duration;
16use std::{collections::HashMap, future::ready};
17use tokio_stream::wrappers::IntervalStream;
18use vector_lib::json_size::JsonSize;
19
20use crate::http::{QueryParameterValue, QueryParameters};
21use crate::{
22 http::{Auth, HttpClient},
23 internal_events::{
24 EndpointBytesReceived, HttpClientEventsReceived, HttpClientHttpError,
25 HttpClientHttpResponseError, StreamClosedError,
26 },
27 sources::util::http::HttpMethod,
28 tls::TlsSettings,
29 SourceSender,
30};
31use vector_lib::shutdown::ShutdownSignal;
32use vector_lib::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
33
34pub(crate) struct GenericHttpClientInputs {
36 pub urls: Vec<Uri>,
38 pub interval: Duration,
40 pub timeout: Duration,
42 pub headers: HashMap<String, Vec<String>>,
44 pub content_type: String,
46 pub auth: Option<Auth>,
47 pub tls: TlsSettings,
48 pub proxy: ProxyConfig,
49 pub shutdown: ShutdownSignal,
50}
51
/// Returns the default interval between calls: 15 seconds.
pub(crate) const fn default_interval() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
56
/// Returns the default timeout for a single request: 5 seconds.
pub(crate) const fn default_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
61
62pub(crate) trait HttpClientBuilder {
65 type Context: HttpClientContext;
66
67 fn build(&self, url: &Uri) -> Self::Context;
69}
70
71pub(crate) trait HttpClientContext {
73 fn on_response(&mut self, url: &Uri, header: &Parts, body: &Bytes) -> Option<Vec<Event>>;
75
76 fn on_http_response_error(&self, _uri: &Uri, _header: &Parts) {}
78
79 fn process_url(&self, _url: &Uri) -> Option<Uri> {
83 None
84 }
85
86 fn enrich_events(&mut self, _events: &mut Vec<Event>) {}
90}
91
92pub(crate) fn build_url(uri: &Uri, query: &QueryParameters) -> Uri {
94 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
95 if let Some(query) = uri.query() {
96 serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes()));
97 };
98 for (k, query_value) in query {
99 match query_value {
100 QueryParameterValue::SingleParam(param) => {
101 serializer.append_pair(k, param.value());
102 }
103 QueryParameterValue::MultiParams(params) => {
104 for v in params {
105 serializer.append_pair(k, v.value());
106 }
107 }
108 };
109 }
110 let mut builder = Uri::builder();
111 if let Some(scheme) = uri.scheme() {
112 builder = builder.scheme(scheme.clone());
113 };
114 if let Some(authority) = uri.authority() {
115 builder = builder.authority(authority.clone());
116 };
117 builder = builder.path_and_query(match serializer.finish() {
118 query if !query.is_empty() => format!("{}?{}", uri.path(), query),
119 _ => uri.path().to_string(),
120 });
121 builder
122 .build()
123 .expect("Failed to build URI from parsed arguments")
124}
125
126pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
128 if timeout > interval {
129 warn!(
130 interval_secs = %interval.as_secs_f64(),
131 timeout_secs = %timeout.as_secs_f64(),
132 message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
133 );
134 }
135}
136
137pub(crate) async fn call<
142 B: HttpClientBuilder<Context = C> + Send + Clone,
143 C: HttpClientContext + Send,
144>(
145 inputs: GenericHttpClientInputs,
146 context_builder: B,
147 mut out: SourceSender,
148 http_method: HttpMethod,
149) -> Result<(), ()> {
150 let client =
153 HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
154 let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
155 .take_until(inputs.shutdown)
156 .map(move |_| stream::iter(inputs.urls.clone()))
157 .flatten()
158 .map(move |base_url| {
159 let client = client.clone();
160 let endpoint = base_url.to_string();
161
162 let context_builder = context_builder.clone();
163 let mut context = context_builder.build(&base_url);
164
165 let url = context.process_url(&base_url).unwrap_or(base_url);
167
168 let mut builder = match http_method {
169 HttpMethod::Head => Request::head(&url),
170 HttpMethod::Get => Request::get(&url),
171 HttpMethod::Post => Request::post(&url),
172 HttpMethod::Put => Request::put(&url),
173 HttpMethod::Patch => Request::patch(&url),
174 HttpMethod::Delete => Request::delete(&url),
175 HttpMethod::Options => Request::options(&url),
176 };
177
178 for (header, values) in &inputs.headers {
180 for value in values {
181 builder = builder.header(header, value);
182 }
183 }
184
185 if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) {
187 builder = builder.header(http::header::ACCEPT, &inputs.content_type);
188 }
189
190 let mut request = builder.body(Body::empty()).expect("error creating request");
192
193 if let Some(auth) = &inputs.auth {
194 auth.apply(&mut request);
195 }
196
197 tokio::time::timeout(inputs.timeout, client.send(request))
198 .then(move |result| async move {
199 match result {
200 Ok(Ok(response)) => Ok(response),
201 Ok(Err(error)) => Err(error.into()),
202 Err(_) => Err(format!(
203 "Timeout error: request exceeded {}s",
204 inputs.timeout.as_secs_f64()
205 )
206 .into()),
207 }
208 })
209 .and_then(|response| async move {
210 let (header, body) = response.into_parts();
211 let body = hyper::body::to_bytes(body).await?;
212 emit!(EndpointBytesReceived {
213 byte_size: body.len(),
214 protocol: "http",
215 endpoint: endpoint.as_str(),
216 });
217 Ok((header, body))
218 })
219 .into_stream()
220 .filter_map(move |response| {
221 ready(match response {
222 Ok((header, body)) if header.status == hyper::StatusCode::OK => {
223 context.on_response(&url, &header, &body).map(|mut events| {
224 let byte_size = if events.is_empty() {
225 JsonSize::zero()
234 } else {
235 events.estimated_json_encoded_size_of()
236 };
237
238 emit!(HttpClientEventsReceived {
239 byte_size,
240 count: events.len(),
241 url: url.to_string()
242 });
243
244 context.enrich_events(&mut events);
247
248 stream::iter(events)
249 })
250 }
251 Ok((header, _)) => {
252 context.on_http_response_error(&url, &header);
253 emit!(HttpClientHttpResponseError {
254 code: header.status,
255 url: url.to_string(),
256 });
257 None
258 }
259 Err(error) => {
260 emit!(HttpClientHttpError {
261 error,
262 url: url.to_string()
263 });
264 None
265 }
266 })
267 })
268 .flatten()
269 .boxed()
270 })
271 .flatten_unordered(None)
272 .boxed();
273
274 match out.send_event_stream(&mut stream).await {
275 Ok(()) => {
276 debug!("Finished sending.");
277 Ok(())
278 }
279 Err(_) => {
280 let (count, _) = stream.size_hint();
281 emit!(StreamClosedError { count });
282 Err(())
283 }
284 }
285}