1use std::time::Duration;
2
3use metrics::{Histogram, counter, gauge, histogram};
4use vector_common::NamedInternalEvent;
5use vector_common::{
6 internal_event::{InternalEvent, error_type},
7 registered_event,
8};
9
10#[derive(NamedInternalEvent)]
11pub struct BufferCreated {
12 pub buffer_id: String,
13 pub idx: usize,
14 pub max_size_events: usize,
15 pub max_size_bytes: u64,
16}
17
18impl InternalEvent for BufferCreated {
19 #[expect(clippy::cast_precision_loss)]
20 fn emit(self) {
21 let stage = self.idx.to_string();
22 if self.max_size_events != 0 {
23 gauge!(
24 "buffer_max_size_events",
25 "buffer_id" => self.buffer_id.clone(),
26 "stage" => stage.clone(),
27 )
28 .set(self.max_size_events as f64);
29 gauge!(
31 "buffer_max_event_size",
32 "buffer_id" => self.buffer_id.clone(),
33 "stage" => stage.clone(),
34 )
35 .set(self.max_size_events as f64);
36 }
37 if self.max_size_bytes != 0 {
38 gauge!(
39 "buffer_max_size_bytes",
40 "buffer_id" => self.buffer_id.clone(),
41 "stage" => stage.clone(),
42 )
43 .set(self.max_size_bytes as f64);
44 gauge!(
46 "buffer_max_byte_size",
47 "buffer_id" => self.buffer_id,
48 "stage" => stage,
49 )
50 .set(self.max_size_bytes as f64);
51 }
52 }
53}
54
55#[derive(NamedInternalEvent)]
56pub struct BufferEventsReceived {
57 pub buffer_id: String,
58 pub idx: usize,
59 pub count: u64,
60 pub byte_size: u64,
61 pub total_count: u64,
62 pub total_byte_size: u64,
63}
64
65impl InternalEvent for BufferEventsReceived {
66 #[expect(clippy::cast_precision_loss)]
67 fn emit(self) {
68 counter!(
69 "buffer_received_events_total",
70 "buffer_id" => self.buffer_id.clone(),
71 "stage" => self.idx.to_string()
72 )
73 .increment(self.count);
74
75 counter!(
76 "buffer_received_bytes_total",
77 "buffer_id" => self.buffer_id.clone(),
78 "stage" => self.idx.to_string()
79 )
80 .increment(self.byte_size);
81 gauge!(
83 "buffer_events",
84 "buffer_id" => self.buffer_id.clone(),
85 "stage" => self.idx.to_string()
86 )
87 .set(self.total_count as f64);
88 gauge!(
89 "buffer_size_events",
90 "buffer_id" => self.buffer_id.clone(),
91 "stage" => self.idx.to_string()
92 )
93 .set(self.total_count as f64);
94 gauge!(
95 "buffer_size_bytes",
96 "buffer_id" => self.buffer_id.clone(),
97 "stage" => self.idx.to_string()
98 )
99 .set(self.total_byte_size as f64);
100 gauge!(
102 "buffer_byte_size",
103 "buffer_id" => self.buffer_id,
104 "stage" => self.idx.to_string()
105 )
106 .set(self.total_byte_size as f64);
107 }
108}
109
110#[derive(NamedInternalEvent)]
111pub struct BufferEventsSent {
112 pub buffer_id: String,
113 pub idx: usize,
114 pub count: u64,
115 pub byte_size: u64,
116 pub total_count: u64,
117 pub total_byte_size: u64,
118}
119
120impl InternalEvent for BufferEventsSent {
121 #[expect(clippy::cast_precision_loss)]
122 fn emit(self) {
123 counter!(
124 "buffer_sent_events_total",
125 "buffer_id" => self.buffer_id.clone(),
126 "stage" => self.idx.to_string()
127 )
128 .increment(self.count);
129 counter!(
130 "buffer_sent_bytes_total",
131 "buffer_id" => self.buffer_id.clone(),
132 "stage" => self.idx.to_string()
133 )
134 .increment(self.byte_size);
135 gauge!(
137 "buffer_events",
138 "buffer_id" => self.buffer_id.clone(),
139 "stage" => self.idx.to_string()
140 )
141 .set(self.total_count as f64);
142 gauge!(
143 "buffer_size_events",
144 "buffer_id" => self.buffer_id.clone(),
145 "stage" => self.idx.to_string()
146 )
147 .set(self.total_count as f64);
148 gauge!(
149 "buffer_size_bytes",
150 "buffer_id" => self.buffer_id.clone(),
151 "stage" => self.idx.to_string()
152 )
153 .set(self.total_byte_size as f64);
154 gauge!(
156 "buffer_byte_size",
157 "buffer_id" => self.buffer_id,
158 "stage" => self.idx.to_string()
159 )
160 .set(self.total_byte_size as f64);
161 }
162}
163
164#[derive(NamedInternalEvent)]
165pub struct BufferEventsDropped {
166 pub buffer_id: String,
167 pub idx: usize,
168 pub count: u64,
169 pub byte_size: u64,
170 pub total_count: u64,
171 pub total_byte_size: u64,
172 pub intentional: bool,
173 pub reason: &'static str,
174}
175
176impl InternalEvent for BufferEventsDropped {
177 #[expect(clippy::cast_precision_loss)]
178 fn emit(self) {
179 let intentional_str = if self.intentional { "true" } else { "false" };
180 if self.intentional {
181 debug!(
182 message = "Events dropped.",
183 count = %self.count,
184 byte_size = %self.byte_size,
185 intentional = %intentional_str,
186 reason = %self.reason,
187 buffer_id = %self.buffer_id,
188 stage = %self.idx,
189 );
190 } else {
191 error!(
192 message = "Events dropped.",
193 count = %self.count,
194 byte_size = %self.byte_size,
195 intentional = %intentional_str,
196 reason = %self.reason,
197 buffer_id = %self.buffer_id,
198 stage = %self.idx,
199 );
200 }
201
202 counter!(
203 "buffer_discarded_events_total",
204 "buffer_id" => self.buffer_id.clone(),
205 "stage" => self.idx.to_string(),
206 "intentional" => intentional_str,
207 )
208 .increment(self.count);
209 counter!(
210 "buffer_discarded_bytes_total",
211 "buffer_id" => self.buffer_id.clone(),
212 "stage" => self.idx.to_string(),
213 "intentional" => intentional_str,
214 )
215 .increment(self.byte_size);
216 gauge!(
218 "buffer_events",
219 "buffer_id" => self.buffer_id.clone(),
220 "stage" => self.idx.to_string()
221 )
222 .set(self.total_count as f64);
223 gauge!(
224 "buffer_size_events",
225 "buffer_id" => self.buffer_id.clone(),
226 "stage" => self.idx.to_string()
227 )
228 .set(self.total_count as f64);
229 gauge!(
230 "buffer_size_bytes",
231 "buffer_id" => self.buffer_id.clone(),
232 "stage" => self.idx.to_string()
233 )
234 .set(self.total_byte_size as f64);
235 gauge!(
237 "buffer_byte_size",
238 "buffer_id" => self.buffer_id,
239 "stage" => self.idx.to_string()
240 )
241 .set(self.total_byte_size as f64);
242 }
243}
244
245#[derive(NamedInternalEvent)]
246pub struct BufferReadError {
247 pub error_code: &'static str,
248 pub error: String,
249}
250
251impl InternalEvent for BufferReadError {
252 fn emit(self) {
253 error!(
254 message = "Error encountered during buffer read.",
255 error = %self.error,
256 error_code = self.error_code,
257 error_type = error_type::READER_FAILED,
258 stage = "processing",
259 );
260 counter!(
261 "buffer_errors_total", "error_code" => self.error_code,
262 "error_type" => "reader_failed",
263 "stage" => "processing",
264 )
265 .increment(1);
266 }
267}
268
// Declares the `BufferSendDuration` event through the `registered_event!` macro.
//
// The event carries a `stage: usize` field and owns a `send_duration` histogram created
// from the `buffer_send_duration_seconds` metric, labelled with `stage` rendered as a
// string.
//
// NOTE(review): the expansion of `registered_event!` is not visible in this file;
// presumably the histogram is built once when the event is registered rather than on
// every `emit` — confirm against the macro definition.
registered_event! {
    BufferSendDuration {
        stage: usize,
    } => {
        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
    }

    // Records `duration` into the `send_duration` histogram.
    fn emit(&self, duration: Duration) {
        self.send_duration.record(duration);
    }
}