vector/internal_events/
http.rs

1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::internal_event::InternalEvent;
6use vector_lib::{
7    internal_event::{error_stage, error_type},
8    json_size::JsonSize,
9};
10
/// Metric label key under which the HTTP response status code is attached.
const HTTP_STATUS_LABEL: &str = "status";
12
13#[derive(Debug)]
14pub struct HttpServerRequestReceived;
15
16impl InternalEvent for HttpServerRequestReceived {
17    fn emit(self) {
18        debug!(
19            message = "Received HTTP request.",
20            internal_log_rate_limit = true
21        );
22        counter!("http_server_requests_received_total").increment(1);
23    }
24}
25
26#[derive(Debug)]
27pub struct HttpServerResponseSent<'a, B> {
28    pub response: &'a Response<B>,
29    pub latency: Duration,
30}
31
32impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
33    fn emit(self) {
34        let labels = &[(
35            HTTP_STATUS_LABEL,
36            self.response.status().as_u16().to_string(),
37        )];
38        counter!("http_server_responses_sent_total", labels).increment(1);
39        histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
40    }
41}
42
43#[derive(Debug)]
44pub struct HttpBytesReceived<'a> {
45    pub byte_size: usize,
46    pub http_path: &'a str,
47    pub protocol: &'static str,
48}
49
50impl InternalEvent for HttpBytesReceived<'_> {
51    fn emit(self) {
52        trace!(
53            message = "Bytes received.",
54            byte_size = %self.byte_size,
55            http_path = %self.http_path,
56            protocol = %self.protocol
57        );
58        counter!(
59            "component_received_bytes_total",
60            "http_path" => self.http_path.to_string(),
61            "protocol" => self.protocol,
62        )
63        .increment(self.byte_size as u64);
64    }
65}
66
67#[derive(Debug)]
68pub struct HttpEventsReceived<'a> {
69    pub count: usize,
70    pub byte_size: JsonSize,
71    pub http_path: &'a str,
72    pub protocol: &'static str,
73}
74
75impl InternalEvent for HttpEventsReceived<'_> {
76    fn emit(self) {
77        trace!(
78            message = "Events received.",
79            count = %self.count,
80            byte_size = %self.byte_size,
81            http_path = %self.http_path,
82            protocol = %self.protocol,
83        );
84
85        histogram!("component_received_events_count").record(self.count as f64);
86        counter!(
87            "component_received_events_total",
88            "http_path" => self.http_path.to_string(),
89            "protocol" => self.protocol,
90        )
91        .increment(self.count as u64);
92        counter!(
93            "component_received_event_bytes_total",
94            "http_path" => self.http_path.to_string(),
95            "protocol" => self.protocol,
96        )
97        .increment(self.byte_size.get() as u64);
98    }
99}
100
/// Internal event reporting a bad HTTP request.
///
/// Only compiled when the `sources-utils-http` feature is enabled.
#[cfg(feature = "sources-utils-http")]
#[derive(Debug)]
pub struct HttpBadRequest<'a> {
    /// Numeric code, logged as `http_code`.
    code: u16,
    /// Error code derived from `code` in [`HttpBadRequest::new`]; used as the
    /// `error_code` log field and metric tag.
    error_code: String,
    /// Failure description, logged as `error`.
    message: &'a str,
}

#[cfg(feature = "sources-utils-http")]
impl<'a> HttpBadRequest<'a> {
    /// Builds the event from a numeric `code` and a failure `message`.
    ///
    /// The `error_code` is computed once here via `http_error_code(code)` so
    /// that `emit` can reuse it for both the log entry and the metric tag.
    pub fn new(code: u16, message: &'a str) -> Self {
        Self {
            code,
            error_code: super::prelude::http_error_code(code),
            message,
        }
    }
}

#[cfg(feature = "sources-utils-http")]
impl InternalEvent for HttpBadRequest<'_> {
    /// Writes a warn-level log entry and increments `component_errors_total`.
    fn emit(self) {
        // The stage is reported under the `stage` key, matching the other
        // error events in this module (previously `error_stage`), so that
        // `component_errors_total` carries a uniform tag set.
        warn!(
            message = "Received bad request.",
            error = %self.message,
            error_code = %self.error_code,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::RECEIVING,
            http_code = %self.code,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_code" => self.error_code,
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}
141
142#[derive(Debug)]
143pub struct HttpDecompressError<'a> {
144    pub error: &'a dyn Error,
145    pub encoding: &'a str,
146}
147
148impl InternalEvent for HttpDecompressError<'_> {
149    fn emit(self) {
150        error!(
151            message = "Failed decompressing payload.",
152            error = %self.error,
153            error_code = "failed_decompressing_payload",
154            error_type = error_type::PARSER_FAILED,
155            stage = error_stage::RECEIVING,
156            encoding = %self.encoding,
157            internal_log_rate_limit = true
158        );
159        counter!(
160            "component_errors_total",
161            "error_code" => "failed_decompressing_payload",
162            "error_type" => error_type::PARSER_FAILED,
163            "stage" => error_stage::RECEIVING,
164        )
165        .increment(1);
166    }
167}
168
169pub struct HttpInternalError<'a> {
170    pub message: &'a str,
171}
172
173impl InternalEvent for HttpInternalError<'_> {
174    fn emit(self) {
175        error!(
176            message = %self.message,
177            error_type = error_type::CONNECTION_FAILED,
178            stage = error_stage::RECEIVING,
179            internal_log_rate_limit = true
180        );
181        counter!(
182            "component_errors_total",
183            "error_type" => error_type::CONNECTION_FAILED,
184            "stage" => error_stage::RECEIVING,
185        )
186        .increment(1);
187    }
188}