// vector/utilization.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    sync::{Arc, Mutex},
5    task::{Context, Poll, ready},
6    time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19    sync::mpsc::{Receiver, Sender, channel},
20    time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal, stats};
24
/// Period at which `UtilizationEmitter::run_utilization` asks every registered timer
/// to recompute its utilization and update its gauge.
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_millis(5_000);
26
27#[pin_project]
28pub(crate) struct Utilization<S> {
29    intervals: IntervalStream,
30    timer_tx: UtilizationComponentSender,
31    component_key: ComponentKey,
32    inner: S,
33}
34
35impl<S> Utilization<S> {
36    /// Consumes this wrapper and returns the inner stream.
37    ///
38    /// This can't be constant because destructors can't be run in a const context, and we're
39    /// discarding `IntervalStream`/`Timer` when we call this.
40    #[allow(clippy::missing_const_for_fn)]
41    pub(crate) fn into_inner(self) -> S {
42        self.inner
43    }
44}
45
46impl<S> Stream for Utilization<S>
47where
48    S: Stream + Unpin,
49{
50    type Item = S::Item;
51
52    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53        // The goal of this function is to measure the time between when the
54        // caller requests the next Event from the stream and before one is
55        // ready, with the side-effect of reporting every so often about how
56        // long the wait gap is.
57        //
58        // This will just measure the time, while UtilizationEmitter collects
59        // all the timers and emits utilization value periodically
60        let this = self.project();
61        this.timer_tx.try_send_start_wait();
62        let _ = this.intervals.poll_next_unpin(cx);
63        let result = ready!(this.inner.poll_next_unpin(cx));
64        this.timer_tx.try_send_stop_wait();
65        Poll::Ready(result)
66    }
67}
68
69pub(crate) struct Timer {
70    overall_start: Instant,
71    span_start: Instant,
72    waiting: bool,
73    total_wait: Duration,
74    ewma: stats::Ewma,
75    gauge: Gauge,
76    #[cfg(debug_assertions)]
77    component_id: Arc<str>,
78}
79
80/// A simple, specialized timer for tracking spans of waiting vs not-waiting
81/// time and reporting a smoothed estimate of utilization.
82///
83/// This implementation uses the idea of spans and reporting periods. Spans are
84/// a period of time spent entirely in one state, aligning with state
85/// transitions but potentially more granular.  Reporting periods are expected
86/// to be of uniform length and used to aggregate span data into time-weighted
87/// averages.
88impl Timer {
89    pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
90        Self {
91            overall_start: Instant::now(),
92            span_start: Instant::now(),
93            waiting: false,
94            total_wait: Duration::new(0, 0),
95            ewma: stats::Ewma::new(0.9),
96            gauge,
97            #[cfg(debug_assertions)]
98            component_id,
99        }
100    }
101
102    /// Begin a new span representing time spent waiting
103    pub(crate) fn start_wait(&mut self, at: Instant) {
104        if !self.waiting {
105            // Clamp start time in case of a late message
106            self.end_span(at.max(self.overall_start));
107            self.waiting = true;
108        }
109    }
110
111    /// Complete the current waiting span and begin a non-waiting span
112    pub(crate) fn stop_wait(&mut self, at: Instant) {
113        if self.waiting {
114            // Clamp stop time in case of a late message
115            self.end_span(at.max(self.overall_start));
116            self.waiting = false;
117        }
118    }
119
120    /// Meant to be called on a regular interval, this method calculates wait
121    /// ratio since the last time it was called and reports the resulting
122    /// utilization average.
123    pub(crate) fn update_utilization(&mut self) {
124        // End the current span so it can be accounted for, but do not change
125        // whether or not we're in the waiting state. This way the next span
126        // inherits the correct status.
127        let now = Instant::now();
128        self.end_span(now);
129
130        let total_duration = now.duration_since(self.overall_start);
131        let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
132        let utilization = 1.0 - wait_ratio;
133
134        self.ewma.update(utilization);
135        let avg = self.ewma.average().unwrap_or(f64::NAN);
136        let avg_rounded = (avg * 10000.0).round() / 10000.0; // 4 digit precision
137        self.gauge.set(avg_rounded);
138
139        // Reset overall statistics for the next reporting period.
140        self.overall_start = now;
141        self.total_wait = Duration::new(0, 0);
142
143        #[cfg(debug_assertions)]
144        debug!(component_id = %self.component_id, utilization = %avg_rounded, internal_log_rate_limit = false);
145    }
146
147    fn end_span(&mut self, at: Instant) {
148        if self.waiting {
149            // `at` can be before span start here, the result will be clamped to 0
150            // because `duration_since` returns zero if `at` is before span start
151            self.total_wait += at.duration_since(self.span_start);
152        }
153        self.span_start = at;
154    }
155}
156
/// Notification sent from a wrapped component stream to the utilization emitter.
///
/// Each variant carries the key of the reporting component and the instant at which
/// the sender observed the transition.
#[derive(Debug)]
enum UtilizationTimerMessage {
    /// The component began waiting on its input stream at the given instant.
    StartWait(ComponentKey, Instant),
    /// The component's input stream became ready (item or end of stream) at the given instant.
    StopWait(ComponentKey, Instant),
}
162
163pub(crate) struct UtilizationComponentSender {
164    component_key: ComponentKey,
165    timer_tx: Sender<UtilizationTimerMessage>,
166}
167
168impl UtilizationComponentSender {
169    pub(crate) fn try_send_start_wait(&self) {
170        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
171            self.component_key.clone(),
172            Instant::now(),
173        )) {
174            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
175        }
176    }
177
178    pub(crate) fn try_send_stop_wait(&self) {
179        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
180            self.component_key.clone(),
181            Instant::now(),
182        )) {
183            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
184        }
185    }
186}
187
188/// Registry for components sending utilization data.
189///
190/// Cloning this is cheap and does not clone the underlying data.
191#[derive(Clone)]
192pub struct UtilizationRegistry {
193    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
194    timer_tx: Sender<UtilizationTimerMessage>,
195}
196
197impl UtilizationRegistry {
198    /// Adds a new component to this utilization metric emitter
199    ///
200    /// Returns a sender which can be used to send utilization information back to the emitter
201    pub(crate) fn add_component(
202        &self,
203        key: ComponentKey,
204        gauge: Gauge,
205    ) -> UtilizationComponentSender {
206        self.timers.lock().expect("mutex poisoned").insert(
207            key.clone(),
208            Timer::new(
209                gauge,
210                #[cfg(debug_assertions)]
211                key.id().into(),
212            ),
213        );
214        UtilizationComponentSender {
215            timer_tx: self.timer_tx.clone(),
216            component_key: key,
217        }
218    }
219
220    /// Removes a component from this utilization metric emitter
221    pub(crate) fn remove_component(&self, key: &ComponentKey) {
222        self.timers.lock().expect("mutex poisoned").remove(key);
223    }
224}
225
226pub(crate) struct UtilizationEmitter {
227    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
228    timer_rx: Receiver<UtilizationTimerMessage>,
229}
230
231impl UtilizationEmitter {
232    pub(crate) fn new() -> (Self, UtilizationRegistry) {
233        let (timer_tx, timer_rx) = channel(4096);
234        let timers = Arc::new(Mutex::new(HashMap::default()));
235        (
236            Self {
237                timers: Arc::clone(&timers),
238                timer_rx,
239            },
240            UtilizationRegistry { timers, timer_tx },
241        )
242    }
243
244    pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
245        let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
246        loop {
247            tokio::select! {
248                message = self.timer_rx.recv() => {
249                    match message {
250                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
251                            // Timer could be removed in the registry while message is still in the queue
252                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
253                                timer.start_wait(start_time);
254                            }
255                        }
256                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
257                            // Timer could be removed in the registry while message is still in the queue
258                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
259                                timer.stop_wait(stop_time);
260                            }
261                        }
262                        None => break,
263                    }
264                },
265
266                Some(_) = intervals.next() => {
267                    for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
268                        timer.update_utilization();
269                    }
270                },
271
272                _ = &mut shutdown => {
273                    break
274                }
275            }
276        }
277    }
278}
279
280/// Wrap a stream to emit stats about utilization. This is designed for use with
281/// the input channels of transform and sinks components, and measures the
282/// amount of time that the stream is waiting for input from upstream. We make
283/// the simplifying assumption that this wait time is when the component is idle
284/// and the rest of the time it is doing useful work. This is more true for
285/// sinks than transforms, which can be blocked by downstream components, but
286/// with knowledge of the config the data is still useful.
287pub(crate) fn wrap<S>(
288    timer_tx: UtilizationComponentSender,
289    component_key: ComponentKey,
290    inner: S,
291) -> Utilization<S> {
292    Utilization {
293        intervals: IntervalStream::new(interval(Duration::from_secs(5))),
294        timer_tx,
295        component_key,
296        inner,
297    }
298}
299
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;

    use super::*;

    /// Maximum absolute difference accepted between expected and observed utilization.
    const TOLERANCE: f64 = 0.01;

    /// Pins the global mock clock at T=100s and returns a timer created at that instant.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));

        let gauge = metrics::gauge!("test_utilization");
        Timer::new(
            gauge,
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Current smoothed utilization of `timer`. Panics if it has never reported.
    fn smoothed(timer: &Timer) -> f64 {
        timer.ewma.average().unwrap()
    }

    /// Asserts that `actual` is a valid utilization in [0, 1] and lies within `TOLERANCE`
    /// of `expected`. `description` names the expectation in the failure message.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let within_bounds = (0.0..=1.0).contains(&actual);
        assert!(within_bounds, "Utilization {actual} is outside [0, 1]");

        let deviation = (actual - expected).abs();
        assert!(
            deviation < TOLERANCE,
            "Expected utilization {description}, got {actual}"
        );
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        // First period (T=100..105) without any waiting.
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 1.0, "near 1.0 (never waiting)");

        // A start-wait stamped T=104 (before the last report) arrives late.
        let stale_start = Instant::now() - Duration::from_secs(1);
        timer.start_wait(stale_start);
        MockClock::advance(Duration::from_secs(5));

        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.1, "~0.1");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        // First period (T=100..105) spent entirely waiting.
        MockClock::advance(Duration::from_secs(5));
        timer.waiting = true;
        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.0, "near 0 (always waiting)");

        // A stop-wait stamped T=101 (before the last report) arrives late.
        let stale_stop = Instant::now() - Duration::from_secs(4);
        timer.stop_wait(stale_stop);
        MockClock::advance(Duration::from_secs(5));

        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.9, "~0.9");
    }

    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        // Busy from T=100 to T=101, then waiting from T=101 to T=103.
        MockClock::advance(Duration::from_secs(1));
        timer.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(2));
        timer.stop_wait(Instant::now());

        // Busy again from T=103 to T=105, then report.
        MockClock::advance(Duration::from_secs(2));
        timer.update_utilization();

        // 2s of waiting out of a 5s period: utilization = 1 - 2/5 = 0.6.
        assert_approx_eq(smoothed(&timer), 0.6, "~0.6");
    }

    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        // Start waiting at T=100 and never stop before the report at T=105.
        timer.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        // Waited for the whole 5s period: utilization = 0.
        assert_approx_eq(smoothed(&timer), 0.0, "near 0 (always waiting)");
    }

    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        // Busy for the whole period T=100..105.
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        // No waiting at all: utilization = 1.
        assert_approx_eq(smoothed(&timer), 1.0, "near 1.0 (never waiting)");
    }
}