vector/internal_events/
http.rs1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::{
6 internal_event::{InternalEvent, error_stage, error_type},
7 json_size::JsonSize,
8};
9
10const HTTP_STATUS_LABEL: &str = "status";
11
12#[derive(Debug)]
13pub struct HttpServerRequestReceived;
14
15impl InternalEvent for HttpServerRequestReceived {
16 fn emit(self) {
17 debug!(
18 message = "Received HTTP request.",
19 internal_log_rate_limit = true
20 );
21 counter!("http_server_requests_received_total").increment(1);
22 }
23}
24
25#[derive(Debug)]
26pub struct HttpServerResponseSent<'a, B> {
27 pub response: &'a Response<B>,
28 pub latency: Duration,
29}
30
31impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
32 fn emit(self) {
33 let labels = &[(
34 HTTP_STATUS_LABEL,
35 self.response.status().as_u16().to_string(),
36 )];
37 counter!("http_server_responses_sent_total", labels).increment(1);
38 histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
39 }
40}
41
42#[derive(Debug)]
43pub struct HttpBytesReceived<'a> {
44 pub byte_size: usize,
45 pub http_path: &'a str,
46 pub protocol: &'static str,
47}
48
49impl InternalEvent for HttpBytesReceived<'_> {
50 fn emit(self) {
51 trace!(
52 message = "Bytes received.",
53 byte_size = %self.byte_size,
54 http_path = %self.http_path,
55 protocol = %self.protocol
56 );
57 counter!(
58 "component_received_bytes_total",
59 "http_path" => self.http_path.to_string(),
60 "protocol" => self.protocol,
61 )
62 .increment(self.byte_size as u64);
63 }
64}
65
66#[derive(Debug)]
67pub struct HttpEventsReceived<'a> {
68 pub count: usize,
69 pub byte_size: JsonSize,
70 pub http_path: &'a str,
71 pub protocol: &'static str,
72}
73
74impl InternalEvent for HttpEventsReceived<'_> {
75 fn emit(self) {
76 trace!(
77 message = "Events received.",
78 count = %self.count,
79 byte_size = %self.byte_size,
80 http_path = %self.http_path,
81 protocol = %self.protocol,
82 );
83
84 histogram!("component_received_events_count").record(self.count as f64);
85 counter!(
86 "component_received_events_total",
87 "http_path" => self.http_path.to_string(),
88 "protocol" => self.protocol,
89 )
90 .increment(self.count as u64);
91 counter!(
92 "component_received_event_bytes_total",
93 "http_path" => self.http_path.to_string(),
94 "protocol" => self.protocol,
95 )
96 .increment(self.byte_size.get() as u64);
97 }
98}
99
100#[cfg(feature = "sources-utils-http")]
101#[derive(Debug)]
102pub struct HttpBadRequest<'a> {
103 code: u16,
104 error_code: String,
105 message: &'a str,
106}
107
108#[cfg(feature = "sources-utils-http")]
109impl<'a> HttpBadRequest<'a> {
110 pub fn new(code: u16, message: &'a str) -> Self {
111 Self {
112 code,
113 error_code: super::prelude::http_error_code(code),
114 message,
115 }
116 }
117}
118
119#[cfg(feature = "sources-utils-http")]
120impl InternalEvent for HttpBadRequest<'_> {
121 fn emit(self) {
122 warn!(
123 message = "Received bad request.",
124 error = %self.message,
125 error_code = %self.error_code,
126 error_type = error_type::REQUEST_FAILED,
127 error_stage = error_stage::RECEIVING,
128 http_code = %self.code,
129 internal_log_rate_limit = true,
130 );
131 counter!(
132 "component_errors_total",
133 "error_code" => self.error_code,
134 "error_type" => error_type::REQUEST_FAILED,
135 "error_stage" => error_stage::RECEIVING,
136 )
137 .increment(1);
138 }
139}
140
141#[derive(Debug)]
142pub struct HttpDecompressError<'a> {
143 pub error: &'a dyn Error,
144 pub encoding: &'a str,
145}
146
147impl InternalEvent for HttpDecompressError<'_> {
148 fn emit(self) {
149 error!(
150 message = "Failed decompressing payload.",
151 error = %self.error,
152 error_code = "failed_decompressing_payload",
153 error_type = error_type::PARSER_FAILED,
154 stage = error_stage::RECEIVING,
155 encoding = %self.encoding,
156 internal_log_rate_limit = true
157 );
158 counter!(
159 "component_errors_total",
160 "error_code" => "failed_decompressing_payload",
161 "error_type" => error_type::PARSER_FAILED,
162 "stage" => error_stage::RECEIVING,
163 )
164 .increment(1);
165 }
166}
167
168pub struct HttpInternalError<'a> {
169 pub message: &'a str,
170}
171
172impl InternalEvent for HttpInternalError<'_> {
173 fn emit(self) {
174 error!(
175 message = %self.message,
176 error_type = error_type::CONNECTION_FAILED,
177 stage = error_stage::RECEIVING,
178 internal_log_rate_limit = true
179 );
180 counter!(
181 "component_errors_total",
182 "error_type" => error_type::CONNECTION_FAILED,
183 "stage" => error_stage::RECEIVING,
184 )
185 .increment(1);
186 }
187}