1use dashmap::DashMap;
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::LazyLock;
4use std::time::Duration;
5
6use crate::cast_utils::{i64_to_f64_safe, u64_to_f64_safe};
7use metrics::{counter, gauge, histogram, Histogram};
8use vector_common::{
9 internal_event::{error_type, InternalEvent},
10 registered_event,
11};
12
/// Process-wide registry of live `(events, bytes)` totals per buffer stage,
/// keyed by `(buffer_id, stage)`. These signed accumulators back the
/// `buffer_events` / `buffer_byte_size` gauges so concurrent emitters can
/// apply deltas without the published value ever going negative.
static BUFFER_COUNTERS: LazyLock<DashMap<(String, usize), (AtomicI64, AtomicI64)>> =
    LazyLock::new(DashMap::new);
15
/// Atomically applies `delta` to `counter`, saturating on overflow and
/// flooring the result at zero, and returns the updated value.
fn update_and_get(counter: &AtomicI64, delta: i64) -> i64 {
    // `fetch_update` retries its CAS loop until it succeeds; because the
    // closure always returns `Some`, the call always yields `Ok(previous)`.
    // Recomputing the new value from `previous` avoids the fragile pattern of
    // mutating a captured local inside a closure that may run multiple times.
    let previous = counter
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            // `max(0)` floors at zero; `saturating_add` already caps at i64::MAX,
            // so no explicit upper clamp is needed.
            Some(current.saturating_add(delta).max(0))
        })
        .expect("closure always returns Some, so fetch_update cannot fail");
    previous.saturating_add(delta).max(0)
}
27
28fn update_buffer_gauge(buffer_id: &str, stage: usize, events_delta: i64, bytes_delta: i64) {
29 let counters = BUFFER_COUNTERS
30 .entry((buffer_id.to_string(), stage))
31 .or_insert_with(|| (AtomicI64::new(0), AtomicI64::new(0)));
32
33 let new_events = update_and_get(&counters.0, events_delta);
34 let new_bytes = update_and_get(&counters.1, bytes_delta);
35
36 gauge!("buffer_events",
37 "buffer_id" => buffer_id.to_string(),
38 "stage" => stage.to_string()
39 )
40 .set(i64_to_f64_safe(new_events));
41
42 gauge!("buffer_byte_size",
43 "buffer_id" => buffer_id.to_string(),
44 "stage" => stage.to_string()
45 )
46 .set(i64_to_f64_safe(new_bytes));
47}
48
49pub struct BufferCreated {
50 pub idx: usize,
51 pub max_size_events: usize,
52 pub max_size_bytes: u64,
53}
54
55impl InternalEvent for BufferCreated {
56 fn emit(self) {
57 if self.max_size_events != 0 {
58 gauge!("buffer_max_event_size", "stage" => self.idx.to_string())
59 .set(u64_to_f64_safe(self.max_size_events as u64));
60 }
61 if self.max_size_bytes != 0 {
62 gauge!("buffer_max_byte_size", "stage" => self.idx.to_string())
63 .set(u64_to_f64_safe(self.max_size_bytes));
64 }
65 }
66}
67
68pub struct BufferEventsReceived {
69 pub buffer_id: String,
70 pub idx: usize,
71 pub count: u64,
72 pub byte_size: u64,
73}
74
75impl InternalEvent for BufferEventsReceived {
76 fn emit(self) {
77 counter!("buffer_received_events_total",
78 "buffer_id" => self.buffer_id.clone(),
79 "stage" => self.idx.to_string()
80 )
81 .increment(self.count);
82
83 counter!("buffer_received_bytes_total",
84 "buffer_id" => self.buffer_id.clone(),
85 "stage" => self.idx.to_string()
86 )
87 .increment(self.byte_size);
88
89 let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
90 let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
91 update_buffer_gauge(&self.buffer_id, self.idx, count_delta, bytes_delta);
92 }
93}
94
95pub struct BufferEventsSent {
96 pub buffer_id: String,
97 pub idx: usize,
98 pub count: u64,
99 pub byte_size: u64,
100}
101
102impl InternalEvent for BufferEventsSent {
103 fn emit(self) {
104 counter!("buffer_sent_events_total",
105 "buffer_id" => self.buffer_id.clone(),
106 "stage" => self.idx.to_string()
107 )
108 .increment(self.count);
109
110 counter!("buffer_sent_bytes_total",
111 "buffer_id" => self.buffer_id.clone(),
112 "stage" => self.idx.to_string())
113 .increment(self.byte_size);
114
115 let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
116 let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
117 update_buffer_gauge(&self.buffer_id, self.idx, -count_delta, -bytes_delta);
118 }
119}
120
121pub struct BufferEventsDropped {
122 pub buffer_id: String,
123 pub idx: usize,
124 pub count: u64,
125 pub byte_size: u64,
126 pub intentional: bool,
127 pub reason: &'static str,
128}
129
130impl InternalEvent for BufferEventsDropped {
131 fn emit(self) {
132 let intentional_str = if self.intentional { "true" } else { "false" };
133 if self.intentional {
134 debug!(
135 message = "Events dropped.",
136 count = %self.count,
137 intentional = %intentional_str,
138 reason = %self.reason,
139 buffer_id = %self.buffer_id,
140 stage = %self.idx,
141 );
142 } else {
143 error!(
144 message = "Events dropped.",
145 count = %self.count,
146 intentional = %intentional_str,
147 reason = %self.reason,
148 buffer_id = %self.buffer_id,
149 stage = %self.idx,
150 );
151 }
152
153 counter!(
154 "buffer_discarded_events_total",
155 "buffer_id" => self.buffer_id.clone(),
156 "intentional" => intentional_str,
157 )
158 .increment(self.count);
159
160 let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
161 let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
162
163 update_buffer_gauge(&self.buffer_id, self.idx, -count_delta, -bytes_delta);
164 }
165}
166
167pub struct BufferReadError {
168 pub error_code: &'static str,
169 pub error: String,
170}
171
172impl InternalEvent for BufferReadError {
173 fn emit(self) {
174 error!(
175 message = "Error encountered during buffer read.",
176 error = %self.error,
177 error_code = self.error_code,
178 error_type = error_type::READER_FAILED,
179 stage = "processing",
180 internal_log_rate_limit = true,
181 );
182 counter!(
183 "buffer_errors_total", "error_code" => self.error_code,
184 "error_type" => "reader_failed",
185 "stage" => "processing",
186 )
187 .increment(1);
188 }
189}
190
// Registered event: the histogram handle is created once at registration
// (labeled with the stage index) and reused by every `emit` call, which
// records the observed send duration into `buffer_send_duration_seconds`.
registered_event! {
    BufferSendDuration {
        stage: usize,
    } => {
        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
    }

    fn emit(&self, duration: Duration) {
        self.send_duration.record(duration);
    }
}
202
// Unit tests for the gauge bookkeeping. Every test mutates the global
// `BUFFER_COUNTERS` map, so tests serialize on `TEST_LOCK` and clear the map
// before exercising it.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cast_utils::F64_SAFE_INT_MAX;
    use metrics::{Key, Label};
    use metrics_util::debugging::{DebugValue, DebuggingRecorder};
    use metrics_util::{CompositeKey, MetricKind};
    use ordered_float::OrderedFloat;
    use std::borrow::Cow;
    use std::sync::Mutex;
    use std::thread;

    // Serializes tests that touch the shared `BUFFER_COUNTERS` state.
    static TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

    // Resets the shared per-stage counters so each test starts from zero.
    fn reset_counters() {
        BUFFER_COUNTERS.clear();
    }

    // Reads the raw (events, bytes) totals for a buffer stage, or (0, 0) if
    // that stage has never been updated.
    fn get_counter_values(buffer_id: &str, stage: usize) -> (i64, i64) {
        match BUFFER_COUNTERS.get(&(buffer_id.to_string(), stage)) {
            Some(counters) => {
                let events = counters.0.load(Ordering::Relaxed);
                let bytes = counters.1.load(Ordering::Relaxed);
                (events, bytes)
            }
            None => (0, 0),
        }
    }

    // Drives `update_buffer_gauge` with a sequence of (events, bytes) deltas
    // under a local debugging recorder, then asserts the exact gauge values
    // published for `buffer_events` and `buffer_byte_size`.
    fn assert_gauge_state(
        buffer_id: &str,
        stage: usize,
        updates: &[(i64, i64)],
        expected_events: f64,
        expected_bytes: f64,
    ) {
        // `into_inner` recovers the guard even if a previous test panicked
        // while holding the lock.
        let _guard = TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        reset_counters();

        let recorder = DebuggingRecorder::default();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, move || {
            for (events_delta, bytes_delta) in updates {
                update_buffer_gauge(buffer_id, stage, *events_delta, *bytes_delta);
            }

            let metrics = snapshotter.snapshot().into_vec();

            let buffer_id_cow: Cow<'static, str> = Cow::Owned(buffer_id.to_string());
            let buffer_id_label = Label::new("buffer_id", buffer_id_cow);

            let stage_label = Label::new("stage", stage.to_string());

            // The two gauges (with their full label sets) are the only metrics
            // the updates above should have produced.
            let expected_metrics = vec![
                (
                    CompositeKey::new(
                        MetricKind::Gauge,
                        Key::from_parts(
                            "buffer_events",
                            vec![buffer_id_label.clone(), stage_label.clone()],
                        ),
                    ),
                    None,
                    None,
                    DebugValue::Gauge(OrderedFloat(expected_events)),
                ),
                (
                    CompositeKey::new(
                        MetricKind::Gauge,
                        Key::from_parts(
                            "buffer_byte_size",
                            vec![buffer_id_label.clone(), stage_label],
                        ),
                    ),
                    None,
                    None,
                    DebugValue::Gauge(OrderedFloat(expected_bytes)),
                ),
            ];

            assert_eq!(metrics.len(), expected_metrics.len());
            for expected in &expected_metrics {
                assert!(
                    metrics.contains(expected),
                    "Missing expected metric: {expected:?}"
                );
            }
        });
    }

    // A single positive update lands in the raw counters unchanged.
    #[test]
    fn test_increment() {
        let _guard = TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        reset_counters();

        update_buffer_gauge("test_buffer", 0, 10, 1024);
        let (events, bytes) = get_counter_values("test_buffer", 0);
        assert_eq!(events, 10);
        assert_eq!(bytes, 1024);
    }

    // Positive then negative deltas net out.
    #[test]
    fn test_increment_and_decrement() {
        let _guard = TEST_LOCK.lock().unwrap();
        reset_counters();

        update_buffer_gauge("test_buffer", 1, 100, 2048);
        update_buffer_gauge("test_buffer", 1, -50, -1024);
        let (events, bytes) = get_counter_values("test_buffer", 1);
        assert_eq!(events, 50);
        assert_eq!(bytes, 1024);
    }

    // Over-decrementing floors the counters at zero instead of going negative.
    #[test]
    fn test_decrement_below_zero_clamped_to_zero() {
        let _guard = TEST_LOCK.lock().unwrap();
        reset_counters();

        update_buffer_gauge("test_buffer", 2, 5, 100);
        update_buffer_gauge("test_buffer", 2, -10, -200);
        let (events, bytes) = get_counter_values("test_buffer", 2);

        assert_eq!(events, 0);
        assert_eq!(bytes, 0);
    }

    // Different stage indices of the same buffer keep separate totals.
    #[test]
    fn test_multiple_stages_are_independent() {
        let _guard = TEST_LOCK.lock().unwrap();
        reset_counters();

        update_buffer_gauge("test_buffer", 0, 10, 100);
        update_buffer_gauge("test_buffer", 1, 20, 200);
        let (events0, bytes0) = get_counter_values("test_buffer", 0);
        let (events1, bytes1) = get_counter_values("test_buffer", 1);
        assert_eq!(events0, 10);
        assert_eq!(bytes0, 100);
        assert_eq!(events1, 20);
        assert_eq!(bytes1, 200);
    }

    // Concurrent increments from many threads must not lose updates, which
    // exercises the CAS loop in `update_and_get`.
    #[test]
    fn test_multithreaded_updates_are_correct() {
        let _guard = TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        reset_counters();

        let num_threads = 10;
        let increments_per_thread = 1000;
        let mut handles = vec![];

        for _ in 0..num_threads {
            let handle = thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    update_buffer_gauge("test_buffer", 0, 1, 10);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let (final_events, final_bytes) = get_counter_values("test_buffer", 0);
        let expected_events = i64::from(num_threads * increments_per_thread);
        let expected_bytes = i64::from(num_threads * increments_per_thread * 10);

        assert_eq!(final_events, expected_events);
        assert_eq!(final_bytes, expected_bytes);
    }

    // The raw i64 counters keep the full value; only the published f64 gauge
    // is expected to be capped to the largest integer exactly representable
    // in an f64 (assumes F64_SAFE_INT_MAX * 2 fits in i64).
    #[test]
    fn test_large_values_capped_to_f64_safe_max() {
        let _guard = TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        reset_counters();

        update_buffer_gauge("test_buffer", 3, F64_SAFE_INT_MAX * 2, F64_SAFE_INT_MAX * 2);

        let (events, bytes) = get_counter_values("test_buffer", 3);

        assert!(events > F64_SAFE_INT_MAX);
        assert!(bytes > F64_SAFE_INT_MAX);

        let capped_events = events.min(F64_SAFE_INT_MAX);
        let capped_bytes = bytes.min(F64_SAFE_INT_MAX);

        assert_eq!(capped_events, F64_SAFE_INT_MAX);
        assert_eq!(capped_bytes, F64_SAFE_INT_MAX);
    }

    // End-to-end: accumulated positive deltas show up in the recorded gauges.
    #[test]
    fn test_increment_with_recorder() {
        assert_gauge_state("test_buffer", 0, &[(100, 2048), (200, 1024)], 300.0, 3072.0);
    }

    // End-to-end: over-decrementing publishes 0.0, never a negative gauge.
    #[test]
    fn test_should_not_be_negative_with_recorder() {
        assert_gauge_state("test_buffer", 1, &[(100, 1024), (-200, -4096)], 0.0, 0.0);
    }

    // Gauges carry the buffer_id label, so distinct ids are independent.
    #[test]
    fn test_increment_with_custom_buffer_id() {
        assert_gauge_state(
            "buffer_alpha",
            0,
            &[(100, 2048), (200, 1024)],
            300.0,
            3072.0,
        );
    }

    #[test]
    fn test_increment_with_another_buffer_id() {
        assert_gauge_state("buffer_beta", 0, &[(10, 100), (5, 50)], 15.0, 150.0);
    }
}