vector/internal_events/
http.rs

1use std::{error::Error, time::Duration};
2
3use http::Response;
4use metrics::{counter, histogram};
5use vector_lib::{
6    NamedInternalEvent,
7    internal_event::{InternalEvent, error_stage, error_type},
8    json_size::JsonSize,
9};
10
11const HTTP_STATUS_LABEL: &str = "status";
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct HttpServerRequestReceived;
15
16impl InternalEvent for HttpServerRequestReceived {
17    fn emit(self) {
18        debug!(message = "Received HTTP request.");
19        counter!("http_server_requests_received_total").increment(1);
20    }
21}
22
23#[derive(Debug, NamedInternalEvent)]
24pub struct HttpServerResponseSent<'a, B> {
25    pub response: &'a Response<B>,
26    pub latency: Duration,
27}
28
29impl<B> InternalEvent for HttpServerResponseSent<'_, B> {
30    fn emit(self) {
31        let labels = &[(
32            HTTP_STATUS_LABEL,
33            self.response.status().as_u16().to_string(),
34        )];
35        counter!("http_server_responses_sent_total", labels).increment(1);
36        histogram!("http_server_handler_duration_seconds", labels).record(self.latency);
37    }
38}
39
40#[derive(Debug, NamedInternalEvent)]
41pub struct HttpBytesReceived<'a> {
42    pub byte_size: usize,
43    pub http_path: &'a str,
44    pub protocol: &'static str,
45}
46
47impl InternalEvent for HttpBytesReceived<'_> {
48    fn emit(self) {
49        trace!(
50            message = "Bytes received.",
51            byte_size = %self.byte_size,
52            http_path = %self.http_path,
53            protocol = %self.protocol
54        );
55        counter!(
56            "component_received_bytes_total",
57            "http_path" => self.http_path.to_string(),
58            "protocol" => self.protocol,
59        )
60        .increment(self.byte_size as u64);
61    }
62}
63
64#[derive(Debug, NamedInternalEvent)]
65pub struct HttpEventsReceived<'a> {
66    pub count: usize,
67    pub byte_size: JsonSize,
68    pub http_path: &'a str,
69    pub protocol: &'static str,
70}
71
72impl InternalEvent for HttpEventsReceived<'_> {
73    fn emit(self) {
74        trace!(
75            message = "Events received.",
76            count = %self.count,
77            byte_size = %self.byte_size,
78            http_path = %self.http_path,
79            protocol = %self.protocol,
80        );
81
82        histogram!("component_received_events_count").record(self.count as f64);
83        counter!(
84            "component_received_events_total",
85            "http_path" => self.http_path.to_string(),
86            "protocol" => self.protocol,
87        )
88        .increment(self.count as u64);
89        counter!(
90            "component_received_event_bytes_total",
91            "http_path" => self.http_path.to_string(),
92            "protocol" => self.protocol,
93        )
94        .increment(self.byte_size.get() as u64);
95    }
96}
97
98#[cfg(feature = "sources-utils-http")]
99#[derive(Debug, NamedInternalEvent)]
100pub struct HttpBadRequest<'a> {
101    code: u16,
102    error_code: String,
103    message: &'a str,
104}
105
106#[cfg(feature = "sources-utils-http")]
107impl<'a> HttpBadRequest<'a> {
108    pub fn new(code: u16, message: &'a str) -> Self {
109        Self {
110            code,
111            error_code: super::prelude::http_error_code(code),
112            message,
113        }
114    }
115}
116
117#[cfg(feature = "sources-utils-http")]
118impl InternalEvent for HttpBadRequest<'_> {
119    fn emit(self) {
120        warn!(
121            message = "Received bad request.",
122            error = %self.message,
123            error_code = %self.error_code,
124            error_type = error_type::REQUEST_FAILED,
125            error_stage = error_stage::RECEIVING,
126            http_code = %self.code,
127        );
128        counter!(
129            "component_errors_total",
130            "error_code" => self.error_code,
131            "error_type" => error_type::REQUEST_FAILED,
132            "error_stage" => error_stage::RECEIVING,
133        )
134        .increment(1);
135    }
136}
137
138#[derive(Debug, NamedInternalEvent)]
139pub struct HttpDecompressError<'a> {
140    pub error: &'a dyn Error,
141    pub encoding: &'a str,
142}
143
144impl InternalEvent for HttpDecompressError<'_> {
145    fn emit(self) {
146        error!(
147            message = "Failed decompressing payload.",
148            error = %self.error,
149            error_code = "failed_decompressing_payload",
150            error_type = error_type::PARSER_FAILED,
151            stage = error_stage::RECEIVING,
152            encoding = %self.encoding
153        );
154        counter!(
155            "component_errors_total",
156            "error_code" => "failed_decompressing_payload",
157            "error_type" => error_type::PARSER_FAILED,
158            "stage" => error_stage::RECEIVING,
159        )
160        .increment(1);
161    }
162}
163
164#[derive(NamedInternalEvent)]
165pub struct HttpInternalError<'a> {
166    pub message: &'a str,
167}
168
169impl InternalEvent for HttpInternalError<'_> {
170    fn emit(self) {
171        error!(
172            message = %self.message,
173            error_type = error_type::CONNECTION_FAILED,
174            stage = error_stage::RECEIVING
175        );
176        counter!(
177            "component_errors_total",
178            "error_type" => error_type::CONNECTION_FAILED,
179            "stage" => error_stage::RECEIVING,
180        )
181        .increment(1);
182    }
183}