use std::{
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::Duration,
};

use tokio::time::interval;
use tracing::{Instrument, Span, warn};
use vector_common::internal_event::emit;

use crate::{
    internal_events::{BufferCreated, BufferEventsDropped, BufferEventsReceived, BufferEventsSent},
    spawn_named,
};

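// Each counter is an independent statistic that never synchronizes with other memory
// operations, so `Relaxed` ordering suffices.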
const ORDERING: Ordering = Ordering::Relaxed;

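/// Adds `delta` to `counter`, clamping at `u64::MAX` and logging a warning on overflow.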
fn increment_counter(counter: &AtomicU64, delta: u64) {
    counter
        .fetch_update(ORDERING, ORDERING, |current| {
            Some(current.checked_add(delta).unwrap_or_else(|| {
                warn!(
                    current,
                    delta, "Buffer counter overflowed. Clamping value to `u64::MAX`."
                );
                u64::MAX
            }))
        })
        .ok();
}

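/// Subtracts `delta` from `counter`, clamping at `0` and logging a warning on underflow.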
fn decrement_counter(counter: &AtomicU64, delta: u64) {
    counter
        .fetch_update(ORDERING, ORDERING, |current| {
            Some(current.checked_sub(delta).unwrap_or_else(|| {
                warn!(
                    current,
                    delta, "Buffer counter underflowed. Clamping value to `0`."
                );
                0
            }))
        })
        .ok();
}

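/// Point-in-time snapshot of the event count and byte size for one category of buffer metrics.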
struct CategorySnapshot {
    event_count: u64,
    event_byte_size: u64,
}

impl CategorySnapshot {
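    /// Returns `true` if either the event count or the byte size is non-zero.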
    fn has_updates(&self) -> bool {
        self.event_count > 0 || self.event_byte_size > 0
    }
}

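/// Atomically updatable event count and byte size for one category of buffer metrics.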
#[derive(Debug, Default)]
struct CategoryMetrics {
    event_count: AtomicU64,
    event_byte_size: AtomicU64,
}

impl CategoryMetrics {
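    /// Increments the event count and byte size by the given amounts.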
    fn increment(&self, event_count: u64, event_byte_size: u64) {
        increment_counter(&self.event_count, event_count);
        increment_counter(&self.event_byte_size, event_byte_size);
    }

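    /// Decrements the event count and byte size by the given amounts.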
    fn decrement(&self, event_count: u64, event_byte_size: u64) {
        decrement_counter(&self.event_count, event_count);
        decrement_counter(&self.event_byte_size, event_byte_size);
    }

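    /// Sets the event count and byte size to the given values, overwriting whatever was there.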
    fn set(&self, event_count: u64, event_byte_size: u64) {
        self.event_count.store(event_count, ORDERING);
        self.event_byte_size.store(event_byte_size, ORDERING);
    }

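    /// Gets a snapshot of the current event count and byte size.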
    fn get(&self) -> CategorySnapshot {
        CategorySnapshot {
            event_count: self.event_count.load(ORDERING),
            event_byte_size: self.event_byte_size.load(ORDERING),
        }
    }

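    /// Gets a snapshot of the current values, resetting each counter to zero as it is read, so
    /// that callers only ever observe the delta since the previous call.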
    fn consume(&self) -> CategorySnapshot {
        CategorySnapshot {
            event_count: self.event_count.swap(0, ORDERING),
            event_byte_size: self.event_byte_size.swap(0, ORDERING),
        }
    }
}

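/// Handle for updating the usage metrics of a single buffer stage.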
#[derive(Clone, Debug)]
pub struct BufferUsageHandle {
    state: Arc<BufferUsageData>,
}

impl BufferUsageHandle {
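    /// Creates a no-op handle: updates are still recorded, but the underlying data is never
    /// registered with a reporter, so nothing is emitted.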
    pub(crate) fn noop() -> Self {
        BufferUsageHandle {
            state: Arc::new(BufferUsageData::new(0)),
        }
    }

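    /// Gets a point-in-time snapshot of the usage metrics tracked by this handle.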
    pub fn snapshot(&self) -> BufferUsageSnapshot {
        self.state.snapshot()
    }

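    /// Sets the maximum size of the buffer, in bytes and events. A missing limit is recorded as
    /// zero, and an event limit that cannot fit in a `u64` is clamped to `u64::MAX`.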
    pub fn set_buffer_limits(&self, max_bytes: Option<u64>, max_events: Option<usize>) {
        let max_events = max_events
            .map(|n| u64::try_from(n).unwrap_or(u64::MAX))
            .unwrap_or(0);
        let max_bytes = max_bytes.unwrap_or(0);

        self.state.max_size.set(max_events, max_bytes);
    }

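    /// Records events received by the buffer, also increasing the current in-buffer totals.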
    pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
        if count > 0 || byte_size > 0 {
            self.state.received.increment(count, byte_size);
            self.state.current.increment(count, byte_size);
        }
    }

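    /// Records events sent out of the buffer, also decreasing the current in-buffer totals.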
    pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
        if count > 0 || byte_size > 0 {
            self.state.sent.increment(count, byte_size);
            self.state.current.decrement(count, byte_size);
        }
    }

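    /// Records events dropped by the buffer, decreasing the current in-buffer totals. Intentional
    /// drops (e.g. "drop newest" behavior) are tracked separately from unintentional ones.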
    pub fn increment_dropped_event_count_and_byte_size(
        &self,
        count: u64,
        byte_size: u64,
        intentional: bool,
    ) {
        if count > 0 || byte_size > 0 {
            if intentional {
                self.state.dropped_intentional.increment(count, byte_size);
            } else {
                self.state.dropped.increment(count, byte_size);
            }
            self.state.current.decrement(count, byte_size);
        }
    }
}

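/// Internal usage counters for a single buffer stage, identified by `idx`.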
#[derive(Debug, Default)]
struct BufferUsageData {
    idx: usize,
    received: CategoryMetrics,
    sent: CategoryMetrics,
    dropped: CategoryMetrics,
    dropped_intentional: CategoryMetrics,
    max_size: CategoryMetrics,
    current: CategoryMetrics,
}

impl BufferUsageData {
    fn new(idx: usize) -> Self {
        Self {
            idx,
            ..Default::default()
        }
    }

    fn snapshot(&self) -> BufferUsageSnapshot {
        let received = self.received.get();
        let sent = self.sent.get();
        let dropped = self.dropped.get();
        let dropped_intentional = self.dropped_intentional.get();
        let max_size = self.max_size.get();

        BufferUsageSnapshot {
            received_event_count: received.event_count,
            received_byte_size: received.event_byte_size,
            sent_event_count: sent.event_count,
            sent_byte_size: sent.event_byte_size,
            dropped_event_count: dropped.event_count,
            dropped_event_byte_size: dropped.event_byte_size,
            dropped_event_count_intentional: dropped_intentional.event_count,
            dropped_event_byte_size_intentional: dropped_intentional.event_byte_size,
            max_size_bytes: max_size.event_byte_size,
            max_size_events: max_size
                .event_count
                .try_into()
                .expect("should never be bigger than `usize`"),
        }
    }
}

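/// Point-in-time snapshot of the usage metrics for a single buffer stage.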
#[derive(Debug)]
pub struct BufferUsageSnapshot {
    pub received_event_count: u64,
    pub received_byte_size: u64,
    pub sent_event_count: u64,
    pub sent_byte_size: u64,
    pub dropped_event_count: u64,
    pub dropped_event_byte_size: u64,
    pub dropped_event_count_intentional: u64,
    pub dropped_event_byte_size_intentional: u64,
    pub max_size_bytes: u64,
    pub max_size_events: usize,
}

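/// Tracks the usage of all stages of a buffer and drives the periodic reporting of their metrics.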
pub struct BufferUsage {
    span: Span,
    stages: Vec<Arc<BufferUsageData>>,
}

impl BufferUsage {
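    /// Creates an instance of `BufferUsage` attached to the given span.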
    pub fn from_span(span: Span) -> BufferUsage {
        Self {
            span,
            stages: Vec::new(),
        }
    }

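    /// Adds a new stage with the given index, returning a handle for updating its usage data.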
    pub fn add_stage(&mut self, idx: usize) -> BufferUsageHandle {
        let data = Arc::new(BufferUsageData::new(idx));
        let handle = BufferUsageHandle {
            state: Arc::clone(&data),
        };

        self.stages.push(data);
        handle
    }

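    /// Installs a reporter for the added stages, spawning a background task that emits the
    /// accumulated usage metrics for each stage on a two-second interval.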
    pub fn install(self, buffer_id: &str) {
        let buffer_id = buffer_id.to_string();
        let span = self.span;
        let stages = self.stages;
        let task_name = format!("buffer usage reporter ({buffer_id})");

        let task = async move {
            let mut interval = interval(Duration::from_secs(2));
            loop {
                interval.tick().await;

                for stage in &stages {
                    let max_size = stage.max_size.get();
                    emit(BufferCreated {
                        buffer_id: buffer_id.clone(),
                        idx: stage.idx,
                        max_size_bytes: max_size.event_byte_size,
                        max_size_events: max_size
                            .event_count
                            .try_into()
                            .expect("should never be bigger than `usize`"),
                    });

                    let current = stage.current.get();
                    let received = stage.received.consume();
                    if received.has_updates() {
                        emit(BufferEventsReceived {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            count: received.event_count,
                            byte_size: received.event_byte_size,
                            total_count: current.event_count,
                            total_byte_size: current.event_byte_size,
                        });
                    }

                    let sent = stage.sent.consume();
                    if sent.has_updates() {
                        emit(BufferEventsSent {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            count: sent.event_count,
                            byte_size: sent.event_byte_size,
                            total_count: current.event_count,
                            total_byte_size: current.event_byte_size,
                        });
                    }

                    let dropped = stage.dropped.consume();
                    if dropped.has_updates() {
                        emit(BufferEventsDropped {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            intentional: false,
                            reason: "corrupted_events",
                            count: dropped.event_count,
                            byte_size: dropped.event_byte_size,
                            total_count: current.event_count,
                            total_byte_size: current.event_byte_size,
                        });
                    }

                    let dropped_intentional = stage.dropped_intentional.consume();
                    if dropped_intentional.has_updates() {
                        emit(BufferEventsDropped {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            intentional: true,
                            reason: "drop_newest",
                            count: dropped_intentional.event_count,
                            byte_size: dropped_intentional.event_byte_size,
                            total_count: current.event_count,
                            total_byte_size: current.event_byte_size,
                        });
                    }
                }
            }
        };

        spawn_named(task.instrument(span.or_current()), task_name.as_str());
    }
}

#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    #[test]
    fn test_multithreaded_updates_are_correct() {
        const NUM_THREADS: u64 = 16;
        const INCREMENTS_PER_THREAD: u64 = 10_000;

        let counter = Arc::new(AtomicU64::new(0));

        let mut handles = vec![];

        for _ in 0..NUM_THREADS {
            let counter = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..INCREMENTS_PER_THREAD {
                    increment_counter(&counter, 1);
                    decrement_counter(&counter, 1);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.load(ORDERING), 0);
    }

    #[test]
    fn test_decrement_counter_prevents_negatives() {
        let counter = AtomicU64::new(100);

        decrement_counter(&counter, 50);
        assert_eq!(counter.load(ORDERING), 50);

        decrement_counter(&counter, 100);
        assert_eq!(counter.load(ORDERING), 0);

        decrement_counter(&counter, 50);
        assert_eq!(counter.load(ORDERING), 0);

        decrement_counter(&counter, u64::MAX);
        assert_eq!(counter.load(ORDERING), 0);
    }

    #[test]
    fn test_increment_counter_prevents_overflow() {
        let counter = AtomicU64::new(u64::MAX - 2);

        increment_counter(&counter, 1);
        assert_eq!(counter.load(ORDERING), u64::MAX - 1);

        increment_counter(&counter, 1);
        assert_eq!(counter.load(ORDERING), u64::MAX);

        increment_counter(&counter, 1);
        assert_eq!(counter.load(ORDERING), u64::MAX);

        increment_counter(&counter, u64::MAX);
        assert_eq!(counter.load(ORDERING), u64::MAX);
    }
}