vector_buffers/
buffer_usage_data.rs

1use std::{
2    sync::{
3        atomic::{AtomicU64, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use tokio::time::interval;
10use tracing::{Instrument, Span};
11use vector_common::internal_event::emit;
12
13use crate::{
14    internal_events::{BufferCreated, BufferEventsDropped, BufferEventsReceived, BufferEventsSent},
15    spawn_named,
16};
17
/// Point-in-time values read from a [`CategoryMetrics`].
struct CategorySnapshot {
    event_count: u64,
    event_byte_size: u64,
}

impl CategorySnapshot {
    /// Reports whether this snapshot carries any activity, i.e. whether either value is non-zero.
    fn has_updates(&self) -> bool {
        !(self.event_count == 0 && self.event_byte_size == 0)
    }
}

/// Event-count and byte-size counters for one category of buffer activity.
///
/// A "category" groups one kind of interaction with the buffer -- for example, events being written into it
/// (received) or read out of it (sent). The number of events and their size as measured by the buffer are kept side
/// by side because we usually want to track the change in both at once.
///
/// The two counters are independent atomics, so a concurrent reader may observe one of them updated before the other.
#[derive(Debug, Default)]
struct CategoryMetrics {
    event_count: AtomicU64,
    event_byte_size: AtomicU64,
}

impl CategoryMetrics {
    /// Adds `event_count` events totalling `event_byte_size` bytes to the current values.
    fn increment(&self, event_count: u64, event_byte_size: u64) {
        let Self {
            event_count: events,
            event_byte_size: bytes,
        } = self;
        events.fetch_add(event_count, Ordering::Relaxed);
        bytes.fetch_add(event_byte_size, Ordering::Relaxed);
    }

    /// Overwrites both values outright.
    ///
    /// Updates are normally incremental; reserve this for values that are absolute by nature, such as configured
    /// limits.
    fn set(&self, event_count: u64, event_byte_size: u64) {
        let Self {
            event_count: events,
            event_byte_size: bytes,
        } = self;
        events.store(event_count, Ordering::Release);
        bytes.store(event_byte_size, Ordering::Release);
    }

    /// Reads both values without modifying them.
    fn get(&self) -> CategorySnapshot {
        let event_count = self.event_count.load(Ordering::Acquire);
        let event_byte_size = self.event_byte_size.load(Ordering::Acquire);
        CategorySnapshot {
            event_count,
            event_byte_size,
        }
    }

    /// Reads both values and resets them to zero, returning what was read.
    ///
    /// Because every call clears the counters, a snapshot with non-zero values means something was recorded since the
    /// previous call to `consume`. No extra state is needed to remember the last values seen.
    fn consume(&self) -> CategorySnapshot {
        let event_count = self.event_count.swap(0, Ordering::AcqRel);
        let event_byte_size = self.event_byte_size.swap(0, Ordering::AcqRel);
        CategorySnapshot {
            event_count,
            event_byte_size,
        }
    }
}
81
82/// Handle to buffer usage metrics for a specific buffer stage.
83#[derive(Clone, Debug)]
84pub struct BufferUsageHandle {
85    state: Arc<BufferUsageData>,
86}
87
88impl BufferUsageHandle {
89    /// Creates a no-op [`BufferUsageHandle`] handle.
90    ///
91    /// No usage data is written or stored.
92    pub(crate) fn noop() -> Self {
93        BufferUsageHandle {
94            state: Arc::new(BufferUsageData::new(0)),
95        }
96    }
97
98    /// Gets a snapshot of the buffer usage data, representing an instantaneous view of the different values.
99    pub fn snapshot(&self) -> BufferUsageSnapshot {
100        self.state.snapshot()
101    }
102
103    /// Sets the limits for this buffer component.
104    ///
105    /// Limits are exposed as gauges to provide stable values when superimposed on dashboards/graphs with the "actual"
106    /// usage amounts.
107    pub fn set_buffer_limits(&self, max_bytes: Option<u64>, max_events: Option<usize>) {
108        let max_events = max_events
109            .and_then(|n| u64::try_from(n).ok().or(Some(u64::MAX)))
110            .unwrap_or(0);
111        let max_bytes = max_bytes.unwrap_or(0);
112
113        self.state.max_size.set(max_events, max_bytes);
114    }
115
116    /// Increments the number of events (and their total size) received by this buffer component.
117    ///
118    /// This represents the events being sent into the buffer.
119    pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
120        self.state.received.increment(count, byte_size);
121    }
122
123    /// Increments the number of events (and their total size) sent by this buffer component.
124    ///
125    /// This represents the events being read out of the buffer.
126    pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
127        self.state.sent.increment(count, byte_size);
128    }
129
130    /// Increment the number of dropped events (and their total size) for this buffer component.
131    pub fn increment_dropped_event_count_and_byte_size(
132        &self,
133        count: u64,
134        byte_size: u64,
135        intentional: bool,
136    ) {
137        if intentional {
138            self.state.dropped_intentional.increment(count, byte_size);
139        } else {
140            self.state.dropped.increment(count, byte_size);
141        }
142    }
143}
144
145#[derive(Debug, Default)]
146struct BufferUsageData {
147    idx: usize,
148    received: CategoryMetrics,
149    sent: CategoryMetrics,
150    dropped: CategoryMetrics,
151    dropped_intentional: CategoryMetrics,
152    max_size: CategoryMetrics,
153}
154
155impl BufferUsageData {
156    fn new(idx: usize) -> Self {
157        Self {
158            idx,
159            ..Default::default()
160        }
161    }
162
163    fn snapshot(&self) -> BufferUsageSnapshot {
164        let received = self.received.get();
165        let sent = self.sent.get();
166        let dropped = self.dropped.get();
167        let dropped_intentional = self.dropped_intentional.get();
168        let max_size = self.max_size.get();
169
170        BufferUsageSnapshot {
171            received_event_count: received.event_count,
172            received_byte_size: received.event_byte_size,
173            sent_event_count: sent.event_count,
174            sent_byte_size: sent.event_byte_size,
175            dropped_event_count: dropped.event_count,
176            dropped_event_byte_size: dropped.event_byte_size,
177            dropped_event_count_intentional: dropped_intentional.event_count,
178            dropped_event_byte_size_intentional: dropped_intentional.event_byte_size,
179            max_size_bytes: max_size.event_byte_size,
180            max_size_events: max_size
181                .event_count
182                .try_into()
183                .expect("should never be bigger than `usize`"),
184        }
185    }
186}
187
/// Snapshot of buffer usage metrics.
///
/// This is a plain copy of the counter values at the moment it was taken. If a reporter has been installed via
/// [`BufferUsage::install`], the activity counters are reset each time it reports. In that case the values reflect
/// activity since the last report rather than since the buffer was created.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BufferUsageSnapshot {
    /// Number of events written into the buffer.
    pub received_event_count: u64,
    /// Total size, as measured by the buffer, of the events written into it.
    pub received_byte_size: u64,
    /// Number of events read out of the buffer.
    pub sent_event_count: u64,
    /// Total size, as measured by the buffer, of the events read out of it.
    pub sent_byte_size: u64,
    /// Number of events dropped unintentionally.
    pub dropped_event_count: u64,
    /// Total size of the events dropped unintentionally.
    pub dropped_event_byte_size: u64,
    /// Number of events dropped intentionally.
    pub dropped_event_count_intentional: u64,
    /// Total size of the events dropped intentionally.
    pub dropped_event_byte_size_intentional: u64,
    /// Configured maximum size in bytes, or `0` if no byte limit was set.
    pub max_size_bytes: u64,
    /// Configured maximum number of events, or `0` if no event limit was set.
    pub max_size_events: usize,
}
202
203/// Builder for tracking buffer usage metrics.
204///
205/// While building a buffer topology, `BufferUsage` can be utilized to create metrics storage for each individual buffer
206/// stage. A handle is provided to allow each buffer stage to update their metrics from one or multiple locations, as
207/// needed. Reporting of the metrics is handled centrally to keep buffer stages simpler and ensure consistent reporting.
208pub struct BufferUsage {
209    span: Span,
210    stages: Vec<Arc<BufferUsageData>>,
211}
212
213impl BufferUsage {
214    /// Creates an instance of [`BufferUsage`] attached to the given span.
215    ///
216    /// As buffers can have multiple stages, callers have the ability to register each stage via [`add_stage`].
217    pub fn from_span(span: Span) -> BufferUsage {
218        Self {
219            span,
220            stages: Vec::new(),
221        }
222    }
223
224    /// Adds a new stage to track usage for.
225    ///
226    /// A [`BufferUsageHandle`] is returned that the caller can use to actually update the usage metrics with.  This
227    /// handle will only update the usage metrics for the particular stage it was added for.
228    pub fn add_stage(&mut self, idx: usize) -> BufferUsageHandle {
229        let data = Arc::new(BufferUsageData::new(idx));
230        let handle = BufferUsageHandle {
231            state: Arc::clone(&data),
232        };
233
234        self.stages.push(data);
235        handle
236    }
237
238    /// Installs a reporter for the configured stages which periodically reports buffer usage metrics.
239    ///
240    /// Metrics are reported every 2 seconds.
241    ///
242    /// The `buffer_id` should be a unique name -- ideally the `component_id` of the sink using this buffer -- but is
243    /// not used for anything other than reporting, and so has no _requirement_ to be unique.
244    pub fn install(self, buffer_id: &str) {
245        let buffer_id = buffer_id.to_string();
246        let span = self.span;
247        let stages = self.stages;
248        let task_name = format!("buffer usage reporter ({buffer_id})");
249
250        let task = async move {
251            let mut interval = interval(Duration::from_secs(2));
252            loop {
253                interval.tick().await;
254
255                for stage in &stages {
256                    let max_size = stage.max_size.get();
257                    emit(BufferCreated {
258                        idx: stage.idx,
259                        max_size_bytes: max_size.event_byte_size,
260                        max_size_events: max_size
261                            .event_count
262                            .try_into()
263                            .expect("should never be bigger than `usize`"),
264                    });
265
266                    let received = stage.received.consume();
267                    if received.has_updates() {
268                        emit(BufferEventsReceived {
269                            buffer_id: buffer_id.clone(),
270                            idx: stage.idx,
271                            count: received.event_count,
272                            byte_size: received.event_byte_size,
273                        });
274                    }
275
276                    let sent = stage.sent.consume();
277                    if sent.has_updates() {
278                        emit(BufferEventsSent {
279                            buffer_id: buffer_id.clone(),
280                            idx: stage.idx,
281                            count: sent.event_count,
282                            byte_size: sent.event_byte_size,
283                        });
284                    }
285
286                    let dropped = stage.dropped.consume();
287                    if dropped.has_updates() {
288                        emit(BufferEventsDropped {
289                            buffer_id: buffer_id.clone(),
290                            idx: stage.idx,
291                            intentional: false,
292                            reason: "corrupted_events",
293                            count: dropped.event_count,
294                            byte_size: dropped.event_byte_size,
295                        });
296                    }
297
298                    let dropped_intentional = stage.dropped_intentional.consume();
299                    if dropped_intentional.has_updates() {
300                        emit(BufferEventsDropped {
301                            buffer_id: buffer_id.clone(),
302                            idx: stage.idx,
303                            intentional: true,
304                            reason: "drop_newest",
305                            count: dropped_intentional.event_count,
306                            byte_size: dropped_intentional.event_byte_size,
307                        });
308                    }
309                }
310            }
311        };
312
313        spawn_named(task.instrument(span.or_current()), task_name.as_str());
314    }
315}