1use std::{
2 collections::HashMap,
3 pin::Pin,
4 sync::{Arc, Mutex},
5 task::{Context, Poll, ready},
6 time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19 sync::mpsc::{Receiver, Sender, channel},
20 time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
24
25use crate::stats;
26
/// Period at which the emitter task recomputes and publishes the utilization
/// of every registered component (see `UtilizationEmitter::run_utilization`).
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_millis(5_000);
28
29#[pin_project]
30pub(crate) struct Utilization<S> {
31 intervals: IntervalStream,
32 timer_tx: UtilizationComponentSender,
33 component_key: ComponentKey,
34 inner: S,
35}
36
37impl<S> Utilization<S> {
38 #[allow(clippy::missing_const_for_fn)]
43 pub(crate) fn into_inner(self) -> S {
44 self.inner
45 }
46}
47
48impl<S> Stream for Utilization<S>
49where
50 S: Stream + Unpin,
51{
52 type Item = S::Item;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 let this = self.project();
63 this.timer_tx.try_send_start_wait();
64 let _ = this.intervals.poll_next_unpin(cx);
65 let result = ready!(this.inner.poll_next_unpin(cx));
66 this.timer_tx.try_send_stop_wait();
67 Poll::Ready(result)
68 }
69}
70
71pub(crate) struct Timer {
72 overall_start: Instant,
73 span_start: Instant,
74 waiting: bool,
75 total_wait: Duration,
76 ewma: stats::Ewma,
77 gauge: Gauge,
78 #[cfg(debug_assertions)]
79 component_id: Arc<str>,
80}
81
82impl Timer {
91 pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
92 Self {
93 overall_start: Instant::now(),
94 span_start: Instant::now(),
95 waiting: false,
96 total_wait: Duration::new(0, 0),
97 ewma: stats::Ewma::new(0.9),
98 gauge,
99 #[cfg(debug_assertions)]
100 component_id,
101 }
102 }
103
104 pub(crate) fn start_wait(&mut self, at: Instant) {
106 if !self.waiting {
107 self.end_span(at.max(self.overall_start));
109 self.waiting = true;
110 }
111 }
112
113 pub(crate) fn stop_wait(&mut self, at: Instant) {
115 if self.waiting {
116 self.end_span(at.max(self.overall_start));
118 self.waiting = false;
119 }
120 }
121
122 pub(crate) fn update_utilization(&mut self) {
126 let now = Instant::now();
130 self.end_span(now);
131
132 let total_duration = now.duration_since(self.overall_start);
133 let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
134 let utilization = 1.0 - wait_ratio;
135
136 self.ewma.update(utilization);
137 let avg = self.ewma.average().unwrap_or(f64::NAN);
138 let avg_rounded = (avg * 10000.0).round() / 10000.0; self.gauge.set(avg_rounded);
140
141 self.overall_start = now;
143 self.total_wait = Duration::new(0, 0);
144
145 #[cfg(debug_assertions)]
146 debug!(component_id = %self.component_id, utilization = %avg_rounded, internal_log_rate_limit = false);
147 }
148
149 fn end_span(&mut self, at: Instant) {
150 if self.waiting {
151 self.total_wait += at.duration_since(self.span_start);
154 }
155 self.span_start = at;
156 }
157}
158
159#[derive(Debug)]
160enum UtilizationTimerMessage {
161 StartWait(ComponentKey, Instant),
162 StopWait(ComponentKey, Instant),
163}
164
165pub(crate) struct UtilizationComponentSender {
166 component_key: ComponentKey,
167 timer_tx: Sender<UtilizationTimerMessage>,
168}
169
170impl UtilizationComponentSender {
171 pub(crate) fn try_send_start_wait(&self) {
172 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
173 self.component_key.clone(),
174 Instant::now(),
175 )) {
176 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
177 }
178 }
179
180 pub(crate) fn try_send_stop_wait(&self) {
181 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
182 self.component_key.clone(),
183 Instant::now(),
184 )) {
185 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
186 }
187 }
188}
189
190#[derive(Clone)]
194pub struct UtilizationRegistry {
195 timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
196 timer_tx: Sender<UtilizationTimerMessage>,
197}
198
199impl UtilizationRegistry {
200 pub(crate) fn add_component(
204 &self,
205 key: ComponentKey,
206 gauge: Gauge,
207 ) -> UtilizationComponentSender {
208 self.timers.lock().expect("mutex poisoned").insert(
209 key.clone(),
210 Timer::new(
211 gauge,
212 #[cfg(debug_assertions)]
213 key.id().into(),
214 ),
215 );
216 UtilizationComponentSender {
217 timer_tx: self.timer_tx.clone(),
218 component_key: key,
219 }
220 }
221
222 pub(crate) fn remove_component(&self, key: &ComponentKey) {
224 self.timers.lock().expect("mutex poisoned").remove(key);
225 }
226}
227
228pub(crate) struct UtilizationEmitter {
229 timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
230 timer_rx: Receiver<UtilizationTimerMessage>,
231}
232
233impl UtilizationEmitter {
234 pub(crate) fn new() -> (Self, UtilizationRegistry) {
235 let (timer_tx, timer_rx) = channel(4096);
236 let timers = Arc::new(Mutex::new(HashMap::default()));
237 (
238 Self {
239 timers: Arc::clone(&timers),
240 timer_rx,
241 },
242 UtilizationRegistry { timers, timer_tx },
243 )
244 }
245
246 pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
247 let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
248 loop {
249 tokio::select! {
250 message = self.timer_rx.recv() => {
251 match message {
252 Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
253 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
255 timer.start_wait(start_time);
256 }
257 }
258 Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
259 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
261 timer.stop_wait(stop_time);
262 }
263 }
264 None => break,
265 }
266 },
267
268 Some(_) = intervals.next() => {
269 for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
270 timer.update_utilization();
271 }
272 },
273
274 _ = &mut shutdown => {
275 break
276 }
277 }
278 }
279 }
280}
281
282pub(crate) fn wrap<S>(
290 timer_tx: UtilizationComponentSender,
291 component_key: ComponentKey,
292 inner: S,
293) -> Utilization<S> {
294 Utilization {
295 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
296 timer_tx,
297 component_key,
298 inner,
299 }
300}
301
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;

    use super::*;

    /// Largest absolute difference accepted between measured and expected utilization.
    const TOLERANCE: f64 = 0.01;

    /// Pins the mock clock to t = 100 s and returns a timer started at that instant.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));
        Timer::new(
            metrics::gauge!("test_utilization"),
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Asserts `actual` lies in `[0, 1]` and within `TOLERANCE` of `expected`.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let in_range = (0.0..=1.0).contains(&actual);
        assert!(in_range, "Utilization {actual} is outside [0, 1]");

        let close_enough = (actual - expected).abs() < TOLERANCE;
        assert!(close_enough, "Expected utilization {description}, got {actual}");
    }

    /// Current moving-average utilization; panics if no window has been recorded.
    fn average(timer: &Timer) -> f64 {
        timer.ewma.average().unwrap()
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        // Window 1 (100 s → 105 s): never waiting.
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();
        assert_approx_eq(average(&timer), 1.0, "near 1.0 (never waiting)");

        // A start-wait stamped before window 2 began is clamped to the window
        // start, so window 2 is entirely waiting and the average drops to ~0.1.
        timer.start_wait(Instant::now() - Duration::from_secs(1));
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();
        assert_approx_eq(average(&timer), 0.1, "~0.1");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        // Window 1 (100 s → 105 s): forced into the waiting state throughout.
        MockClock::advance(Duration::from_secs(5));
        timer.waiting = true;
        timer.update_utilization();
        assert_approx_eq(average(&timer), 0.0, "near 0 (always waiting)");

        // A stop-wait stamped 4 s before window 2 began is clamped to the window
        // start, so window 2 counts as never waiting and the average rises to ~0.9.
        timer.stop_wait(Instant::now() - Duration::from_secs(4));
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();
        assert_approx_eq(average(&timer), 0.9, "~0.9");
    }

    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        // 1 s busy, 2 s waiting, 2 s busy: busy for 3 of 5 seconds.
        MockClock::advance(Duration::from_secs(1));
        timer.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(2));
        timer.stop_wait(Instant::now());
        MockClock::advance(Duration::from_secs(2));
        timer.update_utilization();

        assert_approx_eq(average(&timer), 0.6, "~0.6");
    }

    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        // Waiting from the very start of the window until the update.
        timer.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        assert_approx_eq(average(&timer), 0.0, "near 0 (always waiting)");
    }

    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        // No wait transitions at all during the window.
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        assert_approx_eq(average(&timer), 1.0, "near 1.0 (never waiting)");
    }
}