vector/internal_events/
http.rs

1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::{
6    internal_event::{InternalEvent, error_stage, error_type},
7    json_size::JsonSize,
8};
9
10const HTTP_STATUS_LABEL: &str = "status";
11
12#[derive(Debug)]
13pub struct HttpServerRequestReceived;
14
15impl InternalEvent for HttpServerRequestReceived {
16    fn emit(self) {
17        debug!(message = "Received HTTP request.");
18        counter!("http_server_requests_received_total").increment(1);
19    }
20}
21
22#[derive(Debug)]
23pub struct HttpServerResponseSent<'a, B> {
24    pub response: &'a Response<B>,
25    pub latency: Duration,
26}
27
28impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
29    fn emit(self) {
30        let labels = &[(
31            HTTP_STATUS_LABEL,
32            self.response.status().as_u16().to_string(),
33        )];
34        counter!("http_server_responses_sent_total", labels).increment(1);
35        histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
36    }
37}
38
39#[derive(Debug)]
40pub struct HttpBytesReceived<'a> {
41    pub byte_size: usize,
42    pub http_path: &'a str,
43    pub protocol: &'static str,
44}
45
46impl InternalEvent for HttpBytesReceived<'_> {
47    fn emit(self) {
48        trace!(
49            message = "Bytes received.",
50            byte_size = %self.byte_size,
51            http_path = %self.http_path,
52            protocol = %self.protocol
53        );
54        counter!(
55            "component_received_bytes_total",
56            "http_path" => self.http_path.to_string(),
57            "protocol" => self.protocol,
58        )
59        .increment(self.byte_size as u64);
60    }
61}
62
63#[derive(Debug)]
64pub struct HttpEventsReceived<'a> {
65    pub count: usize,
66    pub byte_size: JsonSize,
67    pub http_path: &'a str,
68    pub protocol: &'static str,
69}
70
71impl InternalEvent for HttpEventsReceived<'_> {
72    fn emit(self) {
73        trace!(
74            message = "Events received.",
75            count = %self.count,
76            byte_size = %self.byte_size,
77            http_path = %self.http_path,
78            protocol = %self.protocol,
79        );
80
81        histogram!("component_received_events_count").record(self.count as f64);
82        counter!(
83            "component_received_events_total",
84            "http_path" => self.http_path.to_string(),
85            "protocol" => self.protocol,
86        )
87        .increment(self.count as u64);
88        counter!(
89            "component_received_event_bytes_total",
90            "http_path" => self.http_path.to_string(),
91            "protocol" => self.protocol,
92        )
93        .increment(self.byte_size.get() as u64);
94    }
95}
96
97#[cfg(feature = "sources-utils-http")]
98#[derive(Debug)]
99pub struct HttpBadRequest<'a> {
100    code: u16,
101    error_code: String,
102    message: &'a str,
103}
104
105#[cfg(feature = "sources-utils-http")]
106impl<'a> HttpBadRequest<'a> {
107    pub fn new(code: u16, message: &'a str) -> Self {
108        Self {
109            code,
110            error_code: super::prelude::http_error_code(code),
111            message,
112        }
113    }
114}
115
116#[cfg(feature = "sources-utils-http")]
117impl InternalEvent for HttpBadRequest<'_> {
118    fn emit(self) {
119        warn!(
120            message = "Received bad request.",
121            error = %self.message,
122            error_code = %self.error_code,
123            error_type = error_type::REQUEST_FAILED,
124            error_stage = error_stage::RECEIVING,
125            http_code = %self.code,
126        );
127        counter!(
128            "component_errors_total",
129            "error_code" => self.error_code,
130            "error_type" => error_type::REQUEST_FAILED,
131            "error_stage" => error_stage::RECEIVING,
132        )
133        .increment(1);
134    }
135}
136
137#[derive(Debug)]
138pub struct HttpDecompressError<'a> {
139    pub error: &'a dyn Error,
140    pub encoding: &'a str,
141}
142
143impl InternalEvent for HttpDecompressError<'_> {
144    fn emit(self) {
145        error!(
146            message = "Failed decompressing payload.",
147            error = %self.error,
148            error_code = "failed_decompressing_payload",
149            error_type = error_type::PARSER_FAILED,
150            stage = error_stage::RECEIVING,
151            encoding = %self.encoding
152        );
153        counter!(
154            "component_errors_total",
155            "error_code" => "failed_decompressing_payload",
156            "error_type" => error_type::PARSER_FAILED,
157            "stage" => error_stage::RECEIVING,
158        )
159        .increment(1);
160    }
161}
162
163pub struct HttpInternalError<'a> {
164    pub message: &'a str,
165}
166
167impl InternalEvent for HttpInternalError<'_> {
168    fn emit(self) {
169        error!(
170            message = %self.message,
171            error_type = error_type::CONNECTION_FAILED,
172            stage = error_stage::RECEIVING
173        );
174        counter!(
175            "component_errors_total",
176            "error_type" => error_type::CONNECTION_FAILED,
177            "stage" => error_stage::RECEIVING,
178        )
179        .increment(1);
180    }
181}