use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::Duration,
};

use tokio::time::interval;
use tracing::{Instrument, Span};
use vector_common::internal_event::emit;

use crate::{
    internal_events::{BufferCreated, BufferEventsDropped, BufferEventsReceived, BufferEventsSent},
    spawn_named,
};

/// Point-in-time snapshot of the counters for a single metrics category.
struct CategorySnapshot {
    event_count: u64,
    event_byte_size: u64,
}

impl CategorySnapshot {
    /// Returns `true` if any of the counters are non-zero.
    fn has_updates(&self) -> bool {
        self.event_count > 0 || self.event_byte_size > 0
    }
}

/// Atomic counters for a single metrics category, such as received, sent, or dropped events.
#[derive(Debug, Default)]
struct CategoryMetrics {
    event_count: AtomicU64,
    event_byte_size: AtomicU64,
}

impl CategoryMetrics {
    /// Adds the given event count and byte size to the current values.
    fn increment(&self, event_count: u64, event_byte_size: u64) {
        self.event_count.fetch_add(event_count, Ordering::Relaxed);
        self.event_byte_size
            .fetch_add(event_byte_size, Ordering::Relaxed);
    }

    /// Overwrites the current values with the given event count and byte size.
    fn set(&self, event_count: u64, event_byte_size: u64) {
        self.event_count.store(event_count, Ordering::Release);
        self.event_byte_size
            .store(event_byte_size, Ordering::Release);
    }

    /// Returns the current values without modifying them.
    fn get(&self) -> CategorySnapshot {
        CategorySnapshot {
            event_count: self.event_count.load(Ordering::Acquire),
            event_byte_size: self.event_byte_size.load(Ordering::Acquire),
        }
    }

    /// Returns the current values and atomically resets the counters to zero.
    fn consume(&self) -> CategorySnapshot {
        CategorySnapshot {
            event_count: self.event_count.swap(0, Ordering::AcqRel),
            event_byte_size: self.event_byte_size.swap(0, Ordering::AcqRel),
        }
    }
}

/// Cloneable handle for updating the usage metrics of a single buffer stage.
#[derive(Clone, Debug)]
pub struct BufferUsageHandle {
    state: Arc<BufferUsageData>,
}

impl BufferUsageHandle {
    /// Creates a no-op handle whose state is never reported anywhere.
    pub(crate) fn noop() -> Self {
        BufferUsageHandle {
            state: Arc::new(BufferUsageData::new(0)),
        }
    }

    /// Gets a point-in-time snapshot of the usage data for this stage.
    pub fn snapshot(&self) -> BufferUsageSnapshot {
        self.state.snapshot()
    }

    /// Sets the limits, in bytes and events, of the buffer stage. A missing limit is recorded as zero.
    pub fn set_buffer_limits(&self, max_bytes: Option<u64>, max_events: Option<usize>) {
        let max_events = max_events
            .and_then(|n| u64::try_from(n).ok().or(Some(u64::MAX)))
            .unwrap_or(0);
        let max_bytes = max_bytes.unwrap_or(0);

        self.state.max_size.set(max_events, max_bytes);
    }

    /// Increments the count and byte size of received events for this stage.
    pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
        self.state.received.increment(count, byte_size);
    }

    /// Increments the count and byte size of sent events for this stage.
    pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
        self.state.sent.increment(count, byte_size);
    }

    /// Increments the count and byte size of dropped events for this stage, tracked separately
    /// based on whether or not the events were dropped intentionally.
    pub fn increment_dropped_event_count_and_byte_size(
        &self,
        count: u64,
        byte_size: u64,
        intentional: bool,
    ) {
        if intentional {
            self.state.dropped_intentional.increment(count, byte_size);
        } else {
            self.state.dropped.increment(count, byte_size);
        }
    }
}

/// Shared per-stage usage counters, updated through a [`BufferUsageHandle`] and read by the
/// reporting task installed via [`BufferUsage::install`].
#[derive(Debug, Default)]
struct BufferUsageData {
    idx: usize,
    received: CategoryMetrics,
    sent: CategoryMetrics,
    dropped: CategoryMetrics,
    dropped_intentional: CategoryMetrics,
    max_size: CategoryMetrics,
}

impl BufferUsageData {
    fn new(idx: usize) -> Self {
        Self {
            idx,
            ..Default::default()
        }
    }

    fn snapshot(&self) -> BufferUsageSnapshot {
        let received = self.received.get();
        let sent = self.sent.get();
        let dropped = self.dropped.get();
        let dropped_intentional = self.dropped_intentional.get();
        let max_size = self.max_size.get();

        BufferUsageSnapshot {
            received_event_count: received.event_count,
            received_byte_size: received.event_byte_size,
            sent_event_count: sent.event_count,
            sent_byte_size: sent.event_byte_size,
            dropped_event_count: dropped.event_count,
            dropped_event_byte_size: dropped.event_byte_size,
            dropped_event_count_intentional: dropped_intentional.event_count,
            dropped_event_byte_size_intentional: dropped_intentional.event_byte_size,
            max_size_bytes: max_size.event_byte_size,
            max_size_events: max_size
                .event_count
                .try_into()
                .expect("should never be bigger than `usize`"),
        }
    }
}

/// Point-in-time snapshot of the usage data for a buffer stage.
#[derive(Debug)]
pub struct BufferUsageSnapshot {
    pub received_event_count: u64,
    pub received_byte_size: u64,
    pub sent_event_count: u64,
    pub sent_byte_size: u64,
    pub dropped_event_count: u64,
    pub dropped_event_byte_size: u64,
    pub dropped_event_count_intentional: u64,
    pub dropped_event_byte_size_intentional: u64,
    pub max_size_bytes: u64,
    pub max_size_events: usize,
}

/// Tracks usage metrics for all stages of a buffer and periodically reports them as internal
/// events.
pub struct BufferUsage {
    span: Span,
    stages: Vec<Arc<BufferUsageData>>,
}

impl BufferUsage {
    /// Creates an instance of [`BufferUsage`] attached to the given span, under which all usage
    /// metrics for the buffer are reported.
    pub fn from_span(span: Span) -> BufferUsage {
        Self {
            span,
            stages: Vec::new(),
        }
    }

    /// Adds a new stage to track usage for, returning a handle for updating its metrics.
    pub fn add_stage(&mut self, idx: usize) -> BufferUsageHandle {
        let data = Arc::new(BufferUsageData::new(idx));
        let handle = BufferUsageHandle {
            state: Arc::clone(&data),
        };

        self.stages.push(data);
        handle
    }

    /// Installs a background task that periodically emits the usage metrics of every registered
    /// stage for the buffer identified by `buffer_id`.
    pub fn install(self, buffer_id: &str) {
        let buffer_id = buffer_id.to_string();
        let span = self.span;
        let stages = self.stages;
        let task_name = format!("buffer usage reporter ({buffer_id})");

        let task = async move {
            let mut interval = interval(Duration::from_secs(2));
            loop {
                interval.tick().await;

                for stage in &stages {
                    // Report the configured size limits for this stage.
                    let max_size = stage.max_size.get();
                    emit(BufferCreated {
                        idx: stage.idx,
                        max_size_bytes: max_size.event_byte_size,
                        max_size_events: max_size
                            .event_count
                            .try_into()
                            .expect("should never be bigger than `usize`"),
                    });

                    // Report and reset the per-category counters, skipping categories that saw
                    // no activity since the last tick.
                    let received = stage.received.consume();
                    if received.has_updates() {
                        emit(BufferEventsReceived {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            count: received.event_count,
                            byte_size: received.event_byte_size,
                        });
                    }

                    let sent = stage.sent.consume();
                    if sent.has_updates() {
                        emit(BufferEventsSent {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            count: sent.event_count,
                            byte_size: sent.event_byte_size,
                        });
                    }

                    let dropped = stage.dropped.consume();
                    if dropped.has_updates() {
                        emit(BufferEventsDropped {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            intentional: false,
                            reason: "corrupted_events",
                            count: dropped.event_count,
                            byte_size: dropped.event_byte_size,
                        });
                    }

                    let dropped_intentional = stage.dropped_intentional.consume();
                    if dropped_intentional.has_updates() {
                        emit(BufferEventsDropped {
                            buffer_id: buffer_id.clone(),
                            idx: stage.idx,
                            intentional: true,
                            reason: "drop_newest",
                            count: dropped_intentional.event_count,
                            byte_size: dropped_intentional.event_byte_size,
                        });
                    }
                }
            }
        };

        spawn_named(task.instrument(span.or_current()), task_name.as_str());
    }
}
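
// Illustrative usage sketch (not part of the original module): exercises the public surface
// above without installing the background reporter, so no async runtime is required. The
// limits and counts below are made-up example values.
#[cfg(test)]
mod usage_example {
    use super::*;

    #[test]
    fn handle_updates_are_visible_in_snapshots() {
        // Build the usage tracker and register a single stage.
        let mut usage = BufferUsage::from_span(Span::none());
        let handle = usage.add_stage(0);

        // Record hypothetical limits and some traffic through the stage.
        handle.set_buffer_limits(Some(1024), Some(500));
        handle.increment_received_event_count_and_byte_size(10, 1000);
        handle.increment_sent_event_count_and_byte_size(8, 800);
        handle.increment_dropped_event_count_and_byte_size(2, 200, true);

        // Snapshots read the counters without resetting them.
        let snapshot = handle.snapshot();
        assert_eq!(snapshot.max_size_bytes, 1024);
        assert_eq!(snapshot.max_size_events, 500);
        assert_eq!(snapshot.received_event_count, 10);
        assert_eq!(snapshot.received_byte_size, 1000);
        assert_eq!(snapshot.sent_event_count, 8);
        assert_eq!(snapshot.sent_byte_size, 800);
        assert_eq!(snapshot.dropped_event_count_intentional, 2);
        assert_eq!(snapshot.dropped_event_byte_size_intentional, 200);
        assert_eq!(snapshot.dropped_event_count, 0);
    }
}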