1#![cfg_attr(feature = "sources-okta", allow(dead_code))]
13
14use std::{collections::HashMap, future::ready, time::Duration};
15
16use bytes::Bytes;
17use futures_util::{FutureExt, StreamExt, TryFutureExt, stream};
18use http::{Uri, response::Parts};
19use hyper::{Body, Request};
20use tokio_stream::wrappers::IntervalStream;
21use vector_lib::{
22 EstimatedJsonEncodedSizeOf, config::proxy::ProxyConfig, event::Event, json_size::JsonSize,
23 shutdown::ShutdownSignal,
24};
25
26use crate::{
27 SourceSender,
28 http::{Auth, HttpClient, QueryParameterValue, QueryParameters},
29 internal_events::{
30 EndpointBytesReceived, HttpClientEventsReceived, HttpClientHttpError,
31 HttpClientHttpResponseError, StreamClosedError,
32 },
33 sources::util::http::HttpMethod,
34 tls::TlsSettings,
35};
36
/// Everything the generic HTTP polling loop ([`call`]) needs in order to issue requests.
pub(crate) struct GenericHttpClientInputs {
    /// Endpoints to request; every URL is requested once per interval tick.
    pub urls: Vec<Uri>,
    /// Period of the timer that triggers a new round of requests.
    pub interval: Duration,
    /// Upper bound on how long sending a single request may take before it is reported as a
    /// timeout error. Reading the response body happens after this bound has been applied.
    pub timeout: Duration,
    /// Additional request headers: each name maps to the list of values added to every request.
    pub headers: HashMap<String, Vec<String>>,
    /// Value sent in the `Accept` header when `headers` does not already provide one.
    pub content_type: String,
    /// Optional authentication that is applied to every outgoing request.
    pub auth: Option<Auth>,
    /// TLS settings used when constructing the HTTP client.
    pub tls: TlsSettings,
    /// Proxy configuration used when constructing the HTTP client.
    pub proxy: ProxyConfig,
    /// Signal that terminates the polling loop once it resolves.
    pub shutdown: ShutdownSignal,
}
54
/// Default value for the interval setting: 15 seconds.
pub(crate) const fn default_interval() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;

    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
59
/// Default value for the timeout setting: 5 seconds.
pub(crate) const fn default_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;

    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
64
/// Factory for the per-request context used by the generic HTTP polling loop.
pub(crate) trait HttpClientBuilder {
    /// The context type produced by [`HttpClientBuilder::build`].
    type Context: HttpClientContext;

    /// Creates the context for a request to `url`.
    ///
    /// The polling loop calls this once for every URL on every interval tick, so each request
    /// gets its own context.
    fn build(&self, url: &Uri) -> Self::Context;
}
73
/// Hooks invoked by the generic HTTP polling loop while handling a single request.
pub(crate) trait HttpClientContext {
    /// Called with the response head and the fully collected body when the response status is
    /// `200 OK`.
    ///
    /// Returning `Some(events)` forwards those events downstream; returning `None` forwards
    /// nothing for this response.
    fn on_response(&mut self, url: &Uri, header: &Parts, body: &Bytes) -> Option<Vec<Event>>;

    /// Called when a response arrives with a status other than `200 OK`.
    ///
    /// The default implementation does nothing.
    fn on_http_response_error(&self, _uri: &Uri, _header: &Parts) {}

    /// Gives the context a chance to replace the URL before the request is built.
    ///
    /// When `Some(uri)` is returned the request goes to `uri`; when `None` is returned (the
    /// default) the configured URL is used unchanged.
    fn process_url(&self, _url: &Uri) -> Option<Uri> {
        None
    }

    /// Returns the request body to send, if any.
    ///
    /// With `None` (the default) the request is sent with an empty body. With `Some(body)` the
    /// polling loop sends `body` and adds a `Content-Type: application/json` header unless the
    /// configured headers already contain a content type.
    fn get_request_body(&self) -> Option<String> {
        None
    }

