1use std::{
2 sync::{
3 Arc,
4 atomic::{AtomicU64, Ordering},
5 },
6 time::Duration,
7};
8
9use tokio::time::interval;
10use tracing::{Instrument, Span};
11use vector_common::internal_event::emit;
12
13use crate::{
14 internal_events::{BufferCreated, BufferEventsDropped, BufferEventsReceived, BufferEventsSent},
15 spawn_named,
16};
17
/// Memory ordering used for every atomic operation in this file.
///
/// The atomics here are independent counters that are only added to, stored, loaded, or swapped;
/// nothing visible in this file relies on them to publish or guard other data, so `Relaxed` is
/// enough.
const ORDERING: Ordering = Ordering::Relaxed;
21
/// Plain-value copy of one category's counters: an event count and a byte size.
#[derive(Clone, Copy, Debug, Default)]
struct CategorySnapshot {
    /// Number of events.
    event_count: u64,
    /// Size of those events, in bytes.
    event_byte_size: u64,
}

impl CategorySnapshot {
    /// Returns `true` unless both the event count and the byte size are zero.
    fn has_updates(&self) -> bool {
        !(self.event_count == 0 && self.event_byte_size == 0)
    }
}
35
36#[derive(Debug, Default)]
48struct CategoryMetrics {
49 event_count: AtomicU64,
50 event_byte_size: AtomicU64,
51}
52
53impl CategoryMetrics {
54 fn increment(&self, event_count: u64, event_byte_size: u64) {
56 self.event_count.fetch_add(event_count, ORDERING);
57 self.event_byte_size.fetch_add(event_byte_size, ORDERING);
58 }
59
60 fn set(&self, event_count: u64, event_byte_size: u64) {
64 self.event_count.store(event_count, ORDERING);
65 self.event_byte_size.store(event_byte_size, ORDERING);
66 }
67
68 fn get(&self) -> CategorySnapshot {
70 CategorySnapshot {
71 event_count: self.event_count.load(ORDERING),
72 event_byte_size: self.event_byte_size.load(ORDERING),
73 }
74 }
75
76 fn consume(&self) -> CategorySnapshot {
83 CategorySnapshot {
84 event_count: self.event_count.swap(0, ORDERING),
85 event_byte_size: self.event_byte_size.swap(0, ORDERING),
86 }
87 }
88}
89
90#[derive(Clone, Copy, Debug, Default)]
98struct ReporterCurrentMetrics {
99 total_entered: CategorySnapshot,
100 total_left: CategorySnapshot,
101}
102
103impl ReporterCurrentMetrics {
104 fn add_received(&mut self, snapshot: CategorySnapshot) {
105 self.total_entered.event_count = self
106 .total_entered
107 .event_count
108 .saturating_add(snapshot.event_count);
109 self.total_entered.event_byte_size = self
110 .total_entered
111 .event_byte_size
112 .saturating_add(snapshot.event_byte_size);
113 }
114
115 fn add_left(&mut self, snapshot: CategorySnapshot) {
116 self.total_left.event_count = self
117 .total_left
118 .event_count
119 .saturating_add(snapshot.event_count);
120 self.total_left.event_byte_size = self
121 .total_left
122 .event_byte_size
123 .saturating_add(snapshot.event_byte_size);
124 }
125
126 fn current(&self) -> CategorySnapshot {
127 CategorySnapshot {
128 event_count: self
129 .total_entered
130 .event_count
131 .saturating_sub(self.total_left.event_count),
132 event_byte_size: self
133 .total_entered
134 .event_byte_size
135 .saturating_sub(self.total_left.event_byte_size),
136 }
137 }
138}
139
140#[derive(Clone, Debug)]
142pub struct BufferUsageHandle {
143 state: Arc<BufferUsageData>,
144}
145
146impl BufferUsageHandle {
147 pub(crate) fn noop() -> Self {
151 BufferUsageHandle {
152 state: Arc::new(BufferUsageData::new(0)),
153 }
154 }
155
156 pub fn snapshot(&self) -> BufferUsageSnapshot {
158 self.state.snapshot()
159 }
160
161 pub fn set_buffer_limits(&self, max_bytes: Option<u64>, max_events: Option<usize>) {
166 let max_events = max_events
167 .and_then(|n| u64::try_from(n).ok().or(Some(u64::MAX)))
168 .unwrap_or(0);
169 let max_bytes = max_bytes.unwrap_or(0);
170
171 self.state.max_size.set(max_events, max_bytes);
172 }
173
174 pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
178 if count > 0 || byte_size > 0 {
179 self.state.received.increment(count, byte_size);
180 }
181 }
182
183 pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
187 if count > 0 || byte_size > 0 {
188 self.state.sent.increment(count, byte_size);
189 }
190 }
191
192 pub fn increment_dropped_event_count_and_byte_size(
194 &self,
195 count: u64,
196 byte_size: u64,
197 intentional: bool,
198 ) {
199 if count > 0 || byte_size > 0 {
200 if intentional {
201 self.state.dropped_intentional.increment(count, byte_size);
202 } else {
203 self.state.dropped.increment(count, byte_size);
204 }
205 }
206 }
207}
208
209#[derive(Debug, Default)]
210struct BufferUsageData {
211 idx: usize,
212 received: CategoryMetrics,
213 sent: CategoryMetrics,
214 dropped: CategoryMetrics,
215 dropped_intentional: CategoryMetrics,
216 max_size: CategoryMetrics,
217}
218
219impl BufferUsageData {
220 fn new(idx: usize) -> Self {
221 Self {
222 idx,
223 ..Default::default()
224 }
225 }
226
227 fn snapshot(&self) -> BufferUsageSnapshot {
228 let received = self.received.get();
229 let sent = self.sent.get();
230 let dropped = self.dropped.get();
231 let dropped_intentional = self.dropped_intentional.get();
232 let max_size = self.max_size.get();
233
234 BufferUsageSnapshot {
235 received_event_count: received.event_count,
236 received_byte_size: received.event_byte_size,
237 sent_event_count: sent.event_count,
238 sent_byte_size: sent.event_byte_size,
239 dropped_event_count: dropped.event_count,
240 dropped_event_byte_size: dropped.event_byte_size,
241 dropped_event_count_intentional: dropped_intentional.event_count,
242 dropped_event_byte_size_intentional: dropped_intentional.event_byte_size,
243 max_size_bytes: max_size.event_byte_size,
244 max_size_events: max_size
245 .event_count
246 .try_into()
247 .expect("should never be bigger than `usize`"),
248 }
249 }
250
251 fn report(&self, current_metrics: &mut ReporterCurrentMetrics, buffer_id: &str) {
252 let max_size = self.max_size.get();
253 emit(BufferCreated {
254 buffer_id: buffer_id.to_string(),
255 idx: self.idx,
256 max_size_bytes: max_size.event_byte_size,
257 max_size_events: max_size
258 .event_count
259 .try_into()
260 .expect("should never be bigger than `usize`"),
261 });
262
263 let received = self.received.consume();
268 current_metrics.add_received(received);
269
270 let sent = self.sent.consume();
271 current_metrics.add_left(sent);
272
273 let dropped = self.dropped.consume();
274 current_metrics.add_left(dropped);
275
276 let dropped_intentional = self.dropped_intentional.consume();
277 current_metrics.add_left(dropped_intentional);
278
279 let current = current_metrics.current();
280
281 if received.has_updates() {
282 emit(BufferEventsReceived {
283 buffer_id: buffer_id.to_string(),
284 idx: self.idx,
285 count: received.event_count,
286 byte_size: received.event_byte_size,
287 total_count: current.event_count,
288 total_byte_size: current.event_byte_size,
289 });
290 }
291
292 if sent.has_updates() {
293 emit(BufferEventsSent {
294 buffer_id: buffer_id.to_string(),
295 idx: self.idx,
296 count: sent.event_count,
297 byte_size: sent.event_byte_size,
298 total_count: current.event_count,
299 total_byte_size: current.event_byte_size,
300 });
301 }
302
303 if dropped.has_updates() {
304 emit(BufferEventsDropped {
305 buffer_id: buffer_id.to_string(),
306 idx: self.idx,
307 intentional: false,
308 reason: "corrupted_events",
309 count: dropped.event_count,
310 byte_size: dropped.event_byte_size,
311 total_count: current.event_count,
312 total_byte_size: current.event_byte_size,
313 });
314 }
315
316 if dropped_intentional.has_updates() {
317 emit(BufferEventsDropped {
318 buffer_id: buffer_id.to_string(),
319 idx: self.idx,
320 intentional: true,
321 reason: "drop_newest",
322 count: dropped_intentional.event_count,
323 byte_size: dropped_intentional.event_byte_size,
324 total_count: current.event_count,
325 total_byte_size: current.event_byte_size,
326 });
327 }
328 }
329}
330
/// Flat, plain-value view of a buffer stage's usage counters and configured limits.
///
/// Every field is a plain integer, so the type derives the common value traits (`Clone`, `Copy`,
/// `Default`, `PartialEq`, `Eq`) in addition to `Debug`, which lets callers keep, copy and
/// compare whole snapshots.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct BufferUsageSnapshot {
    /// Number of events counted as received.
    pub received_event_count: u64,
    /// Byte size of the events counted as received.
    pub received_byte_size: u64,
    /// Number of events counted as sent.
    pub sent_event_count: u64,
    /// Byte size of the events counted as sent.
    pub sent_byte_size: u64,
    /// Number of events counted as dropped (not intentionally).
    pub dropped_event_count: u64,
    /// Byte size of the events counted as dropped (not intentionally).
    pub dropped_event_byte_size: u64,
    /// Number of events counted as intentionally dropped.
    pub dropped_event_count_intentional: u64,
    /// Byte size of the events counted as intentionally dropped.
    pub dropped_event_byte_size_intentional: u64,
    /// Configured byte limit.
    pub max_size_bytes: u64,
    /// Configured event limit.
    pub max_size_events: usize,
}
345
346pub struct BufferUsage {
352 span: Span,
353 stages: Vec<Arc<BufferUsageData>>,
354}
355
356impl BufferUsage {
357 pub fn from_span(span: Span) -> BufferUsage {
361 Self {
362 span,
363 stages: Vec::new(),
364 }
365 }
366
367 pub fn add_stage(&mut self, idx: usize) -> BufferUsageHandle {
372 let data = Arc::new(BufferUsageData::new(idx));
373 let handle = BufferUsageHandle {
374 state: Arc::clone(&data),
375 };
376
377 self.stages.push(data);
378 handle
379 }
380
381 pub fn install(self, buffer_id: &str) {
388 let buffer_id = buffer_id.to_string();
389 let span = self.span;
390 let stages: Vec<_> = self
391 .stages
392 .into_iter()
393 .map(|stage| (stage, ReporterCurrentMetrics::default()))
394 .collect();
395 let task_name = format!("buffer usage reporter ({buffer_id})");
396
397 let task = Self::report_buffer_usage(stages, buffer_id).instrument(span.or_current());
398 spawn_named(task, task_name.as_str());
399 }
400
401 async fn report_buffer_usage(
402 mut stages: Vec<(Arc<BufferUsageData>, ReporterCurrentMetrics)>,
403 buffer_id: String,
404 ) {
405 let mut interval = interval(Duration::from_secs(2));
406 loop {
407 interval.tick().await;
408
409 for (stage, current_metrics) in &mut stages {
410 stage.report(current_metrics, &buffer_id);
411 }
412 }
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `CategorySnapshot`.
    fn snapshot(event_count: u64, event_byte_size: u64) -> CategorySnapshot {
        CategorySnapshot {
            event_count,
            event_byte_size,
        }
    }

    /// Asserts that `metrics.current()` reports exactly the given count and byte size.
    #[track_caller]
    fn assert_current(metrics: &ReporterCurrentMetrics, event_count: u64, event_byte_size: u64) {
        let current = metrics.current();
        assert_eq!(current.event_count, event_count);
        assert_eq!(current.event_byte_size, event_byte_size);
    }

    #[test]
    fn reporter_current_usage_is_derived_from_entered_and_left_totals() {
        let mut metrics = ReporterCurrentMetrics::default();
        metrics.add_received(snapshot(10, 1000));
        metrics.add_left(snapshot(3, 300));
        metrics.add_left(snapshot(2, 200));

        assert_current(&metrics, 5, 500);
    }

    #[test]
    fn reporter_current_usage_preserves_underflow_debt() {
        // "Left" is recorded before the matching "received"; the early departure must still be
        // subtracted once the arrivals are recorded.
        let mut metrics = ReporterCurrentMetrics::default();
        metrics.add_left(snapshot(10, 1000));
        metrics.add_received(snapshot(15, 1500));

        assert_current(&metrics, 5, 500);
    }

    #[test]
    fn consume_resets_deltas_between_ticks() {
        let data = BufferUsageData::new(0);
        let mut metrics = ReporterCurrentMetrics::default();

        data.received.increment(10, 1000);
        data.sent.increment(3, 300);
        data.report(&mut metrics, "test");
        assert_current(&metrics, 7, 700);

        // A second report with no new activity must not apply the same deltas again.
        data.report(&mut metrics, "test");
        assert_current(&metrics, 7, 700);
    }

    #[test]
    fn accumulates_across_multiple_ticks() {
        let data = BufferUsageData::new(0);
        let mut metrics = ReporterCurrentMetrics::default();

        data.received.increment(10, 1000);
        data.report(&mut metrics, "test");

        data.received.increment(5, 500);
        data.sent.increment(8, 800);
        data.report(&mut metrics, "test");

        assert_current(&metrics, 7, 700);
    }

    #[test]
    fn drops_count_as_leaving_the_buffer() {
        let data = BufferUsageData::new(0);
        let mut metrics = ReporterCurrentMetrics::default();

        data.received.increment(20, 2000);
        data.sent.increment(5, 500);
        data.dropped.increment(3, 300);
        data.dropped_intentional.increment(2, 200);
        data.report(&mut metrics, "test");

        assert_current(&metrics, 10, 1000);
    }
}