vector/internal_events/
http_client.rs

1use std::time::Duration;
2
3use http::{
4    Request, Response,
5    header::{self, HeaderMap, HeaderValue},
6};
7use hyper::{Error, body::HttpBody};
8use metrics::{counter, histogram};
9use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
10
11#[derive(Debug)]
12pub struct AboutToSendHttpRequest<'a, T> {
13    pub request: &'a Request<T>,
14}
15
16fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
17    let mut headers = headers.clone();
18    for name in &[
19        header::AUTHORIZATION,
20        header::PROXY_AUTHORIZATION,
21        header::COOKIE,
22        header::SET_COOKIE,
23    ] {
24        if let Some(value) = headers.get_mut(name) {
25            value.set_sensitive(true);
26        }
27    }
28    headers
29}
30
31impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
32    fn emit(self) {
33        debug!(
34            message = "Sending HTTP request.",
35            uri = %self.request.uri(),
36            method = %self.request.method(),
37            version = ?self.request.version(),
38            headers = ?remove_sensitive(self.request.headers()),
39            body = %FormatBody(self.request.body()),
40        );
41        counter!("http_client_requests_sent_total", "method" => self.request.method().to_string())
42            .increment(1);
43    }
44}
45
46#[derive(Debug)]
47pub struct GotHttpResponse<'a, T> {
48    pub response: &'a Response<T>,
49    pub roundtrip: Duration,
50}
51
52impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
53    fn emit(self) {
54        debug!(
55            message = "HTTP response.",
56            status = %self.response.status(),
57            version = ?self.response.version(),
58            headers = ?remove_sensitive(self.response.headers()),
59            body = %FormatBody(self.response.body()),
60        );
61        counter!(
62            "http_client_responses_total",
63            "status" => self.response.status().as_u16().to_string(),
64        )
65        .increment(1);
66        histogram!("http_client_rtt_seconds").record(self.roundtrip);
67        histogram!(
68            "http_client_response_rtt_seconds",
69            "status" => self.response.status().as_u16().to_string(),
70        )
71        .record(self.roundtrip);
72    }
73}
74
75#[derive(Debug)]
76pub struct GotHttpWarning<'a> {
77    pub error: &'a Error,
78    pub roundtrip: Duration,
79}
80
81impl InternalEvent for GotHttpWarning<'_> {
82    fn emit(self) {
83        warn!(
84            message = "HTTP error.",
85            error = %self.error,
86            error_type = error_type::REQUEST_FAILED,
87            stage = error_stage::PROCESSING,
88        );
89        counter!("http_client_errors_total", "error_kind" => self.error.to_string()).increment(1);
90        histogram!("http_client_rtt_seconds").record(self.roundtrip);
91        histogram!("http_client_error_rtt_seconds", "error_kind" => self.error.to_string())
92            .record(self.roundtrip);
93    }
94}
95
96/// Newtype placeholder to provide a formatter for the request and response body.
97struct FormatBody<'a, B>(&'a B);
98
99impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
100    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
101        let size = self.0.size_hint();
102        match (size.lower(), size.upper()) {
103            (0, None) => write!(fmt, "[unknown]"),
104            (lower, None) => write!(fmt, "[>={lower} bytes]"),
105
106            (0, Some(0)) => write!(fmt, "[empty]"),
107            (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
108
109            (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
110            (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
111        }
112    }
113}