vector/internal_events/
http.rs1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::{
6 internal_event::{InternalEvent, error_stage, error_type},
7 json_size::JsonSize,
8};
9
/// Label key under which the numeric HTTP response status is attached to the
/// HTTP server response metrics emitted from this module.
const HTTP_STATUS_LABEL: &str = "status";
11
/// Internal event signalling that an HTTP request has been received.
///
/// Carries no data; emitting it logs a debug message and bumps the
/// `http_server_requests_received_total` counter.
#[derive(Debug)]
pub struct HttpServerRequestReceived;
14
15impl InternalEvent for HttpServerRequestReceived {
16 fn emit(self) {
17 debug!(message = "Received HTTP request.");
18 counter!("http_server_requests_received_total").increment(1);
19 }
20}
21
22#[derive(Debug)]
23pub struct HttpServerResponseSent<'a, B> {
24 pub response: &'a Response<B>,
25 pub latency: Duration,
26}
27
28impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
29 fn emit(self) {
30 let labels = &[(
31 HTTP_STATUS_LABEL,
32 self.response.status().as_u16().to_string(),
33 )];
34 counter!("http_server_responses_sent_total", labels).increment(1);
35 histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
36 }
37}
38
/// Internal event reporting a number of bytes received over HTTP.
#[derive(Debug)]
pub struct HttpBytesReceived<'a> {
    /// Amount added to `component_received_bytes_total`.
    pub byte_size: usize,
    /// Value of the `http_path` metric label and log field.
    pub http_path: &'a str,
    /// Value of the `protocol` metric label and log field.
    pub protocol: &'static str,
}
45
46impl InternalEvent for HttpBytesReceived<'_> {
47 fn emit(self) {
48 trace!(
49 message = "Bytes received.",
50 byte_size = %self.byte_size,
51 http_path = %self.http_path,
52 protocol = %self.protocol
53 );
54 counter!(
55 "component_received_bytes_total",
56 "http_path" => self.http_path.to_string(),
57 "protocol" => self.protocol,
58 )
59 .increment(self.byte_size as u64);
60 }
61}
62
63#[derive(Debug)]
64pub struct HttpEventsReceived<'a> {
65 pub count: usize,
66 pub byte_size: JsonSize,
67 pub http_path: &'a str,
68 pub protocol: &'static str,
69}
70
71impl InternalEvent for HttpEventsReceived<'_> {
72 fn emit(self) {
73 trace!(
74 message = "Events received.",
75 count = %self.count,
76 byte_size = %self.byte_size,
77 http_path = %self.http_path,
78 protocol = %self.protocol,
79 );
80
81 histogram!("component_received_events_count").record(self.count as f64);
82 counter!(
83 "component_received_events_total",
84 "http_path" => self.http_path.to_string(),
85 "protocol" => self.protocol,
86 )
87 .increment(self.count as u64);
88 counter!(
89 "component_received_event_bytes_total",
90 "http_path" => self.http_path.to_string(),
91 "protocol" => self.protocol,
92 )
93 .increment(self.byte_size.get() as u64);
94 }
95}
96
/// Internal event describing a rejected (bad) HTTP request.
///
/// Fields are private; build instances with [`HttpBadRequest::new`].
#[cfg(feature = "sources-utils-http")]
#[derive(Debug)]
pub struct HttpBadRequest<'a> {
    /// HTTP status code, logged as `http_code`.
    code: u16,
    /// Error code derived from `code`, used as the `error_code` log field and metric label.
    error_code: String,
    /// Error description, logged as `error`.
    message: &'a str,
}
104
#[cfg(feature = "sources-utils-http")]
impl<'a> HttpBadRequest<'a> {
    /// Builds the event from an HTTP status `code` and an error `message`.
    ///
    /// The `error_code` field is obtained by passing `code` to
    /// `super::prelude::http_error_code`.
    pub fn new(code: u16, message: &'a str) -> Self {
        let error_code = super::prelude::http_error_code(code);

        Self {
            code,
            error_code,
            message,
        }
    }
}
115
#[cfg(feature = "sources-utils-http")]
impl InternalEvent for HttpBadRequest<'_> {
    /// Logs the bad request at warn level and increments
    /// `component_errors_total`.
    ///
    /// The stage is reported under the `stage` key, the same key used by the
    /// other error events in this file (`HttpDecompressError`,
    /// `HttpInternalError`), so every `component_errors_total` series shares
    /// one label schema.
    fn emit(self) {
        warn!(
            message = "Received bad request.",
            error = %self.message,
            error_code = %self.error_code,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::RECEIVING,
            http_code = %self.code,
        );
        counter!(
            "component_errors_total",
            // `error_code` is moved here; it must not be used after this point.
            "error_code" => self.error_code,
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}
136
/// Internal event reporting a failure to decompress an HTTP payload.
#[derive(Debug)]
pub struct HttpDecompressError<'a> {
    /// The underlying error, logged via its `Display` output.
    pub error: &'a dyn Error,
    /// Encoding name, logged as the `encoding` field.
    pub encoding: &'a str,
}
142
143impl InternalEvent for HttpDecompressError<'_> {
144 fn emit(self) {
145 error!(
146 message = "Failed decompressing payload.",
147 error = %self.error,
148 error_code = "failed_decompressing_payload",
149 error_type = error_type::PARSER_FAILED,
150 stage = error_stage::RECEIVING,
151 encoding = %self.encoding
152 );
153 counter!(
154 "component_errors_total",
155 "error_code" => "failed_decompressing_payload",
156 "error_type" => error_type::PARSER_FAILED,
157 "stage" => error_stage::RECEIVING,
158 )
159 .increment(1);
160 }
161}
162
/// Internal event reporting an internal error while receiving over HTTP.
///
/// Derives `Debug` like every other event type in this module.
#[derive(Debug)]
pub struct HttpInternalError<'a> {
    /// Text used verbatim as the log message when the event is emitted.
    pub message: &'a str,
}
166
167impl InternalEvent for HttpInternalError<'_> {
168 fn emit(self) {
169 error!(
170 message = %self.message,
171 error_type = error_type::CONNECTION_FAILED,
172 stage = error_stage::RECEIVING
173 );
174 counter!(
175 "component_errors_total",
176 "error_type" => error_type::CONNECTION_FAILED,
177 "stage" => error_stage::RECEIVING,
178 )
179 .increment(1);
180 }
181}