vector/internal_events/
common.rs1use std::time::Instant;
2
3use metrics::{counter, histogram};
4use vector_lib::NamedInternalEvent;
5pub use vector_lib::internal_event::EventsReceived;
6use vector_lib::internal_event::{
7 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8};
9
10#[derive(Debug, NamedInternalEvent)]
11pub struct EndpointBytesReceived<'a> {
12 pub byte_size: usize,
13 pub protocol: &'a str,
14 pub endpoint: &'a str,
15}
16
17impl InternalEvent for EndpointBytesReceived<'_> {
18 fn emit(self) {
19 trace!(
20 message = "Bytes received.",
21 byte_size = %self.byte_size,
22 protocol = %self.protocol,
23 endpoint = %self.endpoint,
24 );
25 counter!(
26 "component_received_bytes_total",
27 "protocol" => self.protocol.to_owned(),
28 "endpoint" => self.endpoint.to_owned(),
29 )
30 .increment(self.byte_size as u64);
31 }
32}
33
34#[derive(Debug, NamedInternalEvent)]
35pub struct EndpointBytesSent<'a> {
36 pub byte_size: usize,
37 pub protocol: &'a str,
38 pub endpoint: &'a str,
39}
40
41impl InternalEvent for EndpointBytesSent<'_> {
42 fn emit(self) {
43 trace!(
44 message = "Bytes sent.",
45 byte_size = %self.byte_size,
46 protocol = %self.protocol,
47 endpoint = %self.endpoint
48 );
49 counter!(
50 "component_sent_bytes_total",
51 "protocol" => self.protocol.to_string(),
52 "endpoint" => self.endpoint.to_string()
53 )
54 .increment(self.byte_size as u64);
55 }
56}
57
58#[derive(Debug, NamedInternalEvent)]
59pub struct SocketOutgoingConnectionError<E> {
60 pub error: E,
61}
62
63impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
64 fn emit(self) {
65 error!(
66 message = "Unable to connect.",
67 error = %self.error,
68 error_code = "failed_connecting",
69 error_type = error_type::CONNECTION_FAILED,
70 stage = error_stage::SENDING,
71 );
72 counter!(
73 "component_errors_total",
74 "error_code" => "failed_connecting",
75 "error_type" => error_type::CONNECTION_FAILED,
76 "stage" => error_stage::SENDING,
77 )
78 .increment(1);
79 }
80}
81
82const STREAM_CLOSED: &str = "stream_closed";
83
84#[derive(Debug, NamedInternalEvent)]
85pub struct StreamClosedError {
86 pub count: usize,
87}
88
89impl InternalEvent for StreamClosedError {
90 fn emit(self) {
91 error!(
92 message = "Failed to forward event(s), downstream is closed.",
93 error_code = STREAM_CLOSED,
94 error_type = error_type::WRITER_FAILED,
95 stage = error_stage::SENDING,
96 );
97 counter!(
98 "component_errors_total",
99 "error_code" => STREAM_CLOSED,
100 "error_type" => error_type::WRITER_FAILED,
101 "stage" => error_stage::SENDING,
102 )
103 .increment(1);
104 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
105 count: self.count,
106 reason: "Downstream is closed.",
107 });
108 }
109}
110
111#[derive(Debug, NamedInternalEvent)]
112pub struct CollectionCompleted {
113 pub start: Instant,
114 pub end: Instant,
115}
116
117impl InternalEvent for CollectionCompleted {
118 fn emit(self) {
119 debug!(message = "Collection completed.");
120 counter!("collect_completed_total").increment(1);
121 histogram!("collect_duration_seconds").record(self.end - self.start);
122 }
123}
124
125#[derive(Debug, NamedInternalEvent)]
126pub struct SinkRequestBuildError<E> {
127 pub error: E,
128}
129
130impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
131 fn emit(self) {
132 error!(
136 message = format!("Failed to build request."),
137 error = %self.error,
138 error_type = error_type::ENCODER_FAILED,
139 stage = error_stage::PROCESSING,
140 );
141 counter!(
142 "component_errors_total",
143 "error_type" => error_type::ENCODER_FAILED,
144 "stage" => error_stage::PROCESSING,
145 )
146 .increment(1);
147 }
148}