vector/internal_events/
common.rs1use std::time::Instant;
2
3use metrics::{counter, histogram};
4pub use vector_lib::internal_event::EventsReceived;
5use vector_lib::internal_event::InternalEvent;
6use vector_lib::internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL};
7
8#[derive(Debug)]
9pub struct EndpointBytesReceived<'a> {
10 pub byte_size: usize,
11 pub protocol: &'a str,
12 pub endpoint: &'a str,
13}
14
15impl InternalEvent for EndpointBytesReceived<'_> {
16 fn emit(self) {
17 trace!(
18 message = "Bytes received.",
19 byte_size = %self.byte_size,
20 protocol = %self.protocol,
21 endpoint = %self.endpoint,
22 );
23 counter!(
24 "component_received_bytes_total",
25 "protocol" => self.protocol.to_owned(),
26 "endpoint" => self.endpoint.to_owned(),
27 )
28 .increment(self.byte_size as u64);
29 }
30}
31
32#[derive(Debug)]
33pub struct EndpointBytesSent<'a> {
34 pub byte_size: usize,
35 pub protocol: &'a str,
36 pub endpoint: &'a str,
37}
38
39impl InternalEvent for EndpointBytesSent<'_> {
40 fn emit(self) {
41 trace!(
42 message = "Bytes sent.",
43 byte_size = %self.byte_size,
44 protocol = %self.protocol,
45 endpoint = %self.endpoint
46 );
47 counter!(
48 "component_sent_bytes_total",
49 "protocol" => self.protocol.to_string(),
50 "endpoint" => self.endpoint.to_string()
51 )
52 .increment(self.byte_size as u64);
53 }
54}
55
56#[derive(Debug)]
57pub struct SocketOutgoingConnectionError<E> {
58 pub error: E,
59}
60
61impl<E: std::error::Error> InternalEvent for SocketOutgoingConnectionError<E> {
62 fn emit(self) {
63 error!(
64 message = "Unable to connect.",
65 error = %self.error,
66 error_code = "failed_connecting",
67 error_type = error_type::CONNECTION_FAILED,
68 stage = error_stage::SENDING,
69 internal_log_rate_limit = true,
70 );
71 counter!(
72 "component_errors_total",
73 "error_code" => "failed_connecting",
74 "error_type" => error_type::CONNECTION_FAILED,
75 "stage" => error_stage::SENDING,
76 )
77 .increment(1);
78 }
79}
80
81const STREAM_CLOSED: &str = "stream_closed";
82
83#[derive(Debug)]
84pub struct StreamClosedError {
85 pub count: usize,
86}
87
88impl InternalEvent for StreamClosedError {
89 fn emit(self) {
90 error!(
91 message = "Failed to forward event(s), downstream is closed.",
92 error_code = STREAM_CLOSED,
93 error_type = error_type::WRITER_FAILED,
94 stage = error_stage::SENDING,
95 internal_log_rate_limit = true,
96 );
97 counter!(
98 "component_errors_total",
99 "error_code" => STREAM_CLOSED,
100 "error_type" => error_type::WRITER_FAILED,
101 "stage" => error_stage::SENDING,
102 )
103 .increment(1);
104 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
105 count: self.count,
106 reason: "Downstream is closed.",
107 });
108 }
109}
110
111#[derive(Debug)]
112pub struct CollectionCompleted {
113 pub start: Instant,
114 pub end: Instant,
115}
116
117impl InternalEvent for CollectionCompleted {
118 fn emit(self) {
119 debug!(message = "Collection completed.");
120 counter!("collect_completed_total").increment(1);
121 histogram!("collect_duration_seconds").record(self.end - self.start);
122 }
123}
124
125#[derive(Debug)]
126pub struct SinkRequestBuildError<E> {
127 pub error: E,
128}
129
130impl<E: std::fmt::Display> InternalEvent for SinkRequestBuildError<E> {
131 fn emit(self) {
132 error!(
136 message = format!("Failed to build request."),
137 error = %self.error,
138 error_type = error_type::ENCODER_FAILED,
139 stage = error_stage::PROCESSING,
140 internal_log_rate_limit = true,
141 );
142 counter!(
143 "component_errors_total",
144 "error_type" => error_type::ENCODER_FAILED,
145 "stage" => error_stage::PROCESSING,
146 )
147 .increment(1);
148 }
149}