vector/internal_events/
common.rs1use std::time::Instant;
2
3use metrics::{counter, histogram};
4pub use vector_lib::internal_event::EventsReceived;
5use vector_lib::internal_event::{
6 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
7};
8
9#[derive(Debug)]
10pub struct EndpointBytesReceived<'a> {
11 pub byte_size: usize,
12 pub protocol: &'a str,
13 pub endpoint: &'a str,
14}
15
16impl InternalEvent for EndpointBytesReceived<'_> {
17 fn emit(self) {
18 trace!(
19 message = "Bytes received.",
20 byte_size = %self.byte_size,
21 protocol = %self.protocol,
22 endpoint = %self.endpoint,
23 );
24 counter!(
25 "component_received_bytes_total",
26 "protocol" => self.protocol.to_owned(),
27 "endpoint" => self.endpoint.to_owned(),
28 )
29 .increment(self.byte_size as u64);
30 }
31}
32
33#[derive(Debug)]
34pub struct EndpointBytesSent<'a> {
35 pub byte_size: usize,
36 pub protocol: &'a str,
37 pub endpoint: &'a str,
38}
39
40impl InternalEvent for EndpointBytesSent<'_> {
41 fn emit(self) {
42 trace!(
43 message = "Bytes sent.",
44 byte_size = %self.byte_size,
45 protocol = %self.protocol,
46 endpoint = %self.endpoint
47 );
48 counter!(
49 "component_sent_bytes_total",
50 "protocol" => self.protocol.to_string(),
51 "endpoint" => self.endpoint.to_string()
52 )
53 .increment(self.byte_size as u64);
54 }
55}
56
57#[derive(Debug)]
58pub struct SocketOutgoingConnectionError<E> {
59 pub error: E,
60}
61
62impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
63 fn emit(self) {
64 error!(
65 message = "Unable to connect.",
66 error = %self.error,
67 error_code = "failed_connecting",
68 error_type = error_type::CONNECTION_FAILED,
69 stage = error_stage::SENDING,
70 );
71 counter!(
72 "component_errors_total",
73 "error_code" => "failed_connecting",
74 "error_type" => error_type::CONNECTION_FAILED,
75 "stage" => error_stage::SENDING,
76 )
77 .increment(1);
78 }
79}
80
81const STREAM_CLOSED: &str = "stream_closed";
82
83#[derive(Debug)]
84pub struct StreamClosedError {
85 pub count: usize,
86}
87
88impl InternalEvent for StreamClosedError {
89 fn emit(self) {
90 error!(
91 message = "Failed to forward event(s), downstream is closed.",
92 error_code = STREAM_CLOSED,
93 error_type = error_type::WRITER_FAILED,
94 stage = error_stage::SENDING,
95 );
96 counter!(
97 "component_errors_total",
98 "error_code" => STREAM_CLOSED,
99 "error_type" => error_type::WRITER_FAILED,
100 "stage" => error_stage::SENDING,
101 )
102 .increment(1);
103 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
104 count: self.count,
105 reason: "Downstream is closed.",
106 });
107 }
108}
109
110#[derive(Debug)]
111pub struct CollectionCompleted {
112 pub start: Instant,
113 pub end: Instant,
114}
115
116impl InternalEvent for CollectionCompleted {
117 fn emit(self) {
118 debug!(message = "Collection completed.");
119 counter!("collect_completed_total").increment(1);
120 histogram!("collect_duration_seconds").record(self.end - self.start);
121 }
122}
123
124#[derive(Debug)]
125pub struct SinkRequestBuildError<E> {
126 pub error: E,
127}
128
129impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
130 fn emit(self) {
131 error!(
135 message = format!("Failed to build request."),
136 error = %self.error,
137 error_type = error_type::ENCODER_FAILED,
138 stage = error_stage::PROCESSING,
139 );
140 counter!(
141 "component_errors_total",
142 "error_type" => error_type::ENCODER_FAILED,
143 "stage" => error_stage::PROCESSING,
144 )
145 .increment(1);
146 }
147}