vector/internal_events/
common.rs

1use std::time::Instant;
2
3use metrics::{counter, histogram};
4pub use vector_lib::internal_event::EventsReceived;
5use vector_lib::internal_event::{
6    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
7};
8
9#[derive(Debug)]
10pub struct EndpointBytesReceived<'a> {
11    pub byte_size: usize,
12    pub protocol: &'a str,
13    pub endpoint: &'a str,
14}
15
16impl InternalEvent for EndpointBytesReceived<'_> {
17    fn emit(self) {
18        trace!(
19            message = "Bytes received.",
20            byte_size = %self.byte_size,
21            protocol = %self.protocol,
22            endpoint = %self.endpoint,
23        );
24        counter!(
25            "component_received_bytes_total",
26            "protocol" => self.protocol.to_owned(),
27            "endpoint" => self.endpoint.to_owned(),
28        )
29        .increment(self.byte_size as u64);
30    }
31}
32
33#[derive(Debug)]
34pub struct EndpointBytesSent<'a> {
35    pub byte_size: usize,
36    pub protocol: &'a str,
37    pub endpoint: &'a str,
38}
39
40impl InternalEvent for EndpointBytesSent<'_> {
41    fn emit(self) {
42        trace!(
43            message = "Bytes sent.",
44            byte_size = %self.byte_size,
45            protocol = %self.protocol,
46            endpoint = %self.endpoint
47        );
48        counter!(
49            "component_sent_bytes_total",
50            "protocol" => self.protocol.to_string(),
51            "endpoint" => self.endpoint.to_string()
52        )
53        .increment(self.byte_size as u64);
54    }
55}
56
57#[derive(Debug)]
58pub struct SocketOutgoingConnectionError<E> {
59    pub error: E,
60}
61
62impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
63    fn emit(self) {
64        error!(
65            message = "Unable to connect.",
66            error = %self.error,
67            error_code = "failed_connecting",
68            error_type = error_type::CONNECTION_FAILED,
69            stage = error_stage::SENDING,
70        );
71        counter!(
72            "component_errors_total",
73            "error_code" => "failed_connecting",
74            "error_type" => error_type::CONNECTION_FAILED,
75            "stage" => error_stage::SENDING,
76        )
77        .increment(1);
78    }
79}
80
81const STREAM_CLOSED: &str = "stream_closed";
82
83#[derive(Debug)]
84pub struct StreamClosedError {
85    pub count: usize,
86}
87
88impl InternalEvent for StreamClosedError {
89    fn emit(self) {
90        error!(
91            message = "Failed to forward event(s), downstream is closed.",
92            error_code = STREAM_CLOSED,
93            error_type = error_type::WRITER_FAILED,
94            stage = error_stage::SENDING,
95        );
96        counter!(
97            "component_errors_total",
98            "error_code" => STREAM_CLOSED,
99            "error_type" => error_type::WRITER_FAILED,
100            "stage" => error_stage::SENDING,
101        )
102        .increment(1);
103        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
104            count: self.count,
105            reason: "Downstream is closed.",
106        });
107    }
108}
109
110#[derive(Debug)]
111pub struct CollectionCompleted {
112    pub start: Instant,
113    pub end: Instant,
114}
115
116impl InternalEvent for CollectionCompleted {
117    fn emit(self) {
118        debug!(message = "Collection completed.");
119        counter!("collect_completed_total").increment(1);
120        histogram!("collect_duration_seconds").record(self.end - self.start);
121    }
122}
123
124#[derive(Debug)]
125pub struct SinkRequestBuildError<E> {
126    pub error: E,
127}
128
129impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
130    fn emit(self) {
131        // Providing the name of the sink with the build error is not necessary because the emitted log
132        // message contains the sink name in `component_type` field thanks to `tracing` spans. For example:
133        // "<timestamp> ERROR sink{component_kind="sink" component_id=sink0 component_type=aws_s3 component_name=sink0}: vector::internal_events::common: Failed to build request."
134        error!(
135            message = format!("Failed to build request."),
136            error = %self.error,
137            error_type = error_type::ENCODER_FAILED,
138            stage = error_stage::PROCESSING,
139        );
140        counter!(
141            "component_errors_total",
142            "error_type" => error_type::ENCODER_FAILED,
143            "stage" => error_stage::PROCESSING,
144        )
145        .increment(1);
146    }
147}