vector/internal_events/
http_client.rs

1use std::time::Duration;
2
3use http::{
4    header::{self, HeaderMap, HeaderValue},
5    Request, Response,
6};
7use hyper::{body::HttpBody, Error};
8use metrics::{counter, histogram};
9use vector_lib::internal_event::InternalEvent;
10use vector_lib::internal_event::{error_stage, error_type};
11
12#[derive(Debug)]
13pub struct AboutToSendHttpRequest<'a, T> {
14    pub request: &'a Request<T>,
15}
16
17fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
18    let mut headers = headers.clone();
19    for name in &[
20        header::AUTHORIZATION,
21        header::PROXY_AUTHORIZATION,
22        header::COOKIE,
23        header::SET_COOKIE,
24    ] {
25        if let Some(value) = headers.get_mut(name) {
26            value.set_sensitive(true);
27        }
28    }
29    headers
30}
31
32impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
33    fn emit(self) {
34        debug!(
35            message = "Sending HTTP request.",
36            uri = %self.request.uri(),
37            method = %self.request.method(),
38            version = ?self.request.version(),
39            headers = ?remove_sensitive(self.request.headers()),
40            body = %FormatBody(self.request.body()),
41        );
42        counter!("http_client_requests_sent_total", "method" => self.request.method().to_string())
43            .increment(1);
44    }
45}
46
47#[derive(Debug)]
48pub struct GotHttpResponse<'a, T> {
49    pub response: &'a Response<T>,
50    pub roundtrip: Duration,
51}
52
53impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
54    fn emit(self) {
55        debug!(
56            message = "HTTP response.",
57            status = %self.response.status(),
58            version = ?self.response.version(),
59            headers = ?remove_sensitive(self.response.headers()),
60            body = %FormatBody(self.response.body()),
61        );
62        counter!(
63            "http_client_responses_total",
64            "status" => self.response.status().as_u16().to_string(),
65        )
66        .increment(1);
67        histogram!("http_client_rtt_seconds").record(self.roundtrip);
68        histogram!(
69            "http_client_response_rtt_seconds",
70            "status" => self.response.status().as_u16().to_string(),
71        )
72        .record(self.roundtrip);
73    }
74}
75
76#[derive(Debug)]
77pub struct GotHttpWarning<'a> {
78    pub error: &'a Error,
79    pub roundtrip: Duration,
80}
81
82impl InternalEvent for GotHttpWarning<'_> {
83    fn emit(self) {
84        warn!(
85            message = "HTTP error.",
86            error = %self.error,
87            error_type = error_type::REQUEST_FAILED,
88            stage = error_stage::PROCESSING,
89            internal_log_rate_limit = true,
90        );
91        counter!("http_client_errors_total", "error_kind" => self.error.to_string()).increment(1);
92        histogram!("http_client_rtt_seconds").record(self.roundtrip);
93        histogram!("http_client_error_rtt_seconds", "error_kind" => self.error.to_string())
94            .record(self.roundtrip);
95    }
96}
97
98/// Newtype placeholder to provide a formatter for the request and response body.
99struct FormatBody<'a, B>(&'a B);
100
101impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
102    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
103        let size = self.0.size_hint();
104        match (size.lower(), size.upper()) {
105            (0, None) => write!(fmt, "[unknown]"),
106            (lower, None) => write!(fmt, "[>={lower} bytes]"),
107
108            (0, Some(0)) => write!(fmt, "[empty]"),
109            (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
110
111            (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
112            (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
113        }
114    }
115}