vector_buffers/
buffer_usage_data.rs

1use std::{
2    sync::{
3        Arc,
4        atomic::{AtomicU64, Ordering},
5    },
6    time::Duration,
7};
8
9use tokio::time::interval;
10use tracing::{Instrument, Span};
11use vector_common::internal_event::emit;
12
13use crate::{
14    internal_events::{BufferCreated, BufferEventsDropped, BufferEventsReceived, BufferEventsSent},
15    spawn_named,
16};
17
/// Since none of the values used with atomic operations are used to protect other values, we can
/// always use a "relaxed" ordering when updating them.
const ORDERING: Ordering = Ordering::Relaxed;
21
22fn increment_counter(counter: &AtomicU64, delta: u64) {
23    counter
24        .fetch_update(ORDERING, ORDERING, |current| {
25            Some(current.checked_add(delta).unwrap_or_else(|| {
26                warn!(
27                    current,
28                    delta, "Buffer counter overflowed. Clamping value to `u64::MAX`."
29                );
30                u64::MAX
31            }))
32        })
33        .ok();
34}
35
36fn decrement_counter(counter: &AtomicU64, delta: u64) {
37    counter
38        .fetch_update(ORDERING, ORDERING, |current| {
39            Some(current.checked_sub(delta).unwrap_or_else(|| {
40                warn!(
41                    current,
42                    delta, "Buffer counter underflowed. Clamping value to `0`."
43                );
44                0
45            }))
46        })
47        .ok();
48}
49
/// Snapshot of category metrics.
struct CategorySnapshot {
    event_count: u64,
    event_byte_size: u64,
}

