vector/internal_events/
http_client.rs1use std::time::Duration;
2
3use http::{
4 header::{self, HeaderMap, HeaderValue},
5 Request, Response,
6};
7use hyper::{body::HttpBody, Error};
8use metrics::{counter, histogram};
9use vector_lib::internal_event::InternalEvent;
10use vector_lib::internal_event::{error_stage, error_type};
11
12#[derive(Debug)]
13pub struct AboutToSendHttpRequest<'a, T> {
14 pub request: &'a Request<T>,
15}
16
17fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
18 let mut headers = headers.clone();
19 for name in &[
20 header::AUTHORIZATION,
21 header::PROXY_AUTHORIZATION,
22 header::COOKIE,
23 header::SET_COOKIE,
24 ] {
25 if let Some(value) = headers.get_mut(name) {
26 value.set_sensitive(true);
27 }
28 }
29 headers
30}
31
32impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
33 fn emit(self) {
34 debug!(
35 message = "Sending HTTP request.",
36 uri = %self.request.uri(),
37 method = %self.request.method(),
38 version = ?self.request.version(),
39 headers = ?remove_sensitive(self.request.headers()),
40 body = %FormatBody(self.request.body()),
41 );
42 counter!("http_client_requests_sent_total", "method" => self.request.method().to_string())
43 .increment(1);
44 }
45}
46
47#[derive(Debug)]
48pub struct GotHttpResponse<'a, T> {
49 pub response: &'a Response<T>,
50 pub roundtrip: Duration,
51}
52
53impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
54 fn emit(self) {
55 debug!(
56 message = "HTTP response.",
57 status = %self.response.status(),
58 version = ?self.response.version(),
59 headers = ?remove_sensitive(self.response.headers()),
60 body = %FormatBody(self.response.body()),
61 );
62 counter!(
63 "http_client_responses_total",
64 "status" => self.response.status().as_u16().to_string(),
65 )
66 .increment(1);
67 histogram!("http_client_rtt_seconds").record(self.roundtrip);
68 histogram!(
69 "http_client_response_rtt_seconds",
70 "status" => self.response.status().as_u16().to_string(),
71 )
72 .record(self.roundtrip);
73 }
74}
75
76#[derive(Debug)]
77pub struct GotHttpWarning<'a> {
78 pub error: &'a Error,
79 pub roundtrip: Duration,
80}
81
82impl InternalEvent for GotHttpWarning<'_> {
83 fn emit(self) {
84 warn!(
85 message = "HTTP error.",
86 error = %self.error,
87 error_type = error_type::REQUEST_FAILED,
88 stage = error_stage::PROCESSING,
89 internal_log_rate_limit = true,
90 );
91 counter!("http_client_errors_total", "error_kind" => self.error.to_string()).increment(1);
92 histogram!("http_client_rtt_seconds").record(self.roundtrip);
93 histogram!("http_client_error_rtt_seconds", "error_kind" => self.error.to_string())
94 .record(self.roundtrip);
95 }
96}
97
/// Display adapter around a borrowed body.
///
/// Its `Display` implementation prints a bracketed description derived from the body's
/// size hint rather than the body contents.
struct FormatBody<'a, B>(&'a B);
100
101impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
102 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
103 let size = self.0.size_hint();
104 match (size.lower(), size.upper()) {
105 (0, None) => write!(fmt, "[unknown]"),
106 (lower, None) => write!(fmt, "[>={lower} bytes]"),
107
108 (0, Some(0)) => write!(fmt, "[empty]"),
109 (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
110
111 (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
112 (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
113 }
114 }
115}