vector/internal_events/
common.rs

1use std::time::Instant;
2
3use metrics::{counter, histogram};
4pub use vector_lib::internal_event::EventsReceived;
5use vector_lib::internal_event::InternalEvent;
6use vector_lib::internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL};
7
/// Internal event describing a batch of bytes received from an endpoint.
///
/// When emitted, the byte count is logged at trace level and added to the
/// `component_received_bytes_total` counter.
#[derive(Debug)]
pub struct EndpointBytesReceived<'a> {
    /// Number of bytes received; this is the amount the counter is incremented by.
    pub byte_size: usize,
    /// Value attached to the counter as the `protocol` label.
    pub protocol: &'a str,
    /// Value attached to the counter as the `endpoint` label.
    pub endpoint: &'a str,
}
14
15impl InternalEvent for EndpointBytesReceived<'_> {
16    fn emit(self) {
17        trace!(
18            message = "Bytes received.",
19            byte_size = %self.byte_size,
20            protocol = %self.protocol,
21            endpoint = %self.endpoint,
22        );
23        counter!(
24            "component_received_bytes_total",
25            "protocol" => self.protocol.to_owned(),
26            "endpoint" => self.endpoint.to_owned(),
27        )
28        .increment(self.byte_size as u64);
29    }
30}
31
/// Internal event describing a batch of bytes sent to an endpoint.
///
/// When emitted, the byte count is logged at trace level and added to the
/// `component_sent_bytes_total` counter.
#[derive(Debug)]
pub struct EndpointBytesSent<'a> {
    /// Number of bytes sent; this is the amount the counter is incremented by.
    pub byte_size: usize,
    /// Value attached to the counter as the `protocol` label.
    pub protocol: &'a str,
    /// Value attached to the counter as the `endpoint` label.
    pub endpoint: &'a str,
}
38
39impl InternalEvent for EndpointBytesSent<'_> {
40    fn emit(self) {
41        trace!(
42            message = "Bytes sent.",
43            byte_size = %self.byte_size,
44            protocol = %self.protocol,
45            endpoint = %self.endpoint
46        );
47        counter!(
48            "component_sent_bytes_total",
49            "protocol" => self.protocol.to_string(),
50            "endpoint" => self.endpoint.to_string()
51        )
52        .increment(self.byte_size as u64);
53    }
54}
55
/// Internal event reporting that an outgoing socket connection could not be
/// established.
///
/// The struct itself places no bound on `E`; the `InternalEvent`
/// implementation requires `E: std::error::Error`.
#[derive(Debug)]
pub struct SocketOutgoingConnectionError<E> {
    /// The failure being reported; rendered with `Display` in the error log.
    pub error: E,
}
60
61impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
62    fn emit(self) {
63        error!(
64            message = "Unable to connect.",
65            error = %self.error,
66            error_code = "failed_connecting",
67            error_type = error_type::CONNECTION_FAILED,
68            stage = error_stage::SENDING,
69            internal_log_rate_limit = true,
70        );
71        counter!(
72            "component_errors_total",
73            "error_code" => "failed_connecting",
74            "error_type" => error_type::CONNECTION_FAILED,
75            "stage" => error_stage::SENDING,
76        )
77        .increment(1);
78    }
79}
80
/// Error code used by `StreamClosedError` for both its log line and its
/// `component_errors_total` label.
const STREAM_CLOSED: &str = "stream_closed";
82
/// Internal event reporting that events could not be forwarded because the
/// downstream side is closed.
#[derive(Debug)]
pub struct StreamClosedError {
    /// Number of events that were not forwarded; reported as dropped via
    /// `ComponentEventsDropped` when this event is emitted.
    pub count: usize,
}
87
88impl InternalEvent for StreamClosedError {
89    fn emit(self) {
90        error!(
91            message = "Failed to forward event(s), downstream is closed.",
92            error_code = STREAM_CLOSED,
93            error_type = error_type::WRITER_FAILED,
94            stage = error_stage::SENDING,
95            internal_log_rate_limit = true,
96        );
97        counter!(
98            "component_errors_total",
99            "error_code" => STREAM_CLOSED,
100            "error_type" => error_type::WRITER_FAILED,
101            "stage" => error_stage::SENDING,
102        )
103        .increment(1);
104        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
105            count: self.count,
106            reason: "Downstream is closed.",
107        });
108    }
109}
110
/// Internal event marking the end of one collection pass.
///
/// When emitted, the time between `start` and `end` is recorded in the
/// `collect_duration_seconds` histogram.
#[derive(Debug)]
pub struct CollectionCompleted {
    /// Instant at which the collection began.
    pub start: Instant,
    /// Instant at which the collection finished.
    pub end: Instant,
}
116
117impl InternalEvent for CollectionCompleted {
118    fn emit(self) {
119        debug!(message = "Collection completed.");
120        counter!("collect_completed_total").increment(1);
121        histogram!("collect_duration_seconds").record(self.end - self.start);
122    }
123}
124
/// Internal event reporting that a sink failed to build a request.
///
/// The struct itself places no bound on `E`; the `InternalEvent`
/// implementation requires `E: std::fmt::Display`.
#[derive(Debug)]
pub struct SinkRequestBuildError<E> {
    /// The failure being reported; rendered with `Display` in the error log.
    pub error: E,
}
129
130impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
131    fn emit(self) {
132        // Providing the name of the sink with the build error is not necessary because the emitted log
133        // message contains the sink name in `component_type` field thanks to `tracing` spans. For example:
134        // "<timestamp> ERROR sink{component_kind="sink" component_id=sink0 component_type=aws_s3 component_name=sink0}: vector::internal_events::common: Failed to build request."
135        error!(
136            message = format!("Failed to build request."),
137            error = %self.error,
138            error_type = error_type::ENCODER_FAILED,
139            stage = error_stage::PROCESSING,
140            internal_log_rate_limit = true,
141        );
142        counter!(
143            "component_errors_total",
144            "error_type" => error_type::ENCODER_FAILED,
145            "stage" => error_stage::PROCESSING,
146        )
147        .increment(1);
148    }
149}