vector/internal_events/
common.rs

1use std::time::Instant;
2
3use metrics::{counter, histogram};
4use vector_lib::NamedInternalEvent;
5pub use vector_lib::internal_event::EventsReceived;
6use vector_lib::internal_event::{
7    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8};
9
10#[derive(Debug, NamedInternalEvent)]
11pub struct EndpointBytesReceived<'a> {
12    pub byte_size: usize,
13    pub protocol: &'a str,
14    pub endpoint: &'a str,
15}
16
17impl InternalEvent for EndpointBytesReceived<'_> {
18    fn emit(self) {
19        trace!(
20            message = "Bytes received.",
21            byte_size = %self.byte_size,
22            protocol = %self.protocol,
23            endpoint = %self.endpoint,
24        );
25        counter!(
26            "component_received_bytes_total",
27            "protocol" => self.protocol.to_owned(),
28            "endpoint" => self.endpoint.to_owned(),
29        )
30        .increment(self.byte_size as u64);
31    }
32}
33
34#[derive(Debug, NamedInternalEvent)]
35pub struct EndpointBytesSent<'a> {
36    pub byte_size: usize,
37    pub protocol: &'a str,
38    pub endpoint: &'a str,
39}
40
41impl InternalEvent for EndpointBytesSent<'_> {
42    fn emit(self) {
43        trace!(
44            message = "Bytes sent.",
45            byte_size = %self.byte_size,
46            protocol = %self.protocol,
47            endpoint = %self.endpoint
48        );
49        counter!(
50            "component_sent_bytes_total",
51            "protocol" => self.protocol.to_string(),
52            "endpoint" => self.endpoint.to_string()
53        )
54        .increment(self.byte_size as u64);
55    }
56}
57
58#[derive(Debug, NamedInternalEvent)]
59pub struct SocketOutgoingConnectionError<E> {
60    pub error: E,
61}
62
63impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
64    fn emit(self) {
65        error!(
66            message = "Unable to connect.",
67            error = %self.error,
68            error_code = "failed_connecting",
69            error_type = error_type::CONNECTION_FAILED,
70            stage = error_stage::SENDING,
71        );
72        counter!(
73            "component_errors_total",
74            "error_code" => "failed_connecting",
75            "error_type" => error_type::CONNECTION_FAILED,
76            "stage" => error_stage::SENDING,
77        )
78        .increment(1);
79    }
80}
81
82const STREAM_CLOSED: &str = "stream_closed";
83
84#[derive(Debug, NamedInternalEvent)]
85pub struct StreamClosedError {
86    pub count: usize,
87}
88
89impl InternalEvent for StreamClosedError {
90    fn emit(self) {
91        error!(
92            message = "Failed to forward event(s), downstream is closed.",
93            error_code = STREAM_CLOSED,
94            error_type = error_type::WRITER_FAILED,
95            stage = error_stage::SENDING,
96        );
97        counter!(
98            "component_errors_total",
99            "error_code" => STREAM_CLOSED,
100            "error_type" => error_type::WRITER_FAILED,
101            "stage" => error_stage::SENDING,
102        )
103        .increment(1);
104        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
105            count: self.count,
106            reason: "Downstream is closed.",
107        });
108    }
109}
110
111#[derive(Debug, NamedInternalEvent)]
112pub struct CollectionCompleted {
113    pub start: Instant,
114    pub end: Instant,
115}
116
117impl InternalEvent for CollectionCompleted {
118    fn emit(self) {
119        debug!(message = "Collection completed.");
120        counter!("collect_completed_total").increment(1);
121        histogram!("collect_duration_seconds").record(self.end - self.start);
122    }
123}
124
125#[derive(Debug, NamedInternalEvent)]
126pub struct SinkRequestBuildError<E> {
127    pub error: E,
128}
129
130impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
131    fn emit(self) {
132        // Providing the name of the sink with the build error is not necessary because the emitted log
133        // message contains the sink name in `component_type` field thanks to `tracing` spans. For example:
134        // "<timestamp> ERROR sink{component_kind="sink" component_id=sink0 component_type=aws_s3 component_name=sink0}: vector::internal_events::common: Failed to build request."
135        error!(
136            message = format!("Failed to build request."),
137            error = %self.error,
138            error_type = error_type::ENCODER_FAILED,
139            stage = error_stage::PROCESSING,
140        );
141        counter!(
142            "component_errors_total",
143            "error_type" => error_type::ENCODER_FAILED,
144            "stage" => error_stage::PROCESSING,
145        )
146        .increment(1);
147    }
148}