1use std::{
2 collections::HashMap,
3 pin::Pin,
4 sync::{Arc, Mutex},
5 task::{Context, Poll, ready},
6 time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19 sync::mpsc::{Receiver, Sender, channel},
20 time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal, stats};
24
/// Period of the interval that drives `UtilizationEmitter::run_utilization`;
/// on every tick each registered timer recomputes and reports its utilization.
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_secs(5);
26
/// Stream adapter that passes through the items of `inner` while notifying
/// the utilization machinery (via `timer_tx`) when a poll starts waiting for
/// an item and when an item becomes ready.
#[pin_project]
pub(crate) struct Utilization<S> {
    /// Interval stream polled on every `poll_next` call; its output is
    /// discarded.
    intervals: IntervalStream,
    /// Sender used to emit the start-wait / stop-wait notifications.
    timer_tx: UtilizationComponentSender,
    /// Key of the component this stream belongs to, as passed to `wrap`.
    /// NOTE(review): no reader of this field is visible in this file.
    component_key: ComponentKey,
    /// The wrapped stream.
    inner: S,
}
34
35impl<S> Utilization<S> {
36 #[allow(clippy::missing_const_for_fn)]
41 pub(crate) fn into_inner(self) -> S {
42 self.inner
43 }
44}
45
46impl<S> Stream for Utilization<S>
47where
48 S: Stream + Unpin,
49{
50 type Item = S::Item;
51
52 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 let this = self.project();
61 this.timer_tx.try_send_start_wait();
62 let _ = this.intervals.poll_next_unpin(cx);
63 let result = ready!(this.inner.poll_next_unpin(cx));
64 this.timer_tx.try_send_stop_wait();
65 Poll::Ready(result)
66 }
67}
68
/// Tracks how much of the current measurement window a component has spent
/// waiting and publishes the resulting averaged utilization to a gauge.
pub(crate) struct Timer {
    /// Start of the current measurement window; moved forward to "now" at the
    /// end of every `update_utilization` call.
    overall_start: Instant,
    /// Instant at which the current span (waiting or non-waiting) began.
    span_start: Instant,
    /// Whether the current span is a waiting span.
    waiting: bool,
    /// Waiting time accumulated so far within the current window.
    total_wait: Duration,
    /// Running average (`stats::Ewma`) of the per-window utilization samples.
    ewma: stats::Ewma,
    /// Gauge that receives the rounded average on each update.
    gauge: Gauge,
    /// Component identifier attached to the debug log emitted on each update
    /// (debug builds only).
    #[cfg(debug_assertions)]
    component_id: Arc<str>,
}
79
80impl Timer {
89 pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
90 Self {
91 overall_start: Instant::now(),
92 span_start: Instant::now(),
93 waiting: false,
94 total_wait: Duration::new(0, 0),
95 ewma: stats::Ewma::new(0.9),
96 gauge,
97 #[cfg(debug_assertions)]
98 component_id,
99 }
100 }
101
102 pub(crate) fn start_wait(&mut self, at: Instant) {
104 if !self.waiting {
105 self.end_span(at.max(self.overall_start));
107 self.waiting = true;
108 }
109 }
110
111 pub(crate) fn stop_wait(&mut self, at: Instant) {
113 if self.waiting {
114 self.end_span(at.max(self.overall_start));
116 self.waiting = false;
117 }
118 }
119
120 pub(crate) fn update_utilization(&mut self) {
124 let now = Instant::now();
128 self.end_span(now);
129
130 let total_duration = now.duration_since(self.overall_start);
131 let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
132 let utilization = 1.0 - wait_ratio;
133
134 self.ewma.update(utilization);
135 let avg = self.ewma.average().unwrap_or(f64::NAN);
136 let avg_rounded = (avg * 10000.0).round() / 10000.0; self.gauge.set(avg_rounded);
138
139 self.overall_start = now;
141 self.total_wait = Duration::new(0, 0);
142
143 #[cfg(debug_assertions)]
144 debug!(component_id = %self.component_id, utilization = %avg_rounded, internal_log_rate_limit = false);
145 }
146
147 fn end_span(&mut self, at: Instant) {
148 if self.waiting {
149 self.total_wait += at.duration_since(self.span_start);
152 }
153 self.span_start = at;
154 }
155}
156
/// Messages sent from component streams to `UtilizationEmitter`, each carrying
/// the key of the originating component and the instant the event was
/// observed.
#[derive(Debug)]
enum UtilizationTimerMessage {
    /// The component started waiting at the given instant.
    StartWait(ComponentKey, Instant),
    /// The component stopped waiting at the given instant.
    StopWait(ComponentKey, Instant),
}
162
/// Per-component handle for sending wait notifications to the emitter.
pub(crate) struct UtilizationComponentSender {
    /// Key attached to every message sent through this handle.
    component_key: ComponentKey,
    /// Sending half of the channel the emitter reads from.
    timer_tx: Sender<UtilizationTimerMessage>,
}
167
168impl UtilizationComponentSender {
169 pub(crate) fn try_send_start_wait(&self) {
170 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
171 self.component_key.clone(),
172 Instant::now(),
173 )) {
174 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
175 }
176 }
177
178 pub(crate) fn try_send_stop_wait(&self) {
179 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
180 self.component_key.clone(),
181 Instant::now(),
182 )) {
183 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
184 }
185 }
186}
187
/// Cloneable handle for registering and removing component timers.
///
/// It shares the timer map with the `UtilizationEmitter` created alongside it
/// by `UtilizationEmitter::new`.
#[derive(Clone)]
pub struct UtilizationRegistry {
    /// Timers keyed by component, shared with the emitter.
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    /// Sending half of the message channel; cloned into every
    /// `UtilizationComponentSender` handed out by `add_component`.
    timer_tx: Sender<UtilizationTimerMessage>,
}
196
197impl UtilizationRegistry {
198 pub(crate) fn add_component(
202 &self,
203 key: ComponentKey,
204 gauge: Gauge,
205 ) -> UtilizationComponentSender {
206 self.timers.lock().expect("mutex poisoned").insert(
207 key.clone(),
208 Timer::new(
209 gauge,
210 #[cfg(debug_assertions)]
211 key.id().into(),
212 ),
213 );
214 UtilizationComponentSender {
215 timer_tx: self.timer_tx.clone(),
216 component_key: key,
217 }
218 }
219
220 pub(crate) fn remove_component(&self, key: &ComponentKey) {
222 self.timers.lock().expect("mutex poisoned").remove(key);
223 }
224}
225
/// Consumes wait notifications from all components, applies them to the
/// matching timers, and periodically has every timer report its utilization.
pub(crate) struct UtilizationEmitter {
    /// Timers keyed by component, shared with the `UtilizationRegistry`.
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    /// Receiving half of the channel that component senders write to.
    timer_rx: Receiver<UtilizationTimerMessage>,
}
230
231impl UtilizationEmitter {
232 pub(crate) fn new() -> (Self, UtilizationRegistry) {
233 let (timer_tx, timer_rx) = channel(4096);
234 let timers = Arc::new(Mutex::new(HashMap::default()));
235 (
236 Self {
237 timers: Arc::clone(&timers),
238 timer_rx,
239 },
240 UtilizationRegistry { timers, timer_tx },
241 )
242 }
243
244 pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
245 let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
246 loop {
247 tokio::select! {
248 message = self.timer_rx.recv() => {
249 match message {
250 Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
251 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
253 timer.start_wait(start_time);
254 }
255 }
256 Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
257 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
259 timer.stop_wait(stop_time);
260 }
261 }
262 None => break,
263 }
264 },
265
266 Some(_) = intervals.next() => {
267 for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
268 timer.update_utilization();
269 }
270 },
271
272 _ = &mut shutdown => {
273 break
274 }
275 }
276 }
277 }
278}
279
280pub(crate) fn wrap<S>(
288 timer_tx: UtilizationComponentSender,
289 component_key: ComponentKey,
290 inner: S,
291) -> Utilization<S> {
292 Utilization {
293 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
294 timer_tx,
295 component_key,
296 inner,
297 }
298}
299
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;

    use super::*;

    /// Largest absolute difference accepted by [`assert_approx_eq`].
    const TOLERANCE: f64 = 0.01;

    /// Resets the mock clock to a fixed time and returns a fresh timer
    /// created at that time.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));

        Timer::new(
            metrics::gauge!("test_utilization"),
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Asserts that `actual` lies in `[0, 1]` and is within [`TOLERANCE`] of
    /// `expected`; `description` is used in the failure message.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let in_bounds = (0.0..=1.0).contains(&actual);
        assert!(in_bounds, "Utilization {actual} is outside [0, 1]");

        let delta = (actual - expected).abs();
        assert!(
            delta < TOLERANCE,
            "Expected utilization {description}, got {actual}"
        );
    }

    /// Closes the current window on `timer` and returns the resulting
    /// running average.
    fn update_and_read(timer: &mut Timer) -> f64 {
        timer.update_utilization();
        timer.ewma.average().unwrap()
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        // First window: five seconds with no waiting at all.
        MockClock::advance(Duration::from_secs(5));
        let first = update_and_read(&mut timer);
        assert_approx_eq(first, 1.0, "near 1.0 (never waiting)");

        // The start timestamp predates the new window by one second; it must
        // be clamped so the waiting time cannot exceed the window length.
        timer.start_wait(Instant::now() - Duration::from_secs(1));
        MockClock::advance(Duration::from_secs(5));

        // The whole second window was spent waiting (sample 0.0), which
        // pulls the running average down from 1.0 to about 0.1.
        let second = update_and_read(&mut timer);
        assert_approx_eq(second, 0.1, "~0.1");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        // First window: force the timer into the waiting state for the whole
        // five seconds.
        MockClock::advance(Duration::from_secs(5));
        timer.waiting = true;
        let first = update_and_read(&mut timer);
        assert_approx_eq(first, 0.0, "near 0 (always waiting)");

        // The stop timestamp predates the new window by four seconds; it
        // must be clamped to the window start.
        timer.stop_wait(Instant::now() - Duration::from_secs(4));
        MockClock::advance(Duration::from_secs(5));

        // No waiting in the second window (sample 1.0), which lifts the
        // running average from 0.0 to about 0.9.
        let second = update_and_read(&mut timer);
        assert_approx_eq(second, 0.9, "~0.9");
    }

    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        // One second busy, two seconds waiting, two seconds busy.
        MockClock::advance(Duration::from_secs(1));
        timer.start_wait(Instant::now());

        MockClock::advance(Duration::from_secs(2));
        timer.stop_wait(Instant::now());

        MockClock::advance(Duration::from_secs(2));

        // Two of five seconds were spent waiting, so utilization is 0.6.
        let average = update_and_read(&mut timer);
        assert_approx_eq(average, 0.6, "~0.6");
    }

    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        // Waiting begins immediately and never stops within the window.
        timer.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(5));

        let average = update_and_read(&mut timer);
        assert_approx_eq(average, 0.0, "near 0 (always waiting)");
    }

    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        // No wait span is ever opened within the window.
        MockClock::advance(Duration::from_secs(5));

        let average = update_and_read(&mut timer);
        assert_approx_eq(average, 1.0, "near 1.0 (never waiting)");
    }
}