vector/
utilization.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    sync::{Arc, Mutex},
5    task::{Context, Poll, ready},
6    time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19    sync::mpsc::{Receiver, Sender, channel},
20    time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal, stats};
24
/// Length of one utilization reporting period: how often
/// `UtilizationEmitter::run_utilization` asks every registered timer to
/// recompute and publish its utilization.
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_secs(5);
26
27/// Stream wrappers used to approximate component utilization from poll timing.
28///
29/// Current model:
30///
31///
32#[pin_project]
33pub(crate) struct Utilization<S> {
34    intervals: IntervalStream,
35    timer_tx: UtilizationComponentSender,
36    component_key: ComponentKey,
37    inner: S,
38}
39
40impl<S> Utilization<S> {
41    /// Output-side utilization wrapper for task transforms.
42    ///
43    /// Measures the time after the wrapped stream yields an item until downstream
44    /// polls it again.
45    pub(crate) fn new(
46        timer_tx: UtilizationComponentSender,
47        component_key: ComponentKey,
48        inner: S,
49    ) -> Self {
50        Self {
51            intervals: IntervalStream::new(interval(Duration::from_secs(5))),
52            timer_tx,
53            component_key,
54            inner,
55        }
56    }
57
58    /// Consumes this wrapper and returns the inner stream.
59    ///
60    /// This can't be constant because destructors can't be run in a const context, and we're
61    /// discarding `IntervalStream`/`Timer` when we call this.
62    #[allow(clippy::missing_const_for_fn)]
63    pub(crate) fn into_inner(self) -> S {
64        self.inner
65    }
66}
67
68impl<S> Stream for Utilization<S>
69where
70    S: Stream + Unpin,
71{
72    type Item = S::Item;
73
74    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75        // The goal of this function is to measure the time between when the
76        // caller requests the next Event from the stream and before one is
77        // ready, with the side-effect of reporting every so often about how
78        // long the wait gap is.
79        //
80        // This will just measure the time, while UtilizationEmitter collects
81        // all the timers and emits utilization value periodically
82        let this = self.project();
83        this.timer_tx.try_send_start_wait();
84        let _ = this.intervals.poll_next_unpin(cx);
85        let result = ready!(this.inner.poll_next_unpin(cx));
86        this.timer_tx.try_send_stop_wait();
87        Poll::Ready(result)
88    }
89}
90
91pub(crate) struct Timer {
92    overall_start: Instant,
93    span_start: Instant,
94    waiting: bool,
95    total_wait: Duration,
96    ewma: stats::Ewma,
97    gauge: Gauge,
98    #[cfg(debug_assertions)]
99    component_id: Arc<str>,
100}
101
102/// A simple, specialized timer for tracking spans of waiting vs not-waiting
103/// time and reporting a smoothed estimate of utilization.
104///
105/// This implementation uses the idea of spans and reporting periods. Spans are
106/// a period of time spent entirely in one state, aligning with state
107/// transitions but potentially more granular.  Reporting periods are expected
108/// to be of uniform length and used to aggregate span data into time-weighted
109/// averages.
110impl Timer {
111    pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
112        Self {
113            overall_start: Instant::now(),
114            span_start: Instant::now(),
115            waiting: false,
116            total_wait: Duration::new(0, 0),
117            ewma: stats::Ewma::new(0.9),
118            gauge,
119            #[cfg(debug_assertions)]
120            component_id,
121        }
122    }
123
124    /// Begin a new span representing time spent waiting
125    pub(crate) fn start_wait(&mut self, at: Instant) {
126        if !self.waiting {
127            // Clamp start time in case of a late message
128            self.end_span(at.max(self.overall_start));
129            self.waiting = true;
130        }
131    }
132
133    /// Complete the current waiting span and begin a non-waiting span
134    pub(crate) fn stop_wait(&mut self, at: Instant) {
135        if self.waiting {
136            // Clamp stop time in case of a late message
137            self.end_span(at.max(self.overall_start));
138            self.waiting = false;
139        }
140    }
141
142    /// Meant to be called on a regular interval, this method calculates wait
143    /// ratio since the last time it was called and reports the resulting
144    /// utilization average.
145    pub(crate) fn update_utilization(&mut self) {
146        // End the current span so it can be accounted for, but do not change
147        // whether or not we're in the waiting state. This way the next span
148        // inherits the correct status.
149        let now = Instant::now();
150        self.end_span(now);
151
152        let total_duration = now.duration_since(self.overall_start);
153        let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
154        let utilization = 1.0 - wait_ratio;
155
156        self.ewma.update(utilization);
157        let avg = self.ewma.average().unwrap_or(f64::NAN);
158        let avg_rounded = (avg * 10000.0).round() / 10000.0; // 4 digit precision
159        self.gauge.set(avg_rounded);
160
161        // Reset overall statistics for the next reporting period.
162        self.overall_start = now;
163        self.total_wait = Duration::new(0, 0);
164
165        #[cfg(debug_assertions)]
166        debug!(component_id = %self.component_id, utilization = %avg_rounded, internal_log_rate_limit = false);
167    }
168
169    fn end_span(&mut self, at: Instant) {
170        if self.waiting {
171            // `at` can be before span start here, the result will be clamped to 0
172            // because `duration_since` returns zero if `at` is before span start
173            self.total_wait += at.duration_since(self.span_start);
174        }
175        self.span_start = at;
176    }
177}
178
179#[derive(Debug)]
180enum UtilizationTimerMessage {
181    StartWait(ComponentKey, Instant),
182    StopWait(ComponentKey, Instant),
183}
184
185#[derive(Clone)]
186pub(crate) struct UtilizationComponentSender {
187    component_key: ComponentKey,
188    timer_tx: Sender<UtilizationTimerMessage>,
189}
190
191impl UtilizationComponentSender {
192    pub(crate) fn try_send_start_wait(&self) {
193        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
194            self.component_key.clone(),
195            Instant::now(),
196        )) {
197            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
198        }
199    }
200
201    pub(crate) fn try_send_stop_wait(&self) {
202        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
203            self.component_key.clone(),
204            Instant::now(),
205        )) {
206            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
207        }
208    }
209}
210
211/// Registry for components sending utilization data.
212///
213/// Cloning this is cheap and does not clone the underlying data.
214#[derive(Clone)]
215pub struct UtilizationRegistry {
216    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
217    timer_tx: Sender<UtilizationTimerMessage>,
218}
219
220impl UtilizationRegistry {
221    /// Adds a new component to this utilization metric emitter
222    ///
223    /// Returns a sender which can be used to send utilization information back to the emitter
224    pub(crate) fn add_component(
225        &self,
226        key: ComponentKey,
227        gauge: Gauge,
228    ) -> UtilizationComponentSender {
229        self.timers.lock().expect("mutex poisoned").insert(
230            key.clone(),
231            Timer::new(
232                gauge,
233                #[cfg(debug_assertions)]
234                key.id().into(),
235            ),
236        );
237        UtilizationComponentSender {
238            timer_tx: self.timer_tx.clone(),
239            component_key: key,
240        }
241    }
242
243    /// Removes a component from this utilization metric emitter
244    pub(crate) fn remove_component(&self, key: &ComponentKey) {
245        self.timers.lock().expect("mutex poisoned").remove(key);
246    }
247}
248
249pub(crate) struct UtilizationEmitter {
250    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
251    timer_rx: Receiver<UtilizationTimerMessage>,
252}
253
254impl UtilizationEmitter {
255    pub(crate) fn new() -> (Self, UtilizationRegistry) {
256        let (timer_tx, timer_rx) = channel(4096);
257        let timers = Arc::new(Mutex::new(HashMap::default()));
258        (
259            Self {
260                timers: Arc::clone(&timers),
261                timer_rx,
262            },
263            UtilizationRegistry { timers, timer_tx },
264        )
265    }
266
267    pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
268        let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
269        loop {
270            tokio::select! {
271                message = self.timer_rx.recv() => {
272                    match message {
273                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
274                            // Timer could be removed in the registry while message is still in the queue
275                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
276                                timer.start_wait(start_time);
277                            }
278                        }
279                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
280                            // Timer could be removed in the registry while message is still in the queue
281                            if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
282                                timer.stop_wait(stop_time);
283                            }
284                        }
285                        None => break,
286                    }
287                },
288
289                Some(_) = intervals.next() => {
290                    for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
291                        timer.update_utilization();
292                    }
293                },
294
295                _ = &mut shutdown => {
296                    break
297                }
298            }
299        }
300    }
301}
302
303/// Output-side counterpart of [`Utilization`]. Wraps the output stream of a
304/// task transform to track time spent waiting for downstream to accept items.
305///
306/// While [`Utilization`] measures idle time as "waiting for upstream input",
307/// this wrapper measures the complementary case: after yielding an item, the
308/// time until the consumer polls again is counted as downstream wait (idle).
309/// Both wrappers share a single [`Timer`] per component, and the idempotent
310/// `start_wait`/`stop_wait` transitions ensure no double-counting.
311pub(crate) struct OutputUtilization<S> {
312    timer_tx: UtilizationComponentSender,
313    inner: S,
314}
315
316impl<S> Stream for OutputUtilization<S>
317where
318    S: Stream + Unpin,
319{
320    type Item = S::Item;
321
322    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
323        let this = self.get_mut();
324        this.timer_tx.try_send_stop_wait();
325        let result = ready!(this.inner.poll_next_unpin(cx));
326        if result.is_some() {
327            this.timer_tx.try_send_start_wait();
328        }
329        Poll::Ready(result)
330    }
331}
332
333impl<S> OutputUtilization<S> {
334    /// Wrap a task transform stream to track time spent waiting for downstream
335    /// to consume items. This is the output-side counterpart to
336    /// [`Utilization::new`], designed for use with task transforms where the
337    /// framework cannot otherwise detect downstream backpressure.
338    pub(crate) const fn new(timer_tx: UtilizationComponentSender, inner: S) -> Self {
339        Self { timer_tx, inner }
340    }
341}
342
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;
    use strum::IntoEnumIterator;
    use vector_lib::gauge;
    use vector_lib::internal_event::GaugeName;

    use super::*;
    use crate::event::EventArray;
    use crate::transforms::TaskTransform;

    /// Largest absolute difference accepted by `assert_approx_eq`.
    const TOLERANCE: f64 = 0.01;

    /// Resets the mock clock to T=100 and returns a timer created at that
    /// instant.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));

        let gauge = gauge!(GaugeName::iter().next().unwrap());
        Timer::new(
            gauge,
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Returns the timer's current smoothed utilization; panics if the timer
    /// has not reported yet.
    fn smoothed(timer: &Timer) -> f64 {
        timer.ewma.average().unwrap()
    }

    /// Asserts that `actual` lies within [0, 1] and is within `TOLERANCE` of
    /// `expected`. `description` is only used in the failure message.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        let in_bounds = (0.0..=1.0).contains(&actual);
        assert!(in_bounds, "Utilization {actual} is outside [0, 1]");

        let distance = (actual - expected).abs();
        assert!(
            distance < TOLERANCE,
            "Expected utilization {description}, got {actual}"
        );
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        // First period: five seconds without any waiting.
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 1.0, "near 1.0 (never waiting)");

        // A start-wait message stamped before the current period began.
        let late_start = Instant::now() - Duration::from_secs(1);
        timer.start_wait(late_start);
        MockClock::advance(Duration::from_secs(5));

        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.1, "~0.1");
    }

    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        // First period: five seconds spent entirely waiting.
        MockClock::advance(Duration::from_secs(5));
        timer.waiting = true;
        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.0, "near 0 (always waiting)");

        // A stop-wait message stamped before the current period began.
        let late_stop = Instant::now() - Duration::from_secs(4);
        timer.stop_wait(late_stop);
        MockClock::advance(Duration::from_secs(5));

        timer.update_utilization();
        assert_approx_eq(smoothed(&timer), 0.9, "~0.9");
    }

    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        // T=100..101: busy, then the wait begins.
        MockClock::advance(Duration::from_secs(1));
        timer.start_wait(Instant::now());

        // T=101..103: waiting.
        MockClock::advance(Duration::from_secs(2));
        timer.stop_wait(Instant::now());

        // T=103..105: busy again, then report.
        MockClock::advance(Duration::from_secs(2));
        timer.update_utilization();

        // 2s of waiting in a 5s period: wait ratio 0.4, utilization 0.6.
        assert_approx_eq(smoothed(&timer), 0.6, "~0.6");
    }

    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        // The wait begins right at T=100 and lasts the whole 5s period.
        timer.start_wait(Instant::now());
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        // 5s of waiting in a 5s period: wait ratio 1.0, utilization 0.0.
        assert_approx_eq(smoothed(&timer), 0.0, "near 0 (always waiting)");
    }

    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        // T=100..105 passes without a single wait.
        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        // No waiting in a 5s period: wait ratio 0.0, utilization 1.0.
        assert_approx_eq(smoothed(&timer), 1.0, "near 1.0 (never waiting)");
    }

    /// Mock task transform that passes events through unchanged, simulating
    /// a configurable processing delay by advancing MockClock per item.
    struct MockTaskTransform {
        processing_time: Duration,
    }

    impl TaskTransform<EventArray> for MockTaskTransform {
        fn transform(
            self: Box<Self>,
            task: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
        ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
            let delay = self.processing_time;
            let delayed = task.map(move |events| {
                MockClock::advance(delay);
                events
            });
            delayed.boxed()
        }
    }

    /// End-to-end test exercising the Utilization (input) and
    /// OutputUtilization (output) stream wrappers with a mock TaskTransform,
    /// wired up the same way `build_task_transform` does in the builder.
    ///
    /// Pipeline: channel → Utilization(input) → TaskTransform → OutputUtilization(output)
    ///
    /// Timeline (10s):
    ///   T=100..103  waiting for input       (3s wait)
    ///   T=103..105  transform processing    (2s work)
    ///   T=105..108  blocked on downstream   (3s wait, even though input has data)
    ///   T=108..110  transform processing    (2s work)
    ///
    /// Expected utilization = 4/10 = 0.4
    #[tokio::test]
    #[serial]
    async fn test_task_transform_utilization_end_to_end() {
        use crate::event::LogEvent;
        use futures::SinkExt;
        use futures::channel::mpsc as futures_mpsc;

        MockClock::set_time(Duration::from_secs(100));

        let (mut emitter, registry) = UtilizationEmitter::new();
        let key = ComponentKey::from("test_transform");

        let input_sender = registry.add_component(key.clone(), gauge!(GaugeName::Utilization));
        let output_sender = input_sender.clone();

        // Upstream channel carrying EventArrays.
        let (mut input_tx, input_rx) = futures_mpsc::channel::<EventArray>(10);

        // Same wiring as build_task_transform:
        //   Utilization(input) → transform.transform() → OutputUtilization(output)
        let wrapped_input = Utilization::new(input_sender, key.clone(), input_rx);
        let transform = Box::new(MockTaskTransform {
            processing_time: Duration::from_secs(2),
        });
        let transformed = transform.transform(Box::pin(wrapped_input));
        let mut pipeline = OutputUtilization::new(output_sender, transformed);

        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);

        // -- Phase 1: poll transform, no input available --
        // T=100: the input wrapper sends start_wait.
        assert!(Pin::new(&mut pipeline).poll_next(&mut cx).is_pending());
        MockClock::advance(Duration::from_secs(3));
        // T=103: 3s of input wait. Period: 0s work / 3s → util = 0.0.
        check_timer_utilization(&mut emitter, &key, 0.0, "0.0 (phase 1: all input wait)");

        // -- Phase 2: send input, poll pipeline → transform processes --
        input_tx
            .send(EventArray::from(LogEvent::default()))
            .await
            .unwrap();
        assert!(Pin::new(&mut pipeline).poll_next(&mut cx).is_ready());
        // T=105: the transform advanced MockClock by 2s, then the output
        // wrapper sent start_wait. Period: 2s work / 0s wait → util = 1.0.
        // EWMA: 0.9×1.0 + 0.1×0.0 = 0.9
        check_timer_utilization(&mut emitter, &key, 0.9, "0.9 (phase 2: all work)");

        // -- Phase 3: send another event, simulate downstream blocking --
        // The input channel has data, but nobody polls the transform, so no
        // progress is made and the time counts as downstream wait.
        input_tx
            .send(EventArray::from(LogEvent::default()))
            .await
            .unwrap();
        MockClock::advance(Duration::from_secs(3));
        // T=108: 3s downstream wait. Period: 0s work / 3s → util = 0.0.
        // EWMA: 0.9×0.0 + 0.1×0.9 = 0.09
        check_timer_utilization(
            &mut emitter,
            &key,
            0.09,
            "0.09 (phase 3: downstream wait despite buffered input)",
        );

        // -- Phase 4: poll pipeline, processes the queued event --
        assert!(Pin::new(&mut pipeline).poll_next(&mut cx).is_ready());
        // T=110: the transform advanced MockClock by 2s.
        // Period: 2s work / 0s wait → util = 1.0.
        // EWMA: 0.9×1.0 + 0.1×0.09 ≈ 0.909
        check_timer_utilization(
            &mut emitter,
            &key,
            0.909,
            "0.909 (phase 4: queued event processed)",
        );
    }

    /// Applies all pending timer messages, closes the current reporting
    /// period of `key`'s timer and asserts its smoothed utilization.
    fn check_timer_utilization(
        emitter: &mut UtilizationEmitter,
        key: &ComponentKey,
        expected: f64,
        description: &str,
    ) {
        drain_emitter_messages(emitter);

        let mut timers = emitter.timers.lock().expect("mutex poisoned");
        let timer = timers.get_mut(key).expect("timer should exist");
        timer.update_utilization();
        assert_approx_eq(smoothed(timer), expected, description);
    }

    /// Applies every message currently queued on the emitter's channel to
    /// the timers, simulating what `run_utilization` does in its loop.
    fn drain_emitter_messages(emitter: &mut UtilizationEmitter) {
        let mut timers = emitter.timers.lock().expect("mutex poisoned");
        while let Ok(message) = emitter.timer_rx.try_recv() {
            match message {
                UtilizationTimerMessage::StartWait(key, at) => {
                    if let Some(timer) = timers.get_mut(&key) {
                        timer.start_wait(at);
                    }
                }
                UtilizationTimerMessage::StopWait(key, at) => {
                    if let Some(timer) = timers.get_mut(&key) {
                        timer.stop_wait(at);
                    }
                }
            }
        }
    }
}