vector/
utilization.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    sync::{Arc, Mutex},
5    task::{Context, Poll, ready},
6    time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19    sync::mpsc::{Receiver, Sender, channel},
20    time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
24
25use crate::stats;
26
/// How often `UtilizationEmitter::run_utilization` closes a reporting period and publishes
/// utilization for every registered timer.
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_secs(5);
28
/// Stream adapter that reports when its consumer starts and stops waiting for items.
///
/// Built by `wrap`; every poll sends wait-state messages through `timer_tx`.
#[pin_project]
pub(crate) struct Utilization<S> {
    /// Interval polled alongside `inner` in `poll_next`; presumably kept so the task is
    /// woken periodically even while `inner` is idle — TODO confirm it is still needed now
    /// that `UtilizationEmitter` drives reporting on its own schedule.
    intervals: IntervalStream,
    /// Handle used to report start/stop-wait events to the utilization emitter.
    timer_tx: UtilizationComponentSender,
    // NOTE(review): not read anywhere in this file (the sender carries its own key);
    // confirm whether this field is still needed.
    /// Key of the component whose input stream is wrapped.
    component_key: ComponentKey,
    /// The wrapped stream.
    inner: S,
}
36
37impl<S> Utilization<S> {
38    /// Consumes this wrapper and returns the inner stream.
39    ///
40    /// This can't be constant because destructors can't be run in a const context, and we're
41    /// discarding `IntervalStream`/`Timer` when we call this.
42    #[allow(clippy::missing_const_for_fn)]
43    pub(crate) fn into_inner(self) -> S {
44        self.inner
45    }
46}
47
48impl<S> Stream for Utilization<S>
49where
50    S: Stream + Unpin,
51{
52    type Item = S::Item;
53
54    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55        // The goal of this function is to measure the time between when the
56        // caller requests the next Event from the stream and before one is
57        // ready, with the side-effect of reporting every so often about how
58        // long the wait gap is.
59        //
60        // This will just measure the time, while UtilizationEmitter collects
61        // all the timers and emits utilization value periodically
62        let this = self.project();
63        this.timer_tx.try_send_start_wait();
64        let _ = this.intervals.poll_next_unpin(cx);
65        let result = ready!(this.inner.poll_next_unpin(cx));
66        this.timer_tx.try_send_stop_wait();
67        Poll::Ready(result)
68    }
69}
70
71pub(crate) struct Timer {
72    overall_start: Instant,
73    span_start: Instant,
74    waiting: bool,
75    total_wait: Duration,
76    ewma: stats::Ewma,
77    gauge: Gauge,
78    #[cfg(debug_assertions)]
79    report_count: u32,
80    #[cfg(debug_assertions)]
81    component_id: Arc<str>,
82}
83
84/// A simple, specialized timer for tracking spans of waiting vs not-waiting
85/// time and reporting a smoothed estimate of utilization.
86///
87/// This implementation uses the idea of spans and reporting periods. Spans are
88/// a period of time spent entirely in one state, aligning with state
89/// transitions but potentially more granular.  Reporting periods are expected
90/// to be of uniform length and used to aggregate span data into time-weighted
91/// averages.
92impl Timer {
93    pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
94        Self {
95            overall_start: Instant::now(),
96            span_start: Instant::now(),
97            waiting: false,
98            total_wait: Duration::new(0, 0),
99            ewma: stats::Ewma::new(0.9),
100            gauge,
101            #[cfg(debug_assertions)]
102            report_count: 0,
103            #[cfg(debug_assertions)]
104            component_id,
105        }
106    }
107
108    /// Begin a new span representing time spent waiting
109    pub(crate) fn start_wait(&mut self, at: Instant) {
110        if !self.waiting {
111            // Clamp start time in case of a late message
112            self.end_span(at.max(self.overall_start));
113            self.waiting = true;
114        }
115    }
116
117    /// Complete the current waiting span and begin a non-waiting span
118    pub(crate) fn stop_wait(&mut self, at: Instant) {
119        if self.waiting {
120            // Clamp stop time in case of a late message
121            self.end_span(at.max(self.overall_start));
122            self.waiting = false;
123        }
124    }
125
126    /// Meant to be called on a regular interval, this method calculates wait
127    /// ratio since the last time it was called and reports the resulting
128    /// utilization average.
129    pub(crate) fn update_utilization(&mut self) {
130        // End the current span so it can be accounted for, but do not change
131        // whether or not we're in the waiting state. This way the next span
132        // inherits the correct status.
133        let now = Instant::now();
134        self.end_span(now);
135
136        let total_duration = now.duration_since(self.overall_start);
137        let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
138        let utilization = 1.0 - wait_ratio;
139
140        self.ewma.update(utilization);
141        let avg = self.ewma.average().unwrap_or(f64::NAN);
142        let avg_rounded = (avg * 10000.0).round() / 10000.0; // 4 digit precision
143        self.gauge.set(avg_rounded);
144
145        // Reset overall statistics for the next reporting period.
146        self.overall_start = now;
147        self.total_wait = Duration::new(0, 0);
148
149        #[cfg(debug_assertions)]
150        self.report(avg_rounded);
151    }
152
153    #[cfg(debug_assertions)]
154    fn report(&mut self, utilization: f64) {
155        // Note that changing the reporting interval would also affect the actual metric reporting frequency.
156        // This check reduces debug log spamming.
157        if self.report_count.is_multiple_of(5) {
158            debug!(component_id = %self.component_id, %utilization);
159        }
160        self.report_count = self.report_count.wrapping_add(1);
161    }
162
163    fn end_span(&mut self, at: Instant) {
164        if self.waiting {
165            // `at` can be before span start here, the result will be clamped to 0
166            // because `duration_since` returns zero if `at` is before span start
167            self.total_wait += at.duration_since(self.span_start);
168        }
169        self.span_start = at;
170    }
171}
172
/// Wait-state transition for one component, timestamped when the component sent it.
#[derive(Debug)]
enum UtilizationTimerMessage {
    /// The component began waiting for input at the given instant.
    StartWait(ComponentKey, Instant),
    /// The component stopped waiting (an item arrived or its stream ended) at the given instant.
    StopWait(ComponentKey, Instant),
}
178
/// Per-component handle for reporting wait-state transitions to the `UtilizationEmitter`.
pub(crate) struct UtilizationComponentSender {
    /// Key attached to every message so the emitter can locate this component's timer.
    component_key: ComponentKey,
    /// Channel shared with every other component's sender; written with `try_send` only.
    timer_tx: Sender<UtilizationTimerMessage>,
}
183
184impl UtilizationComponentSender {
185    pub(crate) fn try_send_start_wait(&self) {
186        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
187            self.component_key.clone(),
188            Instant::now(),
189        )) {
190            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
191        }
192    }
193
194    pub(crate) fn try_send_stop_wait(&self) {
195        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
196            self.component_key.clone(),
197            Instant::now(),
198        )) {
199            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
200        }
201    }
202}
203
/// Registry for components sending utilization data.
///
/// Cloning this is cheap and does not clone the underlying data.
#[derive(Clone)]
pub struct UtilizationRegistry {
    /// Timers keyed by component, shared with the `UtilizationEmitter` that created this registry.
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    /// Sending side of the emitter's message channel, cloned into each component sender.
    timer_tx: Sender<UtilizationTimerMessage>,
}
212
213impl UtilizationRegistry {
214    /// Adds a new component to this utilization metric emitter
215    ///
216    /// Returns a sender which can be used to send utilization information back to the emitter
217    pub(crate) fn add_component(
218        &self,
219        key: ComponentKey,
220        gauge: Gauge,
221    ) -> UtilizationComponentSender {
222        self.timers.lock().expect("mutex poisoned").insert(
223            key.clone(),
224            Timer::new(
225                gauge,
226                #[cfg(debug_assertions)]
227                key.id().into(),
228            ),
229        );
230        UtilizationComponentSender {
231            timer_tx: self.timer_tx.clone(),
232            component_key: key,
233        }
234    }
235
236    /// Removes a component from this utilization metric emitter
237    pub(crate) fn remove_component(&self, key: &ComponentKey) {
238        self.timers.lock().expect("mutex poisoned").remove(key);
239    }
240}
241
/// Background driver that applies wait-state messages to registered timers and periodically
/// publishes utilization for each of them (see `run_utilization`).
pub(crate) struct UtilizationEmitter {
    /// Timers keyed by component, shared with the `UtilizationRegistry` returned from `new`.
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    /// Receiving end of the channel that every `UtilizationComponentSender` writes into.
    timer_rx: Receiver<UtilizationTimerMessage>,
}
246
247impl UtilizationEmitter {
248    pub(crate) fn new() -> (Self, UtilizationRegistry) {
249        let (timer_tx, timer_rx) = channel(4096);
250        let timers = Arc::new(Mutex::new(HashMap::default()));
251        (
252            Self {
253                timers: Arc::clone(&timers),
254                timer_rx,
255            },
256            UtilizationRegistry { timers, timer_tx },
257        )
258    }
259
260    pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
261        let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
262        loop {
263            tokio::select! {
264                message = self.timer_rx.recv() => {
265                    match message {
266                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
267                            // Timer could be removed in the registry while message is still in the queue
268                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
269                                timer.start_wait(start_time);
270                            }
271                        }
272                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
273                            // Timer could be removed in the registry while message is still in the queue
274                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
275                                timer.stop_wait(stop_time);
276                            }
277                        }
278                        None => break,
279                    }
280                },
281
282                Some(_) = intervals.next() => {
283                    for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
284                        timer.update_utilization();
285                    }
286                },
287
288                _ = &mut shutdown => {
289                    break
290                }
291            }
292        }
293    }
294}
295
296/// Wrap a stream to emit stats about utilization. This is designed for use with
297/// the input channels of transform and sinks components, and measures the
298/// amount of time that the stream is waiting for input from upstream. We make
299/// the simplifying assumption that this wait time is when the component is idle
300/// and the rest of the time it is doing useful work. This is more true for
301/// sinks than transforms, which can be blocked by downstream components, but
302/// with knowledge of the config the data is still useful.
303pub(crate) fn wrap<S>(
304    timer_tx: UtilizationComponentSender,
305    component_key: ComponentKey,
306    inner: S,
307) -> Utilization<S> {
308    Utilization {
309        intervals: IntervalStream::new(interval(Duration::from_secs(5))),
310        timer_tx,
311        component_key,
312        inner,
313    }
314}
315
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;

    use super::*;

    /// Maximum absolute difference accepted between expected and observed utilization.
    const TOLERANCE: f64 = 0.01;

    /// Pins the mock clock at T=100s and builds a fresh timer starting there.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));

        Timer::new(
            metrics::gauge!("test_utilization"),
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Moves the mock clock forward by whole seconds.
    fn advance_secs(secs: u64) {
        MockClock::advance(Duration::from_secs(secs));
    }

    /// Returns the timer's current smoothed utilization; panics if nothing was reported yet.
    fn smoothed(timer: &Timer) -> f64 {
        timer.ewma.average().unwrap()
    }

    /// Checks that `actual` is a valid utilization in [0, 1] and lies within `TOLERANCE`
    /// of `expected`; `description` is embedded in the failure message.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let in_unit_range = (0.0..=1.0).contains(&actual);
        assert!(in_unit_range, "Utilization {actual} is outside [0, 1]");

        let close_enough = (actual - expected).abs() < TOLERANCE;
        assert!(close_enough, "Expected utilization {description}, got {actual}");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        // First period (T=100..105): never waiting.
        advance_secs(5);
        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 1.0, "near 1.0 (never waiting)");

        // A start-wait stamped before the period began arrives late and is clamped to it.
        let late_start = Instant::now() - Duration::from_secs(1);
        timer.start_wait(late_start);
        advance_secs(5);

        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.1, "~0.1");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        // First period (T=100..105): waiting the whole time.
        advance_secs(5);
        timer.waiting = true;
        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.0, "near 0 (always waiting)");

        // A stop-wait stamped before the period began arrives late and is clamped to it.
        let late_stop = Instant::now() - Duration::from_secs(4);
        timer.stop_wait(late_stop);
        advance_secs(5);

        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.9, "~0.9");
    }

    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        // Busy T=100..101, waiting T=101..103, busy T=103..105.
        advance_secs(1);
        timer.start_wait(Instant::now());

        advance_secs(2);
        timer.stop_wait(Instant::now());

        advance_secs(2);
        timer.update_utilization();

        // Waited 2s of a 5s period: utilization = 1 - 2/5 = 0.6.
        assert_approx_eq(smoothed(&timer), 0.6, "~0.6");
    }

    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        // Waiting for the entire period T=100..105.
        timer.start_wait(Instant::now());
        advance_secs(5);
        timer.update_utilization();

        // Waited 5s of 5s: utilization = 0.
        assert_approx_eq(smoothed(&timer), 0.0, "near 0 (always waiting)");
    }

    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        // Busy for the entire period T=100..105.
        advance_secs(5);
        timer.update_utilization();

        // Waited 0s of 5s: utilization = 1.
        assert_approx_eq(smoothed(&timer), 1.0, "near 1.0 (never waiting)");
    }
}