vector/internal_events/
http_client.rs1use std::time::Duration;
2
3use http::{
4 Request, Response,
5 header::{self, HeaderMap, HeaderValue},
6};
7use hyper::{Error, body::HttpBody};
8use metrics::{counter, histogram};
9use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
10
11#[derive(Debug)]
12pub struct AboutToSendHttpRequest<'a, T> {
13 pub request: &'a Request<T>,
14}
15
16fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
17 let mut headers = headers.clone();
18 for name in &[
19 header::AUTHORIZATION,
20 header::PROXY_AUTHORIZATION,
21 header::COOKIE,
22 header::SET_COOKIE,
23 ] {
24 if let Some(value) = headers.get_mut(name) {
25 value.set_sensitive(true);
26 }
27 }
28 headers
29}
30
31impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
32 fn emit(self) {
33 debug!(
34 message = "Sending HTTP request.",
35 uri = %self.request.uri(),
36 method = %self.request.method(),
37 version = ?self.request.version(),
38 headers = ?remove_sensitive(self.request.headers()),
39 body = %FormatBody(self.request.body()),
40 );
41 counter!("http_client_requests_sent_total", "method" => self.request.method().to_string())
42 .increment(1);
43 }
44}
45
46#[derive(Debug)]
47pub struct GotHttpResponse<'a, T> {
48 pub response: &'a Response<T>,
49 pub roundtrip: Duration,
50}
51
52impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
53 fn emit(self) {
54 debug!(
55 message = "HTTP response.",
56 status = %self.response.status(),
57 version = ?self.response.version(),
58 headers = ?remove_sensitive(self.response.headers()),
59 body = %FormatBody(self.response.body()),
60 );
61 counter!(
62 "http_client_responses_total",
63 "status" => self.response.status().as_u16().to_string(),
64 )
65 .increment(1);
66 histogram!("http_client_rtt_seconds").record(self.roundtrip);
67 histogram!(
68 "http_client_response_rtt_seconds",
69 "status" => self.response.status().as_u16().to_string(),
70 )
71 .record(self.roundtrip);
72 }
73}
74
75#[derive(Debug)]
76pub struct GotHttpWarning<'a> {
77 pub error: &'a Error,
78 pub roundtrip: Duration,
79}
80
81impl InternalEvent for GotHttpWarning<'_> {
82 fn emit(self) {
83 warn!(
84 message = "HTTP error.",
85 error = %self.error,
86 error_type = error_type::REQUEST_FAILED,
87 stage = error_stage::PROCESSING,
88 );
89 counter!("http_client_errors_total", "error_kind" => self.error.to_string()).increment(1);
90 histogram!("http_client_rtt_seconds").record(self.roundtrip);
91 histogram!("http_client_error_rtt_seconds", "error_kind" => self.error.to_string())
92 .record(self.roundtrip);
93 }
94}
95
96struct FormatBody<'a, B>(&'a B);
98
99impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
100 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
101 let size = self.0.size_hint();
102 match (size.lower(), size.upper()) {
103 (0, None) => write!(fmt, "[unknown]"),
104 (lower, None) => write!(fmt, "[>={lower} bytes]"),
105
106 (0, Some(0)) => write!(fmt, "[empty]"),
107 (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
108
109 (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
110 (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
111 }
112 }
113}