vector_buffers/
internal_events.rs

1use dashmap::DashMap;
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::LazyLock;
4use std::time::Duration;
5
6use crate::cast_utils::{i64_to_f64_safe, u64_to_f64_safe};
7use metrics::{counter, gauge, histogram, Histogram};
8use vector_common::{
9    internal_event::{error_type, InternalEvent},
10    registered_event,
11};
12
13static BUFFER_COUNTERS: LazyLock<DashMap<(String, usize), (AtomicI64, AtomicI64)>> =
14    LazyLock::new(DashMap::new);
15
/// Atomically adds `delta` to `counter` and returns the value that was stored.
///
/// The addition saturates at the `i64` bounds and the result is floored at zero, so the
/// counter never goes negative even if decrements outnumber increments.
fn update_and_get(counter: &AtomicI64, delta: i64) -> i64 {
    // Single definition of the transformation, shared by the CAS loop and the return value.
    let apply = |value: i64| value.saturating_add(delta).max(0);

    // The closure always yields `Some`, so `fetch_update` always succeeds and hands back
    // the value it replaced; re-applying the transformation reproduces what was stored.
    let previous = counter
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| Some(apply(current)))
        .unwrap_or_else(|current| current);

    apply(previous)
}
27
28fn update_buffer_gauge(buffer_id: &str, stage: usize, events_delta: i64, bytes_delta: i64) {
29    let counters = BUFFER_COUNTERS
30        .entry((buffer_id.to_string(), stage))
31        .or_insert_with(|| (AtomicI64::new(0), AtomicI64::new(0)));
32
33    let new_events = update_and_get(&counters.0, events_delta);
34    let new_bytes = update_and_get(&counters.1, bytes_delta);
35
36    gauge!("buffer_events",
37        "buffer_id" => buffer_id.to_string(),
38        "stage" => stage.to_string()
39    )
40    .set(i64_to_f64_safe(new_events));
41
42    gauge!("buffer_byte_size",
43        "buffer_id" => buffer_id.to_string(),
44        "stage" => stage.to_string()
45    )
46    .set(i64_to_f64_safe(new_bytes));
47}
48
49pub struct BufferCreated {
50    pub idx: usize,
51    pub max_size_events: usize,
52    pub max_size_bytes: u64,
53}
54
55impl InternalEvent for BufferCreated {
56    fn emit(self) {
57        if self.max_size_events != 0 {
58            gauge!("buffer_max_event_size", "stage" => self.idx.to_string())
59                .set(u64_to_f64_safe(self.max_size_events as u64));
60        }
61        if self.max_size_bytes != 0 {
62            gauge!("buffer_max_byte_size", "stage" => self.idx.to_string())
63                .set(u64_to_f64_safe(self.max_size_bytes));
64        }
65    }
66}
67
68pub struct BufferEventsReceived {
69    pub buffer_id: String,
70    pub idx: usize,
71    pub count: u64,
72    pub byte_size: u64,
73}
74
75impl InternalEvent for BufferEventsReceived {
76    fn emit(self) {
77        counter!("buffer_received_events_total",
78            "buffer_id" => self.buffer_id.clone(),
79            "stage" => self.idx.to_string()
80        )
81        .increment(self.count);
82
83        counter!("buffer_received_bytes_total",
84            "buffer_id" => self.buffer_id.clone(),
85            "stage" => self.idx.to_string()
86        )
87        .increment(self.byte_size);
88
89        let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
90        let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
91        update_buffer_gauge(&self.buffer_id, self.idx, count_delta, bytes_delta);
92    }
93}
94
95pub struct BufferEventsSent {
96    pub buffer_id: String,
97    pub idx: usize,
98    pub count: u64,
99    pub byte_size: u64,
100}
101
102impl InternalEvent for BufferEventsSent {
103    fn emit(self) {
104        counter!("buffer_sent_events_total",
105            "buffer_id" => self.buffer_id.clone(),
106            "stage" => self.idx.to_string()
107        )
108        .increment(self.count);
109
110        counter!("buffer_sent_bytes_total",
111            "buffer_id" => self.buffer_id.clone(),
112            "stage" => self.idx.to_string())
113        .increment(self.byte_size);
114
115        let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
116        let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
117        update_buffer_gauge(&self.buffer_id, self.idx, -count_delta, -bytes_delta);
118    }
119}
120
121pub struct BufferEventsDropped {
122    pub buffer_id: String,
123    pub idx: usize,
124    pub count: u64,
125    pub byte_size: u64,
126    pub intentional: bool,
127    pub reason: &'static str,
128}
129
130impl InternalEvent for BufferEventsDropped {
131    fn emit(self) {
132        let intentional_str = if self.intentional { "true" } else { "false" };
133        if self.intentional {
134            debug!(
135                message = "Events dropped.",
136                count = %self.count,
137                intentional = %intentional_str,
138                reason = %self.reason,
139                buffer_id = %self.buffer_id,
140                stage = %self.idx,
141            );
142        } else {
143            error!(
144                message = "Events dropped.",
145                count = %self.count,
146                intentional = %intentional_str,
147                reason = %self.reason,
148                buffer_id = %self.buffer_id,
149                stage = %self.idx,
150            );
151        }
152
153        counter!(
154            "buffer_discarded_events_total",
155            "buffer_id" => self.buffer_id.clone(),
156            "intentional" => intentional_str,
157        )
158        .increment(self.count);
159
160        let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
161        let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
162
163        update_buffer_gauge(&self.buffer_id, self.idx, -count_delta, -bytes_delta);
164    }
165}
166
167pub struct BufferReadError {
168    pub error_code: &'static str,
169    pub error: String,
170}
171
172impl InternalEvent for BufferReadError {
173    fn emit(self) {
174        error!(
175            message = "Error encountered during buffer read.",
176            error = %self.error,
177            error_code = self.error_code,
178            error_type = error_type::READER_FAILED,
179            stage = "processing",
180            internal_log_rate_limit = true,
181        );
182        counter!(
183            "buffer_errors_total", "error_code" => self.error_code,
184            "error_type" => "reader_failed",
185            "stage" => "processing",
186        )
187        .increment(1);
188    }
189}
190
// Registered event that records how long a buffer send took, per stage.
// NOTE(review): the `=> { ... }` fields appear to be built once when the event is
// registered (per the `registered_event!` pattern) and reused by every `emit`; confirm
// against the macro definition in `vector_common`.
registered_event! {
    BufferSendDuration {
        // Stage index, attached to the histogram as the `stage` label.
        stage: usize,
    } => {
        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
    }

    // Adds one send-duration sample to this stage's histogram.
    fn emit(&self, duration: Duration) {
        self.send_duration.record(duration);
    }
}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cast_utils::F64_SAFE_INT_MAX;
    use metrics::{Key, Label};
    use metrics_util::debugging::{DebugValue, DebuggingRecorder};
    use metrics_util::{CompositeKey, MetricKind};
    use ordered_float::OrderedFloat;
    use std::borrow::Cow;
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use std::thread;

    /// Serializes every test that reads or mutates the process-wide `BUFFER_COUNTERS` map.
    static TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

    /// Takes `TEST_LOCK`, recovering the guard if an earlier test panicked while holding it.
    ///
    /// Every test goes through this helper. A plain `.lock().unwrap()` would turn one
    /// failing test into a cascade of unrelated `PoisonError` failures.
    fn lock_counters() -> MutexGuard<'static, ()> {
        TEST_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Empties `BUFFER_COUNTERS` so each test starts from zero.
    fn reset_counters() {
        BUFFER_COUNTERS.clear();
    }

    /// Reads the raw `(events, bytes)` totals for a key, or `(0, 0)` if it was never updated.
    fn get_counter_values(buffer_id: &str, stage: usize) -> (i64, i64) {
        match BUFFER_COUNTERS.get(&(buffer_id.to_string(), stage)) {
            Some(counters) => {
                let events = counters.0.load(Ordering::Relaxed);
                let bytes = counters.1.load(Ordering::Relaxed);
                (events, bytes)
            }
            None => (0, 0),
        }
    }

    /// Applies `updates` through `update_buffer_gauge` under a thread-local debugging
    /// recorder. It then asserts that exactly the `buffer_events` and `buffer_byte_size`
    /// gauges for `(buffer_id, stage)` were recorded, with the expected values.
    fn assert_gauge_state(
        buffer_id: &str,
        stage: usize,
        updates: &[(i64, i64)],
        expected_events: f64,
        expected_bytes: f64,
    ) {
        let _guard = lock_counters();

        reset_counters();

        let recorder = DebuggingRecorder::default();
        let snapshotter = recorder.snapshotter();

        metrics::with_local_recorder(&recorder, move || {
            for (events_delta, bytes_delta) in updates {
                update_buffer_gauge(buffer_id, stage, *events_delta, *bytes_delta);
            }

            let metrics = snapshotter.snapshot().into_vec();

            let buffer_id_cow: Cow<'static, str> = Cow::Owned(buffer_id.to_string());
            let buffer_id_label = Label::new("buffer_id", buffer_id_cow);

            let stage_label = Label::new("stage", stage.to_string());

            let expected_metrics = vec![
                (
                    CompositeKey::new(
                        MetricKind::Gauge,
                        Key::from_parts(
                            "buffer_events",
                            vec![buffer_id_label.clone(), stage_label.clone()],
                        ),
                    ),
                    None,
                    None,
                    DebugValue::Gauge(OrderedFloat(expected_events)),
                ),
                (
                    CompositeKey::new(
                        MetricKind::Gauge,
                        Key::from_parts(
                            "buffer_byte_size",
                            vec![buffer_id_label.clone(), stage_label],
                        ),
                    ),
                    None,
                    None,
                    DebugValue::Gauge(OrderedFloat(expected_bytes)),
                ),
            ];

            // Snapshot ordering is not relied upon: check the count, then membership.
            assert_eq!(metrics.len(), expected_metrics.len());
            for expected in &expected_metrics {
                assert!(
                    metrics.contains(expected),
                    "Missing expected metric: {expected:?}"
                );
            }
        });
    }

    #[test]
    fn test_increment() {
        let _guard = lock_counters();
        reset_counters();

        update_buffer_gauge("test_buffer", 0, 10, 1024);
        let (events, bytes) = get_counter_values("test_buffer", 0);
        assert_eq!(events, 10);
        assert_eq!(bytes, 1024);
    }

    #[test]
    fn test_increment_and_decrement() {
        let _guard = lock_counters();
        reset_counters();

        update_buffer_gauge("test_buffer", 1, 100, 2048);
        update_buffer_gauge("test_buffer", 1, -50, -1024);
        let (events, bytes) = get_counter_values("test_buffer", 1);
        assert_eq!(events, 50);
        assert_eq!(bytes, 1024);
    }

    #[test]
    fn test_decrement_below_zero_clamped_to_zero() {
        let _guard = lock_counters();
        reset_counters();

        update_buffer_gauge("test_buffer", 2, 5, 100);
        update_buffer_gauge("test_buffer", 2, -10, -200);
        let (events, bytes) = get_counter_values("test_buffer", 2);

        assert_eq!(events, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_multiple_stages_are_independent() {
        let _guard = lock_counters();
        reset_counters();

        update_buffer_gauge("test_buffer", 0, 10, 100);
        update_buffer_gauge("test_buffer", 1, 20, 200);
        let (events0, bytes0) = get_counter_values("test_buffer", 0);
        let (events1, bytes1) = get_counter_values("test_buffer", 1);
        assert_eq!(events0, 10);
        assert_eq!(bytes0, 100);
        assert_eq!(events1, 20);
        assert_eq!(bytes1, 200);
    }

    #[test]
    fn test_multithreaded_updates_are_correct() {
        let _guard = lock_counters();
        reset_counters();

        let num_threads = 10;
        let increments_per_thread = 1000;
        let mut handles = vec![];

        for _ in 0..num_threads {
            let handle = thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    update_buffer_gauge("test_buffer", 0, 1, 10);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let (final_events, final_bytes) = get_counter_values("test_buffer", 0);
        let expected_events = i64::from(num_threads * increments_per_thread);
        let expected_bytes = i64::from(num_threads * increments_per_thread * 10);

        assert_eq!(final_events, expected_events);
        assert_eq!(final_bytes, expected_bytes);
    }

    #[test]
    fn test_large_values_capped_to_f64_safe_max() {
        let _guard = lock_counters();
        reset_counters();

        update_buffer_gauge("test_buffer", 3, F64_SAFE_INT_MAX * 2, F64_SAFE_INT_MAX * 2);

        let (events, bytes) = get_counter_values("test_buffer", 3);

        // The raw counters are not capped; only the published gauge value is.
        assert!(events > F64_SAFE_INT_MAX);
        assert!(bytes > F64_SAFE_INT_MAX);

        // NOTE(review): the capping below is re-implemented locally rather than observed on
        // the published gauge; consider asserting the recorded gauge value instead.
        let capped_events = events.min(F64_SAFE_INT_MAX);
        let capped_bytes = bytes.min(F64_SAFE_INT_MAX);

        assert_eq!(capped_events, F64_SAFE_INT_MAX);
        assert_eq!(capped_bytes, F64_SAFE_INT_MAX);
    }

    #[test]
    fn test_increment_with_recorder() {
        assert_gauge_state("test_buffer", 0, &[(100, 2048), (200, 1024)], 300.0, 3072.0);
    }

    #[test]
    fn test_should_not_be_negative_with_recorder() {
        assert_gauge_state("test_buffer", 1, &[(100, 1024), (-200, -4096)], 0.0, 0.0);
    }

    #[test]
    fn test_increment_with_custom_buffer_id() {
        assert_gauge_state(
            "buffer_alpha",
            0,
            &[(100, 2048), (200, 1024)],
            300.0,
            3072.0,
        );
    }

    #[test]
    fn test_increment_with_another_buffer_id() {
        assert_gauge_state("buffer_beta", 0, &[(10, 100), (5, 50)], 15.0, 150.0);
    }
}