    /// Called with the events produced by [`HttpClientContext::on_response`] right before they
    /// are forwarded downstream, allowing them to be modified in place.
    ///
    /// The default implementation leaves the events untouched.
    fn enrich_events(&mut self, _events: &mut Vec<Event>) {}
}
100
101pub(crate) fn build_url(uri: &Uri, query: &QueryParameters) -> Uri {
103 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
104 if let Some(query) = uri.query() {
105 serializer.extend_pairs(url::form_urlencoded::parse(query.as_bytes()));
106 };
107 for (k, query_value) in query {
108 match query_value {
109 QueryParameterValue::SingleParam(param) => {
110 serializer.append_pair(k, param.value());
111 }
112 QueryParameterValue::MultiParams(params) => {
113 for v in params {
114 serializer.append_pair(k, v.value());
115 }
116 }
117 };
118 }
119 let mut builder = Uri::builder();
120 if let Some(scheme) = uri.scheme() {
121 builder = builder.scheme(scheme.clone());
122 };
123 if let Some(authority) = uri.authority() {
124 builder = builder.authority(authority.clone());
125 };
126 builder = builder.path_and_query(match serializer.finish() {
127 query if !query.is_empty() => format!("{}?{}", uri.path(), query),
128 _ => uri.path().to_string(),
129 });
130 builder
131 .build()
132 .expect("Failed to build URI from parsed arguments")
133}
134
135pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
137 if timeout > interval {
138 warn!(
139 interval_secs = %interval.as_secs_f64(),
140 timeout_secs = %timeout.as_secs_f64(),
141 message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
142 );
143 }
144}
145
146pub(crate) async fn call<
151 B: HttpClientBuilder<Context = C> + Send + Clone,
152 C: HttpClientContext + Send,
153>(
154 inputs: GenericHttpClientInputs,
155 context_builder: B,
156 mut out: SourceSender,
157 http_method: HttpMethod,
158) -> Result<(), ()> {
159 let client =
162 HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
163 let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
164 .take_until(inputs.shutdown)
165 .map(move |_| stream::iter(inputs.urls.clone()))
166 .flatten()
167 .map(move |base_url| {
168 let client = client.clone();
169 let endpoint = base_url.to_string();
170
171 let context_builder = context_builder.clone();
172 let mut context = context_builder.build(&base_url);
173
174 let url = context.process_url(&base_url).unwrap_or(base_url);
176
177 let mut builder = match http_method {
178 HttpMethod::Head => Request::head(&url),
179 HttpMethod::Get => Request::get(&url),
180 HttpMethod::Post => Request::post(&url),
181 HttpMethod::Put => Request::put(&url),
182 HttpMethod::Patch => Request::patch(&url),
183 HttpMethod::Delete => Request::delete(&url),
184 HttpMethod::Options => Request::options(&url),
185 };
186
187 for (header, values) in &inputs.headers {
189 for value in values {
190 builder = builder.header(header, value);
191 }
192 }
193
194 if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) {
196 builder = builder.header(http::header::ACCEPT, &inputs.content_type);
197 }
198
199 let body = match context.get_request_body() {
201 Some(body_str) => {
202 if !inputs
204 .headers
205 .contains_key(http::header::CONTENT_TYPE.as_str())
206 {
207 builder = builder.header(http::header::CONTENT_TYPE, "application/json");
208 }
209 Body::from(body_str)
210 }
211 None => Body::empty(),
212 };
213
214 let mut request = builder.body(body).expect("error creating request");
216
217 if let Some(auth) = &inputs.auth {
218 auth.apply(&mut request);
219 }
220
221 tokio::time::timeout(inputs.timeout, client.send(request))
222 .then(move |result| async move {
223 match result {
224 Ok(Ok(response)) => Ok(response),
225 Ok(Err(error)) => Err(error.into()),
226 Err(_) => Err(format!(
227 "Timeout error: request exceeded {}s",
228 inputs.timeout.as_secs_f64()
229 )
230 .into()),
231 }
232 })
233 .and_then(|response| async move {
234 let (header, body) = response.into_parts();
235 let body = http_body::Body::collect(body).await?.to_bytes();
236 emit!(EndpointBytesReceived {
237 byte_size: body.len(),
238 protocol: "http",
239 endpoint: endpoint.as_str(),
240 });
241 Ok((header, body))
242 })
243 .into_stream()
244 .filter_map(move |response| {
245 ready(match response {
246 Ok((header, body)) if header.status == hyper::StatusCode::OK => {
247 context.on_response(&url, &header, &body).map(|mut events| {
248 let byte_size = if events.is_empty() {
249 JsonSize::zero()
258 } else {
259 events.estimated_json_encoded_size_of()
260 };
261
262 emit!(HttpClientEventsReceived {
263 byte_size,
264 count: events.len(),
265 url: url.to_string()
266 });
267
268 context.enrich_events(&mut events);
271
272 stream::iter(events)
273 })
274 }
275 Ok((header, _)) => {
276 context.on_http_response_error(&url, &header);
277 emit!(HttpClientHttpResponseError {
278 code: header.status,
279 url: url.to_string(),
280 });
281 None
282 }
283 Err(error) => {
284 emit!(HttpClientHttpError {
285 error,
286 url: url.to_string()
287 });
288 None
289 }
290 })
291 })
292 .flatten()
293 .boxed()
294 })
295 .flatten_unordered(None)
296 .boxed();
297
298 match out.send_event_stream(&mut stream).await {
299 Ok(()) => {
300 debug!("Finished sending.");
301 Ok(())
302 }
303 Err(_) => {
304 let (count, _) = stream.size_hint();
305 emit!(StreamClosedError { count });
306 Err(())
307 }
308 }
309}