vector_buffers/
buffer_usage_data.rs

1use std::{
2    sync::{
3        Arc,
4        atomic::{AtomicU64, Ordering},
5    },
6    time::Duration,
7};
8
9use tokio::time::interval;
10use tracing::{Instrument, Span};
11use vector_common::internal_event::emit;
12
13use crate::{
14    internal_events::{BufferCreated, BufferEventsDropped, BufferEventsReceived, BufferEventsSent},
15    spawn_named,
16};
17
/// Since none of the values used with atomic operations are used to protect other values, we can
/// always use a "relaxed" ordering when updating them.
const ORDERING: Ordering = Ordering::Relaxed;
21
/// Plain-value copy of one category's counters: an event count and the byte size of those events.
#[derive(Clone, Copy, Debug, Default)]
struct CategorySnapshot {
    event_count: u64,
    event_byte_size: u64,
}

impl CategorySnapshot {
    /// Reports whether this snapshot carries any activity, i.e. whether at least one of its two
    /// values is non-zero.
    fn has_updates(&self) -> bool {
        // "Not both zero" is the same as "either one is non-zero".
        !(self.event_count == 0 && self.event_byte_size == 0)
    }
}
35
36/// Per-category metrics.
37///
38/// This tracks the number of events, and their size in the buffer, that a given category has
39/// interacted with. A category in this case could be something like the receive or send categories
40/// i.e. being written into the buffer, and then read out of the buffer. Overall, it's a simple
41/// grouping mechanism because we often want to track the change in both number of events, and their
42/// size as measured by the buffer.
43///
44///  At a sustained 1 GiB/sec, which is still faster than Vector can currently achieve, a `u64` byte
45///  counter would take over 500 years to overflow. As such, we don't handle failures due to
46///  overflow as it is effectively impossible.
47#[derive(Debug, Default)]
48struct CategoryMetrics {
49    event_count: AtomicU64,
50    event_byte_size: AtomicU64,
51}
52
53impl CategoryMetrics {
54    /// Increments the event count and byte size by the given amounts.
55    fn increment(&self, event_count: u64, event_byte_size: u64) {
56        self.event_count.fetch_add(event_count, ORDERING);
57        self.event_byte_size.fetch_add(event_byte_size, ORDERING);
58    }
59
60    /// Sets the event count and event byte size to the given amount.
61    ///
62    /// Most updates are meant to be incremental, so this should be used sparingly.
63    fn set(&self, event_count: u64, event_byte_size: u64) {
64        self.event_count.store(event_count, ORDERING);
65        self.event_byte_size.store(event_byte_size, ORDERING);
66    }
67
68    /// Gets a snapshot of the event count and event byte size.
69    fn get(&self) -> CategorySnapshot {
70        CategorySnapshot {
71            event_count: self.event_count.load(ORDERING),
72            event_byte_size: self.event_byte_size.load(ORDERING),
73        }
74    }
75
76    /// Gets a snapshot of the event count and event byte size by "consuming" the values.
77    ///
78    /// This essentially resets both metrics while capturing their value at the time they were reset. This is useful if
79    /// you want to only emit updates when values have been incremented/set to a non-zero value, as by consuming each
80    /// time, you can tell if anything has changed since the last call to `consume` without needing internal state to
81    /// track the last seen values.
82    fn consume(&self) -> CategorySnapshot {
83        CategorySnapshot {
84            event_count: self.event_count.swap(0, ORDERING),
85            event_byte_size: self.event_byte_size.swap(0, ORDERING),
86        }
87    }
88}
89
90/// Running totals of events that have entered and left a buffer stage.
91///
92/// Each reporting tick consumes the latest deltas from the atomic counters and
93/// folds them into these cumulative totals.  The difference
94/// (`total_entered - total_left`) gives the approximate current buffer
95/// occupancy without requiring a separate "current size" counter that would
96/// itself be subject to cross-thread races.
97#[derive(Clone, Copy, Debug, Default)]
98struct ReporterCurrentMetrics {
99    total_entered: CategorySnapshot,
100    total_left: CategorySnapshot,
101}
102
103impl ReporterCurrentMetrics {
104    fn add_received(&mut self, snapshot: CategorySnapshot) {
105        self.total_entered.event_count = self
106            .total_entered
107            .event_count
108            .saturating_add(snapshot.event_count);
109        self.total_entered.event_byte_size = self
110            .total_entered
111            .event_byte_size
112            .saturating_add(snapshot.event_byte_size);
113    }
114
115    fn add_left(&mut self, snapshot: CategorySnapshot) {
116        self.total_left.event_count = self
117            .total_left
118            .event_count
119            .saturating_add(snapshot.event_count);
120        self.total_left.event_byte_size = self
121            .total_left
122            .event_byte_size
123            .saturating_add(snapshot.event_byte_size);
124    }
125
126    fn current(&self) -> CategorySnapshot {
127        CategorySnapshot {
128            event_count: self
129                .total_entered
130                .event_count
131                .saturating_sub(self.total_left.event_count),
132            event_byte_size: self
133                .total_entered
134                .event_byte_size
135                .saturating_sub(self.total_left.event_byte_size),
136        }
137    }
138}
139
140/// Handle to buffer usage metrics for a specific buffer stage.
141#[derive(Clone, Debug)]
142pub struct BufferUsageHandle {
143    state: Arc<BufferUsageData>,
144}
145
146impl BufferUsageHandle {
147    /// Creates a no-op [`BufferUsageHandle`] handle.
148    ///
149    /// No usage data is written or stored.
150    pub(crate) fn noop() -> Self {
151        BufferUsageHandle {
152            state: Arc::new(BufferUsageData::new(0)),
153        }
154    }
155
156    /// Gets a snapshot of the buffer usage data, representing an instantaneous view of the different values.
157    pub fn snapshot(&self) -> BufferUsageSnapshot {
158        self.state.snapshot()
159    }
160
161    /// Sets the limits for this buffer component.
162    ///
163    /// Limits are exposed as gauges to provide stable values when superimposed on dashboards/graphs with the "actual"
164    /// usage amounts.
165    pub fn set_buffer_limits(&self, max_bytes: Option<u64>, max_events: Option<usize>) {
166        let max_events = max_events
167            .and_then(|n| u64::try_from(n).ok().or(Some(u64::MAX)))
168            .unwrap_or(0);
169        let max_bytes = max_bytes.unwrap_or(0);
170
171        self.state.max_size.set(max_events, max_bytes);
172    }
173
174    /// Increments the number of events (and their total size) received by this buffer component.
175    ///
176    /// This represents the events being sent into the buffer.
177    pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
178        if count > 0 || byte_size > 0 {
179            self.state.received.increment(count, byte_size);
180        }
181    }
182
183    /// Increments the number of events (and their total size) sent by this buffer component.
184    ///
185    /// This represents the events being read out of the buffer.
186    pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
187        if count > 0 || byte_size > 0 {
188            self.state.sent.increment(count, byte_size);
189        }
190    }
191
192    /// Increment the number of dropped events (and their total size) for this buffer component.
193    pub fn increment_dropped_event_count_and_byte_size(
194        &self,
195        count: u64,
196        byte_size: u64,
197        intentional: bool,
198    ) {
199        if count > 0 || byte_size > 0 {
200            if intentional {
201                self.state.dropped_intentional.increment(count, byte_size);
202            } else {
203                self.state.dropped.increment(count, byte_size);
204            }
205        }
206    }
207}
208
209#[derive(Debug, Default)]
210struct BufferUsageData {
211    idx: usize,
212    received: CategoryMetrics,
213    sent: CategoryMetrics,
214    dropped: CategoryMetrics,
215    dropped_intentional: CategoryMetrics,
216    max_size: CategoryMetrics,
217}
218
219impl BufferUsageData {
220    fn new(idx: usize) -> Self {
221        Self {
222            idx,
223            ..Default::default()
224        }
225    }
226
227    fn snapshot(&self) -> BufferUsageSnapshot {
228        let received = self.received.get();
229        let sent = self.sent.get();
230        let dropped = self.dropped.get();
231        let dropped_intentional = self.dropped_intentional.get();
232        let max_size = self.max_size.get();
233
234        BufferUsageSnapshot {
235            received_event_count: received.event_count,
236            received_byte_size: received.event_byte_size,
237            sent_event_count: sent.event_count,
238            sent_byte_size: sent.event_byte_size,
239            dropped_event_count: dropped.event_count,
240            dropped_event_byte_size: dropped.event_byte_size,
241            dropped_event_count_intentional: dropped_intentional.event_count,
242            dropped_event_byte_size_intentional: dropped_intentional.event_byte_size,
243            max_size_bytes: max_size.event_byte_size,
244            max_size_events: max_size
245                .event_count
246                .try_into()
247                .expect("should never be bigger than `usize`"),
248        }
249    }
250
251    fn report(&self, current_metrics: &mut ReporterCurrentMetrics, buffer_id: &str) {
252        let max_size = self.max_size.get();
253        emit(BufferCreated {
254            buffer_id: buffer_id.to_string(),
255            idx: self.idx,
256            max_size_bytes: max_size.event_byte_size,
257            max_size_events: max_size
258                .event_count
259                .try_into()
260                .expect("should never be bigger than `usize`"),
261        });
262
263        // Consume received before sent/dropped so that, in the presence of
264        // a race between producers and consumers, the computed current usage
265        // will err on the side of overcounting, which is more likely to be an
266        // accurate representation of the current usage than undercounting.
267        let received = self.received.consume();
268        current_metrics.add_received(received);
269
270        let sent = self.sent.consume();
271        current_metrics.add_left(sent);
272
273        let dropped = self.dropped.consume();
274        current_metrics.add_left(dropped);
275
276        let dropped_intentional = self.dropped_intentional.consume();
277        current_metrics.add_left(dropped_intentional);
278
279        let current = current_metrics.current();
280
281        if received.has_updates() {
282            emit(BufferEventsReceived {
283                buffer_id: buffer_id.to_string(),
284                idx: self.idx,
285                count: received.event_count,
286                byte_size: received.event_byte_size,
287                total_count: current.event_count,
288                total_byte_size: current.event_byte_size,
289            });
290        }
291
292        if sent.has_updates() {
293            emit(BufferEventsSent {
294                buffer_id: buffer_id.to_string(),
295                idx: self.idx,
296                count: sent.event_count,
297                byte_size: sent.event_byte_size,
298                total_count: current.event_count,
299                total_byte_size: current.event_byte_size,
300            });
301        }
302
303        if dropped.has_updates() {
304            emit(BufferEventsDropped {
305                buffer_id: buffer_id.to_string(),
306                idx: self.idx,
307                intentional: false,
308                reason: "corrupted_events",
309                count: dropped.event_count,
310                byte_size: dropped.event_byte_size,
311                total_count: current.event_count,
312                total_byte_size: current.event_byte_size,
313            });
314        }
315
316        if dropped_intentional.has_updates() {
317            emit(BufferEventsDropped {
318                buffer_id: buffer_id.to_string(),
319                idx: self.idx,
320                intentional: true,
321                reason: "drop_newest",
322                count: dropped_intentional.event_count,
323                byte_size: dropped_intentional.event_byte_size,
324                total_count: current.event_count,
325                total_byte_size: current.event_byte_size,
326            });
327        }
328    }
329}
330
/// Snapshot of buffer usage metrics.
///
/// Each field is a copy of the corresponding per-stage counter at the time the snapshot was taken.
///
/// Note that the received/sent/dropped counters are reset to zero each time the periodic reporter
/// consumes them, so for a stage that has a reporter installed these values cover the activity
/// since the most recent report rather than the whole lifetime of the buffer.
#[derive(Debug)]
pub struct BufferUsageSnapshot {
    /// Number of events received by (written into) the buffer stage.
    pub received_event_count: u64,
    /// Total byte size of the received events.
    pub received_byte_size: u64,
    /// Number of events sent by (read out of) the buffer stage.
    pub sent_event_count: u64,
    /// Total byte size of the sent events.
    pub sent_byte_size: u64,
    /// Number of events dropped where the drop was not flagged as intentional.
    pub dropped_event_count: u64,
    /// Total byte size of the events dropped where the drop was not flagged as intentional.
    pub dropped_event_byte_size: u64,
    /// Number of events dropped where the drop was flagged as intentional.
    pub dropped_event_count_intentional: u64,
    /// Total byte size of the events dropped where the drop was flagged as intentional.
    pub dropped_event_byte_size_intentional: u64,
    /// Configured maximum size in bytes; `0` when no byte limit has been set.
    pub max_size_bytes: u64,
    /// Configured maximum number of events; `0` when no event limit has been set.
    pub max_size_events: usize,
}
345
346/// Builder for tracking buffer usage metrics.
347///
348/// While building a buffer topology, `BufferUsage` can be utilized to create metrics storage for each individual buffer
349/// stage. A handle is provided to allow each buffer stage to update their metrics from one or multiple locations, as
350/// needed. Reporting of the metrics is handled centrally to keep buffer stages simpler and ensure consistent reporting.
351pub struct BufferUsage {
352    span: Span,
353    stages: Vec<Arc<BufferUsageData>>,
354}
355
356impl BufferUsage {
357    /// Creates an instance of [`BufferUsage`] attached to the given span.
358    ///
359    /// As buffers can have multiple stages, callers have the ability to register each stage via [`add_stage`].
360    pub fn from_span(span: Span) -> BufferUsage {
361        Self {
362            span,
363            stages: Vec::new(),
364        }
365    }
366
367    /// Adds a new stage to track usage for.
368    ///
369    /// A [`BufferUsageHandle`] is returned that the caller can use to actually update the usage metrics with.  This
370    /// handle will only update the usage metrics for the particular stage it was added for.
371    pub fn add_stage(&mut self, idx: usize) -> BufferUsageHandle {
372        let data = Arc::new(BufferUsageData::new(idx));
373        let handle = BufferUsageHandle {
374            state: Arc::clone(&data),
375        };
376
377        self.stages.push(data);
378        handle
379    }
380
381    /// Installs a reporter for the configured stages which periodically reports buffer usage metrics.
382    ///
383    /// Metrics are reported every 2 seconds.
384    ///
385    /// The `buffer_id` should be a unique name -- ideally the `component_id` of the sink using this buffer -- but is
386    /// not used for anything other than reporting, and so has no _requirement_ to be unique.
387    pub fn install(self, buffer_id: &str) {
388        let buffer_id = buffer_id.to_string();
389        let span = self.span;
390        let stages: Vec<_> = self
391            .stages
392            .into_iter()
393            .map(|stage| (stage, ReporterCurrentMetrics::default()))
394            .collect();
395        let task_name = format!("buffer usage reporter ({buffer_id})");
396
397        let task = Self::report_buffer_usage(stages, buffer_id).instrument(span.or_current());
398        spawn_named(task, task_name.as_str());
399    }
400
401    async fn report_buffer_usage(
402        mut stages: Vec<(Arc<BufferUsageData>, ReporterCurrentMetrics)>,
403        buffer_id: String,
404    ) {
405        let mut interval = interval(Duration::from_secs(2));
406        loop {
407            interval.tick().await;
408
409            for (stage, current_metrics) in &mut stages {
410                stage.report(current_metrics, &buffer_id);
411            }
412        }
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot from an event count and a byte size.
    fn delta(event_count: u64, event_byte_size: u64) -> CategorySnapshot {
        CategorySnapshot {
            event_count,
            event_byte_size,
        }
    }

    /// Asserts that the current usage derived from `metrics` matches the expected values.
    #[track_caller]
    fn assert_current_usage(
        metrics: &ReporterCurrentMetrics,
        event_count: u64,
        event_byte_size: u64,
    ) {
        let usage = metrics.current();
        assert_eq!(usage.event_count, event_count);
        assert_eq!(usage.event_byte_size, event_byte_size);
    }

    #[test]
    fn reporter_current_usage_is_derived_from_entered_and_left_totals() {
        let mut totals = ReporterCurrentMetrics::default();
        totals.add_received(delta(10, 1000));
        totals.add_left(delta(3, 300));
        totals.add_left(delta(2, 200));

        assert_current_usage(&totals, 5, 500);
    }

    #[test]
    fn reporter_current_usage_preserves_underflow_debt() {
        // Departures recorded ahead of arrivals must still be paid off by later arrivals.
        let mut totals = ReporterCurrentMetrics::default();
        totals.add_left(delta(10, 1000));
        totals.add_received(delta(15, 1500));

        assert_current_usage(&totals, 5, 500);
    }

    #[test]
    fn consume_resets_deltas_between_ticks() {
        let stage = BufferUsageData::new(0);
        let mut totals = ReporterCurrentMetrics::default();

        stage.received.increment(10, 1000);
        stage.sent.increment(3, 300);
        stage.report(&mut totals, "test");
        assert_current_usage(&totals, 7, 700);

        // Second tick with no new activity should report the same totals.
        stage.report(&mut totals, "test");
        assert_current_usage(&totals, 7, 700);
    }

    #[test]
    fn accumulates_across_multiple_ticks() {
        let stage = BufferUsageData::new(0);
        let mut totals = ReporterCurrentMetrics::default();

        // First tick: only arrivals.
        stage.received.increment(10, 1000);
        stage.report(&mut totals, "test");

        // Second tick: more arrivals plus departures.
        stage.received.increment(5, 500);
        stage.sent.increment(8, 800);
        stage.report(&mut totals, "test");

        assert_current_usage(&totals, 7, 700);
    }

    #[test]
    fn drops_count_as_leaving_the_buffer() {
        let stage = BufferUsageData::new(0);
        let mut totals = ReporterCurrentMetrics::default();

        stage.received.increment(20, 2000);
        stage.sent.increment(5, 500);
        stage.dropped.increment(3, 300);
        stage.dropped_intentional.increment(2, 200);
        stage.report(&mut totals, "test");

        assert_current_usage(&totals, 10, 1000);
    }
}
508}