impl CategorySnapshot {
    /// Returns `true` if any of the values are non-zero.
    fn has_updates(&self) -> bool {
        let CategorySnapshot {
            event_count,
            event_byte_size,
        } = self;
        *event_count > 0 || *event_byte_size > 0
    }
}
62
/// Per-category metrics.
///
/// This tracks the number of events, and their size in the buffer, that a given category has interacted with. A
/// category in this case could be something like the receive or send categories i.e. being written into the buffer, and
/// then read out of the buffer. Overall, it's a simple grouping mechanism because we often want to track the change in
/// both number of events, and their size as measured by the buffer.
#[derive(Debug, Default)]
struct CategoryMetrics {
    /// Number of events attributed to this category.
    event_count: AtomicU64,
    /// Total size, in bytes, of the events attributed to this category.
    event_byte_size: AtomicU64,
}
74
75impl CategoryMetrics {
76    /// Increments the event count and byte size by the given amounts.
77    fn increment(&self, event_count: u64, event_byte_size: u64) {
78        increment_counter(&self.event_count, event_count);
79        increment_counter(&self.event_byte_size, event_byte_size);
80    }
81
82    /// Decrements the event count and byte size by the given amounts.
83    fn decrement(&self, event_count: u64, event_byte_size: u64) {
84        decrement_counter(&self.event_count, event_count);
85        decrement_counter(&self.event_byte_size, event_byte_size);
86    }
87
88    /// Sets the event count and event byte size to the given amount.
89    ///
90    /// Most updates are meant to be incremental, so this should be used sparingly.
91    fn set(&self, event_count: u64, event_byte_size: u64) {
92        self.event_count.store(event_count, ORDERING);
93        self.event_byte_size.store(event_byte_size, ORDERING);
94    }
95
96    /// Gets a snapshot of the event count and event byte size.
97    fn get(&self) -> CategorySnapshot {
98        CategorySnapshot {
99            event_count: self.event_count.load(ORDERING),
100            event_byte_size: self.event_byte_size.load(ORDERING),
101        }
102    }
103
104    /// Gets a snapshot of the event count and event byte size by "consuming" the values.
105    ///
106    /// This essentially resets both metrics while capturing their value at the time they were reset. This is useful if
107    /// you want to only emit updates when values have been incremented/set to a non-zero value, as by consuming each
108    /// time, you can tell if anything has changed since the last call to `consume` without needing internal state to
109    /// track the last seen values.
110    fn consume(&self) -> CategorySnapshot {
111        CategorySnapshot {
112            event_count: self.event_count.swap(0, ORDERING),
113            event_byte_size: self.event_byte_size.swap(0, ORDERING),
114        }
115    }
116}
117
/// Handle to buffer usage metrics for a specific buffer stage.
#[derive(Clone, Debug)]
pub struct BufferUsageHandle {
    /// Stage metrics shared with the reporter task spawned by `BufferUsage::install`; cloning the
    /// handle clones the `Arc`, so all clones update the same underlying data.
    state: Arc<BufferUsageData>,
}
123
124impl BufferUsageHandle {
125    /// Creates a no-op [`BufferUsageHandle`] handle.
126    ///
127    /// No usage data is written or stored.
128    pub(crate) fn noop() -> Self {
129        BufferUsageHandle {
130            state: Arc::new(BufferUsageData::new(0)),
131        }
132    }
133
134    /// Gets a snapshot of the buffer usage data, representing an instantaneous view of the different values.
135    pub fn snapshot(&self) -> BufferUsageSnapshot {
136        self.state.snapshot()
137    }
138
139    /// Sets the limits for this buffer component.
140    ///
141    /// Limits are exposed as gauges to provide stable values when superimposed on dashboards/graphs with the "actual"
142    /// usage amounts.
143    pub fn set_buffer_limits(&self, max_bytes: Option<u64>, max_events: Option<usize>) {
144        let max_events = max_events
145            .and_then(|n| u64::try_from(n).ok().or(Some(u64::MAX)))
146            .unwrap_or(0);
147        let max_bytes = max_bytes.unwrap_or(0);
148
149        self.state.max_size.set(max_events, max_bytes);
150    }
151
152    /// Increments the number of events (and their total size) received by this buffer component.
153    ///
154    /// This represents the events being sent into the buffer.
155    pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
156        if count > 0 || byte_size > 0 {
157            self.state.received.increment(count, byte_size);
158            self.state.current.increment(count, byte_size);
159        }
160    }
161
162    /// Increments the number of events (and their total size) sent by this buffer component.
163    ///
164    /// This represents the events being read out of the buffer.
165    pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
166        if count > 0 || byte_size > 0 {
167            self.state.sent.increment(count, byte_size);
168            self.state.current.decrement(count, byte_size);
169        }
170    }
171
172    /// Increment the number of dropped events (and their total size) for this buffer component.
173    pub fn increment_dropped_event_count_and_byte_size(
174        &self,
175        count: u64,
176        byte_size: u64,
177        intentional: bool,
178    ) {
179        if count > 0 || byte_size > 0 {
180            if intentional {
181                self.state.dropped_intentional.increment(count, byte_size);
182            } else {
183                self.state.dropped.increment(count, byte_size);
184            }
185            self.state.current.decrement(count, byte_size);
186        }
187    }
188}
189
/// Internal storage for the usage metrics of a single buffer stage.
#[derive(Debug, Default)]
struct BufferUsageData {
    /// Index of this stage within the buffer topology (see `BufferUsage::add_stage`).
    idx: usize,
    /// Events written into this stage.
    received: CategoryMetrics,
    /// Events read out of this stage.
    sent: CategoryMetrics,
    /// Events dropped unintentionally; reported with reason `corrupted_events`.
    dropped: CategoryMetrics,
    /// Events dropped intentionally; reported with reason `drop_newest`.
    dropped_intentional: CategoryMetrics,
    /// Configured limits: `event_count` holds the max events, `event_byte_size` the max bytes.
    max_size: CategoryMetrics,
    /// Events (and their bytes) currently resident in this stage.
    current: CategoryMetrics,
}
200
201impl BufferUsageData {
202    fn new(idx: usize) -> Self {
203        Self {
204            idx,
205            ..Default::default()
206        }
207    }
208
209    fn snapshot(&self) -> BufferUsageSnapshot {
210        let received = self.received.get();
211        let sent = self.sent.get();
212        let dropped = self.dropped.get();
213        let dropped_intentional = self.dropped_intentional.get();
214        let max_size = self.max_size.get();
215
216        BufferUsageSnapshot {
217            received_event_count: received.event_count,
218            received_byte_size: received.event_byte_size,
219            sent_event_count: sent.event_count,
220            sent_byte_size: sent.event_byte_size,
221            dropped_event_count: dropped.event_count,
222            dropped_event_byte_size: dropped.event_byte_size,
223            dropped_event_count_intentional: dropped_intentional.event_count,
224            dropped_event_byte_size_intentional: dropped_intentional.event_byte_size,
225            max_size_bytes: max_size.event_byte_size,
226            max_size_events: max_size
227                .event_count
228                .try_into()
229                .expect("should never be bigger than `usize`"),
230        }
231    }
232}
233
/// Snapshot of buffer usage metrics.
#[derive(Debug)]
pub struct BufferUsageSnapshot {
    /// Number of events received (written into the buffer).
    pub received_event_count: u64,
    /// Total byte size of events received.
    pub received_byte_size: u64,
    /// Number of events sent (read out of the buffer).
    pub sent_event_count: u64,
    /// Total byte size of events sent.
    pub sent_byte_size: u64,
    /// Number of events dropped unintentionally.
    pub dropped_event_count: u64,
    /// Total byte size of events dropped unintentionally.
    pub dropped_event_byte_size: u64,
    /// Number of events dropped intentionally.
    pub dropped_event_count_intentional: u64,
    /// Total byte size of events dropped intentionally.
    pub dropped_event_byte_size_intentional: u64,
    /// Configured maximum buffer size in bytes (`0` if no limit was set).
    pub max_size_bytes: u64,
    /// Configured maximum buffer size in events (`0` if no limit was set).
    pub max_size_events: usize,
}
248
/// Builder for tracking buffer usage metrics.
///
/// While building a buffer topology, `BufferUsage` can be utilized to create metrics storage for each individual buffer
/// stage. A handle is provided to allow each buffer stage to update their metrics from one or multiple locations, as
/// needed. Reporting of the metrics is handled centrally to keep buffer stages simpler and ensure consistent reporting.
pub struct BufferUsage {
    /// Span the reporter task is instrumented with when installed.
    span: Span,
    /// Usage data for each registered stage, shared with the handles given to the stages.
    stages: Vec<Arc<BufferUsageData>>,
}
258
259impl BufferUsage {
260    /// Creates an instance of [`BufferUsage`] attached to the given span.
261    ///
262    /// As buffers can have multiple stages, callers have the ability to register each stage via [`add_stage`].
263    pub fn from_span(span: Span) -> BufferUsage {
264        Self {
265            span,
266            stages: Vec::new(),
267        }
268    }
269
270    /// Adds a new stage to track usage for.
271    ///
272    /// A [`BufferUsageHandle`] is returned that the caller can use to actually update the usage metrics with.  This
273    /// handle will only update the usage metrics for the particular stage it was added for.
274    pub fn add_stage(&mut self, idx: usize) -> BufferUsageHandle {
275        let data = Arc::new(BufferUsageData::new(idx));
276        let handle = BufferUsageHandle {
277            state: Arc::clone(&data),
278        };
279
280        self.stages.push(data);
281        handle
282    }
283
284    /// Installs a reporter for the configured stages which periodically reports buffer usage metrics.
285    ///
286    /// Metrics are reported every 2 seconds.
287    ///
288    /// The `buffer_id` should be a unique name -- ideally the `component_id` of the sink using this buffer -- but is
289    /// not used for anything other than reporting, and so has no _requirement_ to be unique.
290    pub fn install(self, buffer_id: &str) {
291        let buffer_id = buffer_id.to_string();
292        let span = self.span;
293        let stages = self.stages;
294        let task_name = format!("buffer usage reporter ({buffer_id})");
295
296        let task = async move {
297            let mut interval = interval(Duration::from_secs(2));
298            loop {
299                interval.tick().await;
300
301                for stage in &stages {
302                    let max_size = stage.max_size.get();
303                    emit(BufferCreated {
304                        buffer_id: buffer_id.clone(),
305                        idx: stage.idx,
306                        max_size_bytes: max_size.event_byte_size,
307                        max_size_events: max_size
308                            .event_count
309                            .try_into()
310                            .expect("should never be bigger than `usize`"),
311                    });
312
313                    let current = stage.current.get();
314                    let received = stage.received.consume();
315                    if received.has_updates() {
316                        emit(BufferEventsReceived {
317                            buffer_id: buffer_id.clone(),
318                            idx: stage.idx,
319                            count: received.event_count,
320                            byte_size: received.event_byte_size,
321                            total_count: current.event_count,
322                            total_byte_size: current.event_byte_size,
323                        });
324                    }
325
326                    let sent = stage.sent.consume();
327                    if sent.has_updates() {
328                        emit(BufferEventsSent {
329                            buffer_id: buffer_id.clone(),
330                            idx: stage.idx,
331                            count: sent.event_count,
332                            byte_size: sent.event_byte_size,
333                            total_count: current.event_count,
334                            total_byte_size: current.event_byte_size,
335                        });
336                    }
337
338                    let dropped = stage.dropped.consume();
339                    if dropped.has_updates() {
340                        emit(BufferEventsDropped {
341                            buffer_id: buffer_id.clone(),
342                            idx: stage.idx,
343                            intentional: false,
344                            reason: "corrupted_events",
345                            count: dropped.event_count,
346                            byte_size: dropped.event_byte_size,
347                            total_count: current.event_count,
348                            total_byte_size: current.event_byte_size,
349                        });
350                    }
351
352                    let dropped_intentional = stage.dropped_intentional.consume();
353                    if dropped_intentional.has_updates() {
354                        emit(BufferEventsDropped {
355                            buffer_id: buffer_id.clone(),
356                            idx: stage.idx,
357                            intentional: true,
358                            reason: "drop_newest",
359                            count: dropped_intentional.event_count,
360                            byte_size: dropped_intentional.event_byte_size,
361                            total_count: current.event_count,
362                            total_byte_size: current.event_byte_size,
363                        });
364                    }
365                }
366            }
367        };
368
369        spawn_named(task.instrument(span.or_current()), task_name.as_str());
370    }
371}
372
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    #[test]
    fn test_multithreaded_updates_are_correct() {
        const NUM_THREADS: u64 = 16;
        const INCREMENTS_PER_THREAD: u64 = 10_000;

        let counter = Arc::new(AtomicU64::new(0));

        // Hammer the counter from many threads; every increment is paired with a decrement, so the
        // final value must be exactly zero if the updates are atomic.
        let handles: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..INCREMENTS_PER_THREAD {
                        increment_counter(&counter, 1);
                        decrement_counter(&counter, 1);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.load(ORDERING), 0);
    }

    #[test]
    fn test_decrement_counter_prevents_negatives() {
        let counter = AtomicU64::new(100);

        // Each step applies a delta and checks the expected (clamped-at-zero) result.
        for (delta, expected) in [(50, 50), (100, 0), (50, 0), (u64::MAX, 0)] {
            decrement_counter(&counter, delta);
            assert_eq!(counter.load(ORDERING), expected);
        }
    }

    #[test]
    fn test_increment_counter_prevents_overflow() {
        let counter = AtomicU64::new(u64::MAX - 2);

        // Each step applies a delta and checks the expected (clamped-at-`u64::MAX`) result.
        for (delta, expected) in [
            (1, u64::MAX - 1),
            (1, u64::MAX),
            (1, u64::MAX),
            (u64::MAX, u64::MAX),
        ] {
            increment_counter(&counter, delta);
            assert_eq!(counter.load(ORDERING), expected);
        }
    }
}