1use std::time::Duration;
2
3use metrics::{Histogram, counter, gauge, histogram};
4use vector_common::{
5 internal_event::{InternalEvent, error_type},
6 registered_event,
7};
8
9pub struct BufferCreated {
10 pub buffer_id: String,
11 pub idx: usize,
12 pub max_size_events: usize,
13 pub max_size_bytes: u64,
14}
15
16impl InternalEvent for BufferCreated {
17 #[expect(clippy::cast_precision_loss)]
18 fn emit(self) {
19 if self.max_size_events != 0 {
20 gauge!(
21 "buffer_max_event_size",
22 "buffer_id" => self.buffer_id.clone(),
23 "stage" => self.idx.to_string(),
24 )
25 .set(self.max_size_events as f64);
26 }
27 if self.max_size_bytes != 0 {
28 gauge!(
29 "buffer_max_byte_size",
30 "buffer_id" => self.buffer_id,
31 "stage" => self.idx.to_string(),
32 )
33 .set(self.max_size_bytes as f64);
34 }
35 }
36}
37
38pub struct BufferEventsReceived {
39 pub buffer_id: String,
40 pub idx: usize,
41 pub count: u64,
42 pub byte_size: u64,
43 pub total_count: u64,
44 pub total_byte_size: u64,
45}
46
47impl InternalEvent for BufferEventsReceived {
48 #[expect(clippy::cast_precision_loss)]
49 fn emit(self) {
50 counter!(
51 "buffer_received_events_total",
52 "buffer_id" => self.buffer_id.clone(),
53 "stage" => self.idx.to_string()
54 )
55 .increment(self.count);
56
57 counter!(
58 "buffer_received_bytes_total",
59 "buffer_id" => self.buffer_id.clone(),
60 "stage" => self.idx.to_string()
61 )
62 .increment(self.byte_size);
63 gauge!(
64 "buffer_events",
65 "buffer_id" => self.buffer_id.clone(),
66 "stage" => self.idx.to_string()
67 )
68 .set(self.total_count as f64);
69 gauge!(
70 "buffer_byte_size",
71 "buffer_id" => self.buffer_id,
72 "stage" => self.idx.to_string()
73 )
74 .set(self.total_byte_size as f64);
75 }
76}
77
78pub struct BufferEventsSent {
79 pub buffer_id: String,
80 pub idx: usize,
81 pub count: u64,
82 pub byte_size: u64,
83 pub total_count: u64,
84 pub total_byte_size: u64,
85}
86
87impl InternalEvent for BufferEventsSent {
88 #[expect(clippy::cast_precision_loss)]
89 fn emit(self) {
90 counter!(
91 "buffer_sent_events_total",
92 "buffer_id" => self.buffer_id.clone(),
93 "stage" => self.idx.to_string()
94 )
95 .increment(self.count);
96 counter!(
97 "buffer_sent_bytes_total",
98 "buffer_id" => self.buffer_id.clone(),
99 "stage" => self.idx.to_string()
100 )
101 .increment(self.byte_size);
102 gauge!(
103 "buffer_events",
104 "buffer_id" => self.buffer_id.clone(),
105 "stage" => self.idx.to_string()
106 )
107 .set(self.total_count as f64);
108 gauge!(
109 "buffer_byte_size",
110 "buffer_id" => self.buffer_id,
111 "stage" => self.idx.to_string()
112 )
113 .set(self.total_byte_size as f64);
114 }
115}
116
117pub struct BufferEventsDropped {
118 pub buffer_id: String,
119 pub idx: usize,
120 pub count: u64,
121 pub byte_size: u64,
122 pub total_count: u64,
123 pub total_byte_size: u64,
124 pub intentional: bool,
125 pub reason: &'static str,
126}
127
128impl InternalEvent for BufferEventsDropped {
129 #[expect(clippy::cast_precision_loss)]
130 fn emit(self) {
131 let intentional_str = if self.intentional { "true" } else { "false" };
132 if self.intentional {
133 debug!(
134 message = "Events dropped.",
135 count = %self.count,
136 byte_size = %self.byte_size,
137 intentional = %intentional_str,
138 reason = %self.reason,
139 buffer_id = %self.buffer_id,
140 stage = %self.idx,
141 );
142 } else {
143 error!(
144 message = "Events dropped.",
145 count = %self.count,
146 byte_size = %self.byte_size,
147 intentional = %intentional_str,
148 reason = %self.reason,
149 buffer_id = %self.buffer_id,
150 stage = %self.idx,
151 );
152 }
153
154 counter!(
155 "buffer_discarded_events_total",
156 "buffer_id" => self.buffer_id.clone(),
157 "stage" => self.idx.to_string(),
158 "intentional" => intentional_str,
159 )
160 .increment(self.count);
161 counter!(
162 "buffer_discarded_bytes_total",
163 "buffer_id" => self.buffer_id.clone(),
164 "stage" => self.idx.to_string(),
165 "intentional" => intentional_str,
166 )
167 .increment(self.byte_size);
168 gauge!(
169 "buffer_events",
170 "buffer_id" => self.buffer_id.clone(),
171 "stage" => self.idx.to_string()
172 )
173 .set(self.total_count as f64);
174 gauge!(
175 "buffer_byte_size",
176 "buffer_id" => self.buffer_id,
177 "stage" => self.idx.to_string()
178 )
179 .set(self.total_byte_size as f64);
180 }
181}
182
183pub struct BufferReadError {
184 pub error_code: &'static str,
185 pub error: String,
186}
187
188impl InternalEvent for BufferReadError {
189 fn emit(self) {
190 error!(
191 message = "Error encountered during buffer read.",
192 error = %self.error,
193 error_code = self.error_code,
194 error_type = error_type::READER_FAILED,
195 stage = "processing",
196 internal_log_rate_limit = true,
197 );
198 counter!(
199 "buffer_errors_total", "error_code" => self.error_code,
200 "error_type" => "reader_failed",
201 "stage" => "processing",
202 )
203 .increment(1);
204 }
205}
206
207registered_event! {
208 BufferSendDuration {
209 stage: usize,
210 } => {
211 send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
212 }
213
214 fn emit(&self, duration: Duration) {
215 self.send_duration.record(duration);
216 }
217}