1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::time::Duration;

use http::{
    header::{self, HeaderMap, HeaderValue},
    Request, Response,
};
use hyper::{body::HttpBody, Error};
use metrics::{counter, histogram};
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type};

#[derive(Debug)]
pub struct AboutToSendHttpRequest<'a, T> {
    pub request: &'a Request<T>,
}

fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
    let mut headers = headers.clone();
    for name in &[
        header::AUTHORIZATION,
        header::PROXY_AUTHORIZATION,
        header::COOKIE,
        header::SET_COOKIE,
    ] {
        if let Some(value) = headers.get_mut(name) {
            value.set_sensitive(true);
        }
    }
    headers
}

impl<'a, T: HttpBody> InternalEvent for AboutToSendHttpRequest<'a, T> {
    fn emit(self) {
        debug!(
            message = "Sending HTTP request.",
            uri = %self.request.uri(),
            method = %self.request.method(),
            version = ?self.request.version(),
            headers = ?remove_sensitive(self.request.headers()),
            body = %FormatBody(self.request.body()),
        );
        counter!("http_client_requests_sent_total", "method" => self.request.method().to_string())
            .increment(1);
    }
}

#[derive(Debug)]
pub struct GotHttpResponse<'a, T> {
    pub response: &'a Response<T>,
    pub roundtrip: Duration,
}

impl<'a, T: HttpBody> InternalEvent for GotHttpResponse<'a, T> {
    fn emit(self) {
        debug!(
            message = "HTTP response.",
            status = %self.response.status(),
            version = ?self.response.version(),
            headers = ?remove_sensitive(self.response.headers()),
            body = %FormatBody(self.response.body()),
        );
        counter!(
            "http_client_responses_total",
            "status" => self.response.status().as_u16().to_string(),
        )
        .increment(1);
        histogram!("http_client_rtt_seconds").record(self.roundtrip);
        histogram!(
            "http_client_response_rtt_seconds",
            "status" => self.response.status().as_u16().to_string(),
        )
        .record(self.roundtrip);
    }
}

#[derive(Debug)]
pub struct GotHttpWarning<'a> {
    pub error: &'a Error,
    pub roundtrip: Duration,
}

impl<'a> InternalEvent for GotHttpWarning<'a> {
    fn emit(self) {
        warn!(
            message = "HTTP error.",
            error = %self.error,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::PROCESSING,
            internal_log_rate_limit = true,
        );
        counter!("http_client_errors_total", "error_kind" => self.error.to_string()).increment(1);
        histogram!("http_client_rtt_seconds").record(self.roundtrip);
        histogram!("http_client_error_rtt_seconds", "error_kind" => self.error.to_string())
            .record(self.roundtrip);
    }
}

/// Newtype placeholder to provide a formatter for the request and response body.
struct FormatBody<'a, B>(&'a B);

impl<'a, B: HttpBody> std::fmt::Display for FormatBody<'a, B> {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        let size = self.0.size_hint();
        match (size.lower(), size.upper()) {
            (0, None) => write!(fmt, "[unknown]"),
            (lower, None) => write!(fmt, "[>={} bytes]", lower),

            (0, Some(0)) => write!(fmt, "[empty]"),
            (0, Some(upper)) => write!(fmt, "[<={} bytes]", upper),

            (lower, Some(upper)) if lower == upper => write!(fmt, "[{} bytes]", lower),
            (lower, Some(upper)) => write!(fmt, "[{}..={} bytes]", lower, upper),
        }
    }
}