vector/internal_events/
http.rs1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::internal_event::InternalEvent;
6use vector_lib::{
7 internal_event::{error_stage, error_type},
8 json_size::JsonSize,
9};
10
11const HTTP_STATUS_LABEL: &str = "status";
12
13#[derive(Debug)]
14pub struct HttpServerRequestReceived;
15
16impl InternalEvent for HttpServerRequestReceived {
17 fn emit(self) {
18 debug!(
19 message = "Received HTTP request.",
20 internal_log_rate_limit = true
21 );
22 counter!("http_server_requests_received_total").increment(1);
23 }
24}
25
26#[derive(Debug)]
27pub struct HttpServerResponseSent<'a, B> {
28 pub response: &'a Response<B>,
29 pub latency: Duration,
30}
31
32impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
33 fn emit(self) {
34 let labels = &[(
35 HTTP_STATUS_LABEL,
36 self.response.status().as_u16().to_string(),
37 )];
38 counter!("http_server_responses_sent_total", labels).increment(1);
39 histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
40 }
41}
42
43#[derive(Debug)]
44pub struct HttpBytesReceived<'a> {
45 pub byte_size: usize,
46 pub http_path: &'a str,
47 pub protocol: &'static str,
48}
49
50impl InternalEvent for HttpBytesReceived<'_> {
51 fn emit(self) {
52 trace!(
53 message = "Bytes received.",
54 byte_size = %self.byte_size,
55 http_path = %self.http_path,
56 protocol = %self.protocol
57 );
58 counter!(
59 "component_received_bytes_total",
60 "http_path" => self.http_path.to_string(),
61 "protocol" => self.protocol,
62 )
63 .increment(self.byte_size as u64);
64 }
65}
66
67#[derive(Debug)]
68pub struct HttpEventsReceived<'a> {
69 pub count: usize,
70 pub byte_size: JsonSize,
71 pub http_path: &'a str,
72 pub protocol: &'static str,
73}
74
75impl InternalEvent for HttpEventsReceived<'_> {
76 fn emit(self) {
77 trace!(
78 message = "Events received.",
79 count = %self.count,
80 byte_size = %self.byte_size,
81 http_path = %self.http_path,
82 protocol = %self.protocol,
83 );
84
85 histogram!("component_received_events_count").record(self.count as f64);
86 counter!(
87 "component_received_events_total",
88 "http_path" => self.http_path.to_string(),
89 "protocol" => self.protocol,
90 )
91 .increment(self.count as u64);
92 counter!(
93 "component_received_event_bytes_total",
94 "http_path" => self.http_path.to_string(),
95 "protocol" => self.protocol,
96 )
97 .increment(self.byte_size.get() as u64);
98 }
99}
100
101#[cfg(feature = "sources-utils-http")]
102#[derive(Debug)]
103pub struct HttpBadRequest<'a> {
104 code: u16,
105 error_code: String,
106 message: &'a str,
107}
108
109#[cfg(feature = "sources-utils-http")]
110impl<'a> HttpBadRequest<'a> {
111 pub fn new(code: u16, message: &'a str) -> Self {
112 Self {
113 code,
114 error_code: super::prelude::http_error_code(code),
115 message,
116 }
117 }
118}
119
120#[cfg(feature = "sources-utils-http")]
121impl InternalEvent for HttpBadRequest<'_> {
122 fn emit(self) {
123 warn!(
124 message = "Received bad request.",
125 error = %self.message,
126 error_code = %self.error_code,
127 error_type = error_type::REQUEST_FAILED,
128 error_stage = error_stage::RECEIVING,
129 http_code = %self.code,
130 internal_log_rate_limit = true,
131 );
132 counter!(
133 "component_errors_total",
134 "error_code" => self.error_code,
135 "error_type" => error_type::REQUEST_FAILED,
136 "error_stage" => error_stage::RECEIVING,
137 )
138 .increment(1);
139 }
140}
141
142#[derive(Debug)]
143pub struct HttpDecompressError<'a> {
144 pub error: &'a dyn Error,
145 pub encoding: &'a str,
146}
147
148impl InternalEvent for HttpDecompressError<'_> {
149 fn emit(self) {
150 error!(
151 message = "Failed decompressing payload.",
152 error = %self.error,
153 error_code = "failed_decompressing_payload",
154 error_type = error_type::PARSER_FAILED,
155 stage = error_stage::RECEIVING,
156 encoding = %self.encoding,
157 internal_log_rate_limit = true
158 );
159 counter!(
160 "component_errors_total",
161 "error_code" => "failed_decompressing_payload",
162 "error_type" => error_type::PARSER_FAILED,
163 "stage" => error_stage::RECEIVING,
164 )
165 .increment(1);
166 }
167}
168
169pub struct HttpInternalError<'a> {
170 pub message: &'a str,
171}
172
173impl InternalEvent for HttpInternalError<'_> {
174 fn emit(self) {
175 error!(
176 message = %self.message,
177 error_type = error_type::CONNECTION_FAILED,
178 stage = error_stage::RECEIVING,
179 internal_log_rate_limit = true
180 );
181 counter!(
182 "component_errors_total",
183 "error_type" => error_type::CONNECTION_FAILED,
184 "stage" => error_stage::RECEIVING,
185 )
186 .increment(1);
187 }
188}