vector/internal_events/
http.rs

1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::{
6    internal_event::{InternalEvent, error_stage, error_type},
7    json_size::JsonSize,
8};
9
/// Label key under which the numeric HTTP status code is attached to the
/// server response metrics emitted from this module.
const HTTP_STATUS_LABEL: &str = "status";
11
12#[derive(Debug)]
13pub struct HttpServerRequestReceived;
14
15impl InternalEvent for HttpServerRequestReceived {
16    fn emit(self) {
17        debug!(
18            message = "Received HTTP request.",
19            internal_log_rate_limit = true
20        );
21        counter!("http_server_requests_received_total").increment(1);
22    }
23}
24
25#[derive(Debug)]
26pub struct HttpServerResponseSent<'a, B> {
27    pub response: &'a Response<B>,
28    pub latency: Duration,
29}
30
31impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
32    fn emit(self) {
33        let labels = &[(
34            HTTP_STATUS_LABEL,
35            self.response.status().as_u16().to_string(),
36        )];
37        counter!("http_server_responses_sent_total", labels).increment(1);
38        histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
39    }
40}
41
42#[derive(Debug)]
43pub struct HttpBytesReceived<'a> {
44    pub byte_size: usize,
45    pub http_path: &'a str,
46    pub protocol: &'static str,
47}
48
49impl InternalEvent for HttpBytesReceived<'_> {
50    fn emit(self) {
51        trace!(
52            message = "Bytes received.",
53            byte_size = %self.byte_size,
54            http_path = %self.http_path,
55            protocol = %self.protocol
56        );
57        counter!(
58            "component_received_bytes_total",
59            "http_path" => self.http_path.to_string(),
60            "protocol" => self.protocol,
61        )
62        .increment(self.byte_size as u64);
63    }
64}
65
66#[derive(Debug)]
67pub struct HttpEventsReceived<'a> {
68    pub count: usize,
69    pub byte_size: JsonSize,
70    pub http_path: &'a str,
71    pub protocol: &'static str,
72}
73
74impl InternalEvent for HttpEventsReceived<'_> {
75    fn emit(self) {
76        trace!(
77            message = "Events received.",
78            count = %self.count,
79            byte_size = %self.byte_size,
80            http_path = %self.http_path,
81            protocol = %self.protocol,
82        );
83
84        histogram!("component_received_events_count").record(self.count as f64);
85        counter!(
86            "component_received_events_total",
87            "http_path" => self.http_path.to_string(),
88            "protocol" => self.protocol,
89        )
90        .increment(self.count as u64);
91        counter!(
92            "component_received_event_bytes_total",
93            "http_path" => self.http_path.to_string(),
94            "protocol" => self.protocol,
95        )
96        .increment(self.byte_size.get() as u64);
97    }
98}
99
/// Internal event for an HTTP request that was rejected as bad.
///
/// Fields are private; build the event with [`HttpBadRequest::new`].
#[cfg(feature = "sources-utils-http")]
#[derive(Debug)]
pub struct HttpBadRequest<'a> {
    // Numeric code, logged as `http_code` when the event is emitted.
    code: u16,
    // Derived from `code` in the constructor.
    error_code: String,
    // Text logged as the `error` field.
    message: &'a str,
}

#[cfg(feature = "sources-utils-http")]
impl<'a> HttpBadRequest<'a> {
    /// Creates the event from a numeric `code` and a `message`, deriving the
    /// `error_code` string from `code` via `http_error_code`.
    pub fn new(code: u16, message: &'a str) -> Self {
        let error_code = super::prelude::http_error_code(code);
        Self {
            code,
            error_code,
            message,
        }
    }
}
118
#[cfg(feature = "sources-utils-http")]
impl InternalEvent for HttpBadRequest<'_> {
    /// Logs the rejected request at warn level and increments
    /// `component_errors_total`.
    ///
    /// The processing stage is reported under the `stage` key, in both the
    /// log line and the metric labels, so that this event lines up with the
    /// other error events in this module and can be aggregated with them.
    fn emit(self) {
        warn!(
            message = "Received bad request.",
            error = %self.message,
            error_code = %self.error_code,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::RECEIVING,
            http_code = %self.code,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_code" => self.error_code,
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}
140
141#[derive(Debug)]
142pub struct HttpDecompressError<'a> {
143    pub error: &'a dyn Error,
144    pub encoding: &'a str,
145}
146
147impl InternalEvent for HttpDecompressError<'_> {
148    fn emit(self) {
149        error!(
150            message = "Failed decompressing payload.",
151            error = %self.error,
152            error_code = "failed_decompressing_payload",
153            error_type = error_type::PARSER_FAILED,
154            stage = error_stage::RECEIVING,
155            encoding = %self.encoding,
156            internal_log_rate_limit = true
157        );
158        counter!(
159            "component_errors_total",
160            "error_code" => "failed_decompressing_payload",
161            "error_type" => error_type::PARSER_FAILED,
162            "stage" => error_stage::RECEIVING,
163        )
164        .increment(1);
165    }
166}
167
168pub struct HttpInternalError<'a> {
169    pub message: &'a str,
170}
171
172impl InternalEvent for HttpInternalError<'_> {
173    fn emit(self) {
174        error!(
175            message = %self.message,
176            error_type = error_type::CONNECTION_FAILED,
177            stage = error_stage::RECEIVING,
178            internal_log_rate_limit = true
179        );
180        counter!(
181            "component_errors_total",
182            "error_type" => error_type::CONNECTION_FAILED,
183            "stage" => error_stage::RECEIVING,
184        )
185        .increment(1);
186    }
187}