vector/
utilization.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    sync::{Arc, Mutex},
5    task::{Context, Poll, ready},
6    time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19    sync::mpsc::{Receiver, Sender, channel},
20    time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
24
25use crate::stats;
26
/// Period at which `UtilizationEmitter::run_utilization` recomputes and
/// publishes the utilization of every registered component (five seconds).
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_millis(5_000);
28
29#[pin_project]
30pub(crate) struct Utilization<S> {
31    intervals: IntervalStream,
32    timer_tx: UtilizationComponentSender,
33    component_key: ComponentKey,
34    inner: S,
35}
36
37impl<S> Utilization<S> {
38    /// Consumes this wrapper and returns the inner stream.
39    ///
40    /// This can't be constant because destructors can't be run in a const context, and we're
41    /// discarding `IntervalStream`/`Timer` when we call this.
42    #[allow(clippy::missing_const_for_fn)]
43    pub(crate) fn into_inner(self) -> S {
44        self.inner
45    }
46}
47
48impl<S> Stream for Utilization<S>
49where
50    S: Stream + Unpin,
51{
52    type Item = S::Item;
53
54    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55        // The goal of this function is to measure the time between when the
56        // caller requests the next Event from the stream and before one is
57        // ready, with the side-effect of reporting every so often about how
58        // long the wait gap is.
59        //
60        // This will just measure the time, while UtilizationEmitter collects
61        // all the timers and emits utilization value periodically
62        let this = self.project();
63        this.timer_tx.try_send_start_wait();
64        let _ = this.intervals.poll_next_unpin(cx);
65        let result = ready!(this.inner.poll_next_unpin(cx));
66        this.timer_tx.try_send_stop_wait();
67        Poll::Ready(result)
68    }
69}
70
71pub(crate) struct Timer {
72    overall_start: Instant,
73    span_start: Instant,
74    waiting: bool,
75    total_wait: Duration,
76    ewma: stats::Ewma,
77    gauge: Gauge,
78    #[cfg(debug_assertions)]
79    component_id: Arc<str>,
80}
81
82/// A simple, specialized timer for tracking spans of waiting vs not-waiting
83/// time and reporting a smoothed estimate of utilization.
84///
85/// This implementation uses the idea of spans and reporting periods. Spans are
86/// a period of time spent entirely in one state, aligning with state
87/// transitions but potentially more granular.  Reporting periods are expected
88/// to be of uniform length and used to aggregate span data into time-weighted
89/// averages.
90impl Timer {
91    pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
92        Self {
93            overall_start: Instant::now(),
94            span_start: Instant::now(),
95            waiting: false,
96            total_wait: Duration::new(0, 0),
97            ewma: stats::Ewma::new(0.9),
98            gauge,
99            #[cfg(debug_assertions)]
100            component_id,
101        }
102    }
103
104    /// Begin a new span representing time spent waiting
105    pub(crate) fn start_wait(&mut self, at: Instant) {
106        if !self.waiting {
107            // Clamp start time in case of a late message
108            self.end_span(at.max(self.overall_start));
109            self.waiting = true;
110        }
111    }
112
113    /// Complete the current waiting span and begin a non-waiting span
114    pub(crate) fn stop_wait(&mut self, at: Instant) {
115        if self.waiting {
116            // Clamp stop time in case of a late message
117            self.end_span(at.max(self.overall_start));
118            self.waiting = false;
119        }
120    }
121
122    /// Meant to be called on a regular interval, this method calculates wait
123    /// ratio since the last time it was called and reports the resulting
124    /// utilization average.
125    pub(crate) fn update_utilization(&mut self) {
126        // End the current span so it can be accounted for, but do not change
127        // whether or not we're in the waiting state. This way the next span
128        // inherits the correct status.
129        let now = Instant::now();
130        self.end_span(now);
131
132        let total_duration = now.duration_since(self.overall_start);
133        let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
134        let utilization = 1.0 - wait_ratio;
135
136        self.ewma.update(utilization);
137        let avg = self.ewma.average().unwrap_or(f64::NAN);
138        let avg_rounded = (avg * 10000.0).round() / 10000.0; // 4 digit precision
139        self.gauge.set(avg_rounded);
140
141        // Reset overall statistics for the next reporting period.
142        self.overall_start = now;
143        self.total_wait = Duration::new(0, 0);
144
145        #[cfg(debug_assertions)]
146        debug!(component_id = %self.component_id, utilization = %avg_rounded, internal_log_rate_limit = false);
147    }
148
149    fn end_span(&mut self, at: Instant) {
150        if self.waiting {
151            // `at` can be before span start here, the result will be clamped to 0
152            // because `duration_since` returns zero if `at` is before span start
153            self.total_wait += at.duration_since(self.span_start);
154        }
155        self.span_start = at;
156    }
157}
158
159#[derive(Debug)]
160enum UtilizationTimerMessage {
161    StartWait(ComponentKey, Instant),
162    StopWait(ComponentKey, Instant),
163}
164
165pub(crate) struct UtilizationComponentSender {
166    component_key: ComponentKey,
167    timer_tx: Sender<UtilizationTimerMessage>,
168}
169
170impl UtilizationComponentSender {
171    pub(crate) fn try_send_start_wait(&self) {
172        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
173            self.component_key.clone(),
174            Instant::now(),
175        )) {
176            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
177        }
178    }
179
180    pub(crate) fn try_send_stop_wait(&self) {
181        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
182            self.component_key.clone(),
183            Instant::now(),
184        )) {
185            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
186        }
187    }
188}
189
190/// Registry for components sending utilization data.
191///
192/// Cloning this is cheap and does not clone the underlying data.
193#[derive(Clone)]
194pub struct UtilizationRegistry {
195    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
196    timer_tx: Sender<UtilizationTimerMessage>,
197}
198
199impl UtilizationRegistry {
200    /// Adds a new component to this utilization metric emitter
201    ///
202    /// Returns a sender which can be used to send utilization information back to the emitter
203    pub(crate) fn add_component(
204        &self,
205        key: ComponentKey,
206        gauge: Gauge,
207    ) -> UtilizationComponentSender {
208        self.timers.lock().expect("mutex poisoned").insert(
209            key.clone(),
210            Timer::new(
211                gauge,
212                #[cfg(debug_assertions)]
213                key.id().into(),
214            ),
215        );
216        UtilizationComponentSender {
217            timer_tx: self.timer_tx.clone(),
218            component_key: key,
219        }
220    }
221
222    /// Removes a component from this utilization metric emitter
223    pub(crate) fn remove_component(&self, key: &ComponentKey) {
224        self.timers.lock().expect("mutex poisoned").remove(key);
225    }
226}
227
228pub(crate) struct UtilizationEmitter {
229    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
230    timer_rx: Receiver<UtilizationTimerMessage>,
231}
232
233impl UtilizationEmitter {
234    pub(crate) fn new() -> (Self, UtilizationRegistry) {
235        let (timer_tx, timer_rx) = channel(4096);
236        let timers = Arc::new(Mutex::new(HashMap::default()));
237        (
238            Self {
239                timers: Arc::clone(&timers),
240                timer_rx,
241            },
242            UtilizationRegistry { timers, timer_tx },
243        )
244    }
245
246    pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
247        let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
248        loop {
249            tokio::select! {
250                message = self.timer_rx.recv() => {
251                    match message {
252                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
253                            // Timer could be removed in the registry while message is still in the queue
254                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
255                                timer.start_wait(start_time);
256                            }
257                        }
258                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
259                            // Timer could be removed in the registry while message is still in the queue
260                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
261                                timer.stop_wait(stop_time);
262                            }
263                        }
264                        None => break,
265                    }
266                },
267
268                Some(_) = intervals.next() => {
269                    for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
270                        timer.update_utilization();
271                    }
272                },
273
274                _ = &mut shutdown => {
275                    break
276                }
277            }
278        }
279    }
280}
281
282/// Wrap a stream to emit stats about utilization. This is designed for use with
283/// the input channels of transform and sinks components, and measures the
284/// amount of time that the stream is waiting for input from upstream. We make
285/// the simplifying assumption that this wait time is when the component is idle
286/// and the rest of the time it is doing useful work. This is more true for
287/// sinks than transforms, which can be blocked by downstream components, but
288/// with knowledge of the config the data is still useful.
289pub(crate) fn wrap<S>(
290    timer_tx: UtilizationComponentSender,
291    component_key: ComponentKey,
292    inner: S,
293) -> Utilization<S> {
294    Utilization {
295        intervals: IntervalStream::new(interval(Duration::from_secs(5))),
296        timer_tx,
297        component_key,
298        inner,
299    }
300}
301
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;

    use super::*;

    /// Largest absolute error accepted when comparing utilization values.
    const TOLERANCE: f64 = 0.01;

    /// Resets the mock clock to T=100s and returns a timer created at that instant.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));
        let gauge = metrics::gauge!("test_utilization");
        Timer::new(
            gauge,
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Returns the timer's smoothed utilization; panics if nothing was reported yet.
    fn current_average(timer: &Timer) -> f64 {
        timer.ewma.average().unwrap()
    }

    /// Asserts that `actual` is a valid utilization in [0, 1] and lies within
    /// `TOLERANCE` of `expected`.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let in_bounds = (0.0..=1.0).contains(&actual);
        assert!(in_bounds, "Utilization {actual} is outside [0, 1]");
        let close_enough = (actual - expected).abs() < TOLERANCE;
        assert!(
            close_enough,
            "Expected utilization {description}, got {actual}"
        );
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut tracker = setup_timer();

        // T=100 -> T=105 without any waiting.
        MockClock::advance(Duration::from_secs(5));
        tracker.update_utilization();
        assert_approx_eq(current_average(&tracker), 1.0, "near 1.0 (never waiting)");

        // A StartWait stamped T=104 arrives after the T=105 report.
        let stale = Instant::now() - Duration::from_secs(1);
        tracker.start_wait(stale);
        MockClock::advance(Duration::from_secs(5));

        tracker.update_utilization();
        assert_approx_eq(current_average(&tracker), 0.1, "~0.1");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut tracker = setup_timer();

        // Waiting for the whole T=100 -> T=105 period.
        MockClock::advance(Duration::from_secs(5));
        tracker.waiting = true;
        tracker.update_utilization();
        assert_approx_eq(current_average(&tracker), 0.0, "near 0 (always waiting)");

        // A StopWait stamped T=101 arrives after the T=105 report.
        let stale = Instant::now() - Duration::from_secs(4);
        tracker.stop_wait(stale);
        MockClock::advance(Duration::from_secs(5));

        tracker.update_utilization();
        assert_approx_eq(current_average(&tracker), 0.9, "~0.9");
    }

    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut tracker = setup_timer();

        // T=101: begin waiting.
        MockClock::advance(Duration::from_secs(1));
        tracker.start_wait(Instant::now());

        // T=103: stop waiting after two seconds.
        MockClock::advance(Duration::from_secs(2));
        tracker.stop_wait(Instant::now());

        // T=105: report after two busy seconds.
        MockClock::advance(Duration::from_secs(2));
        tracker.update_utilization();

        // 2s waiting out of 5s => wait ratio 0.4 => utilization 0.6.
        assert_approx_eq(current_average(&tracker), 0.6, "~0.6");
    }

    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut tracker = setup_timer();

        // Waiting from creation (T=100) until the report at T=105.
        tracker.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(5));
        tracker.update_utilization();

        // 5s waiting out of 5s => utilization 0.
        assert_approx_eq(current_average(&tracker), 0.0, "near 0 (always waiting)");
    }

    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut tracker = setup_timer();

        // Busy from creation (T=100) until the report at T=105.
        MockClock::advance(Duration::from_secs(5));
        tracker.update_utilization();

        // No waiting at all => utilization 1.
        assert_approx_eq(current_average(&tracker), 1.0, "near 1.0 (never waiting)");
    }
}