1use std::{
2 collections::HashMap,
3 pin::Pin,
4 sync::{Arc, Mutex},
5 task::{Context, Poll, ready},
6 time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19 sync::mpsc::{Receiver, Sender, channel},
20 time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
24
25use crate::stats;
26
/// Period of the ticker that drives `UtilizationEmitter::run_utilization`:
/// on every tick each registered timer recomputes and publishes its
/// utilization.
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_secs(5);
28
29#[pin_project]
30pub(crate) struct Utilization<S> {
31 intervals: IntervalStream,
32 timer_tx: UtilizationComponentSender,
33 component_key: ComponentKey,
34 inner: S,
35}
36
37impl<S> Utilization<S> {
38 #[allow(clippy::missing_const_for_fn)]
43 pub(crate) fn into_inner(self) -> S {
44 self.inner
45 }
46}
47
48impl<S> Stream for Utilization<S>
49where
50 S: Stream + Unpin,
51{
52 type Item = S::Item;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 let this = self.project();
63 this.timer_tx.try_send_start_wait();
64 let _ = this.intervals.poll_next_unpin(cx);
65 let result = ready!(this.inner.poll_next_unpin(cx));
66 this.timer_tx.try_send_stop_wait();
67 Poll::Ready(result)
68 }
69}
70
71pub(crate) struct Timer {
72 overall_start: Instant,
73 span_start: Instant,
74 waiting: bool,
75 total_wait: Duration,
76 ewma: stats::Ewma,
77 gauge: Gauge,
78 #[cfg(debug_assertions)]
79 report_count: u32,
80 #[cfg(debug_assertions)]
81 component_id: Arc<str>,
82}
83
84impl Timer {
93 pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
94 Self {
95 overall_start: Instant::now(),
96 span_start: Instant::now(),
97 waiting: false,
98 total_wait: Duration::new(0, 0),
99 ewma: stats::Ewma::new(0.9),
100 gauge,
101 #[cfg(debug_assertions)]
102 report_count: 0,
103 #[cfg(debug_assertions)]
104 component_id,
105 }
106 }
107
108 pub(crate) fn start_wait(&mut self, at: Instant) {
110 if !self.waiting {
111 self.end_span(at.max(self.overall_start));
113 self.waiting = true;
114 }
115 }
116
117 pub(crate) fn stop_wait(&mut self, at: Instant) {
119 if self.waiting {
120 self.end_span(at.max(self.overall_start));
122 self.waiting = false;
123 }
124 }
125
126 pub(crate) fn update_utilization(&mut self) {
130 let now = Instant::now();
134 self.end_span(now);
135
136 let total_duration = now.duration_since(self.overall_start);
137 let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
138 let utilization = 1.0 - wait_ratio;
139
140 self.ewma.update(utilization);
141 let avg = self.ewma.average().unwrap_or(f64::NAN);
142 let avg_rounded = (avg * 10000.0).round() / 10000.0; self.gauge.set(avg_rounded);
144
145 self.overall_start = now;
147 self.total_wait = Duration::new(0, 0);
148
149 #[cfg(debug_assertions)]
150 self.report(avg_rounded);
151 }
152
153 #[cfg(debug_assertions)]
154 fn report(&mut self, utilization: f64) {
155 if self.report_count.is_multiple_of(5) {
158 debug!(component_id = %self.component_id, %utilization);
159 }
160 self.report_count = self.report_count.wrapping_add(1);
161 }
162
163 fn end_span(&mut self, at: Instant) {
164 if self.waiting {
165 self.total_wait += at.duration_since(self.span_start);
168 }
169 self.span_start = at;
170 }
171}
172
173#[derive(Debug)]
174enum UtilizationTimerMessage {
175 StartWait(ComponentKey, Instant),
176 StopWait(ComponentKey, Instant),
177}
178
179pub(crate) struct UtilizationComponentSender {
180 component_key: ComponentKey,
181 timer_tx: Sender<UtilizationTimerMessage>,
182}
183
184impl UtilizationComponentSender {
185 pub(crate) fn try_send_start_wait(&self) {
186 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
187 self.component_key.clone(),
188 Instant::now(),
189 )) {
190 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
191 }
192 }
193
194 pub(crate) fn try_send_stop_wait(&self) {
195 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
196 self.component_key.clone(),
197 Instant::now(),
198 )) {
199 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
200 }
201 }
202}
203
204#[derive(Clone)]
208pub struct UtilizationRegistry {
209 timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
210 timer_tx: Sender<UtilizationTimerMessage>,
211}
212
213impl UtilizationRegistry {
214 pub(crate) fn add_component(
218 &self,
219 key: ComponentKey,
220 gauge: Gauge,
221 ) -> UtilizationComponentSender {
222 self.timers.lock().expect("mutex poisoned").insert(
223 key.clone(),
224 Timer::new(
225 gauge,
226 #[cfg(debug_assertions)]
227 key.id().into(),
228 ),
229 );
230 UtilizationComponentSender {
231 timer_tx: self.timer_tx.clone(),
232 component_key: key,
233 }
234 }
235
236 pub(crate) fn remove_component(&self, key: &ComponentKey) {
238 self.timers.lock().expect("mutex poisoned").remove(key);
239 }
240}
241
242pub(crate) struct UtilizationEmitter {
243 timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
244 timer_rx: Receiver<UtilizationTimerMessage>,
245}
246
247impl UtilizationEmitter {
248 pub(crate) fn new() -> (Self, UtilizationRegistry) {
249 let (timer_tx, timer_rx) = channel(4096);
250 let timers = Arc::new(Mutex::new(HashMap::default()));
251 (
252 Self {
253 timers: Arc::clone(&timers),
254 timer_rx,
255 },
256 UtilizationRegistry { timers, timer_tx },
257 )
258 }
259
260 pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
261 let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
262 loop {
263 tokio::select! {
264 message = self.timer_rx.recv() => {
265 match message {
266 Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
267 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
269 timer.start_wait(start_time);
270 }
271 }
272 Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
273 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
275 timer.stop_wait(stop_time);
276 }
277 }
278 None => break,
279 }
280 },
281
282 Some(_) = intervals.next() => {
283 for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
284 timer.update_utilization();
285 }
286 },
287
288 _ = &mut shutdown => {
289 break
290 }
291 }
292 }
293 }
294}
295
296pub(crate) fn wrap<S>(
304 timer_tx: UtilizationComponentSender,
305 component_key: ComponentKey,
306 inner: S,
307) -> Utilization<S> {
308 Utilization {
309 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
310 timer_tx,
311 component_key,
312 inner,
313 }
314}
315
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;

    use super::*;

    /// Largest accepted absolute difference between observed and expected
    /// utilization.
    const TOLERANCE: f64 = 0.01;

    /// Sets the mock clock to t = 100 s and returns a timer created at that
    /// instant.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));

        let gauge = metrics::gauge!("test_utilization");
        Timer::new(
            gauge,
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Moves the mock clock forward by `secs` seconds.
    fn advance_secs(secs: u64) {
        MockClock::advance(Duration::from_secs(secs));
    }

    /// Closes the current reporting period and returns the resulting average.
    fn update_and_read(timer: &mut Timer) -> f64 {
        timer.update_utilization();
        timer.ewma.average().unwrap()
    }

    /// Asserts that `actual` lies in `[0, 1]` and within `TOLERANCE` of
    /// `expected`; `description` is used in the failure message.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let in_unit_range = (0.0..=1.0).contains(&actual);
        assert!(in_unit_range, "Utilization {actual} is outside [0, 1]");

        let error = (actual - expected).abs();
        assert!(
            error < TOLERANCE,
            "Expected utilization {description}, got {actual}"
        );
    }

    /// A start message stamped before the current period must be clamped to
    /// the period start, keeping the result within bounds.
    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        advance_secs(5);

        let first = update_and_read(&mut timer);
        assert_approx_eq(first, 1.0, "near 1.0 (never waiting)");

        // Stamped one second before the period that has just begun.
        timer.start_wait(Instant::now() - Duration::from_secs(1));
        advance_secs(5);

        let second = update_and_read(&mut timer);
        assert_approx_eq(second, 0.1, "~0.1");
    }

    /// A stop message stamped before the current period must be clamped to
    /// the period start, keeping the result within bounds.
    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        advance_secs(5);

        // Pretend the whole first period was spent waiting.
        timer.waiting = true;
        let first = update_and_read(&mut timer);
        assert_approx_eq(first, 0.0, "near 0 (always waiting)");

        // Stamped four seconds before the period that has just begun.
        timer.stop_wait(Instant::now() - Duration::from_secs(4));
        advance_secs(5);

        let second = update_and_read(&mut timer);
        assert_approx_eq(second, 0.9, "~0.9");
    }

    /// One second busy, two waiting, two busy: 60 % utilization.
    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        advance_secs(1);
        timer.start_wait(Instant::now());

        advance_secs(2);
        timer.stop_wait(Instant::now());

        advance_secs(2);
        let avg = update_and_read(&mut timer);

        assert_approx_eq(avg, 0.6, "~0.6");
    }

    /// Waiting for the entire period yields a utilization of zero.
    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        timer.start_wait(Instant::now());

        advance_secs(5);
        let avg = update_and_read(&mut timer);

        assert_approx_eq(avg, 0.0, "near 0 (always waiting)");
    }

    /// Never waiting during the period yields a utilization of one.
    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        advance_secs(5);
        let avg = update_and_read(&mut timer);

        assert_approx_eq(avg, 1.0, "near 1.0 (never waiting)");
    }
}