vector/
utilization.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    task::{ready, Context, Poll},
5    time::{Duration, Instant},
6};
7use tokio::sync::mpsc::{channel, Receiver, Sender};
8
9use futures::{Stream, StreamExt};
10use metrics::Gauge;
11use pin_project::pin_project;
12use tokio::time::interval;
13use tokio_stream::wrappers::IntervalStream;
14use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
15
16use crate::stats;
17
18#[pin_project]
19pub(crate) struct Utilization<S> {
20    intervals: IntervalStream,
21    timer_tx: UtilizationComponentSender,
22    component_key: ComponentKey,
23    inner: S,
24}
25
26impl<S> Utilization<S> {
27    /// Consumes this wrapper and returns the inner stream.
28    ///
29    /// This can't be constant because destructors can't be run in a const context, and we're
30    /// discarding `IntervalStream`/`Timer` when we call this.
31    #[allow(clippy::missing_const_for_fn)]
32    pub(crate) fn into_inner(self) -> S {
33        self.inner
34    }
35}
36
37impl<S> Stream for Utilization<S>
38where
39    S: Stream + Unpin,
40{
41    type Item = S::Item;
42
43    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44        // The goal of this function is to measure the time between when the
45        // caller requests the next Event from the stream and before one is
46        // ready, with the side-effect of reporting every so often about how
47        // long the wait gap is.
48        //
49        // This will just measure the time, while UtilizationEmitter collects
50        // all the timers and emits utilization value periodically
51        let this = self.project();
52        this.timer_tx.try_send_start_wait();
53        let _ = this.intervals.poll_next_unpin(cx);
54        let result = ready!(this.inner.poll_next_unpin(cx));
55        this.timer_tx.try_send_stop_wait();
56        Poll::Ready(result)
57    }
58}
59
60pub(crate) struct Timer {
61    overall_start: Instant,
62    span_start: Instant,
63    waiting: bool,
64    total_wait: Duration,
65    ewma: stats::Ewma,
66    gauge: Gauge,
67}
68
69/// A simple, specialized timer for tracking spans of waiting vs not-waiting
70/// time and reporting a smoothed estimate of utilization.
71///
72/// This implementation uses the idea of spans and reporting periods. Spans are
73/// a period of time spent entirely in one state, aligning with state
74/// transitions but potentially more granular.  Reporting periods are expected
75/// to be of uniform length and used to aggregate span data into time-weighted
76/// averages.
77impl Timer {
78    pub(crate) fn new(gauge: Gauge) -> Self {
79        Self {
80            overall_start: Instant::now(),
81            span_start: Instant::now(),
82            waiting: false,
83            total_wait: Duration::new(0, 0),
84            ewma: stats::Ewma::new(0.9),
85            gauge,
86        }
87    }
88
89    /// Begin a new span representing time spent waiting
90    pub(crate) fn start_wait(&mut self, at: Instant) {
91        if !self.waiting {
92            self.end_span(at);
93            self.waiting = true;
94        }
95    }
96
97    /// Complete the current waiting span and begin a non-waiting span
98    pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant {
99        if self.waiting {
100            let now = self.end_span(at);
101            self.waiting = false;
102            now
103        } else {
104            at
105        }
106    }
107
108    /// Meant to be called on a regular interval, this method calculates wait
109    /// ratio since the last time it was called and reports the resulting
110    /// utilization average.
111    pub(crate) fn report(&mut self) {
112        // End the current span so it can be accounted for, but do not change
113        // whether or not we're in the waiting state. This way the next span
114        // inherits the correct status.
115        let now = self.end_span(Instant::now());
116
117        let total_duration = now.duration_since(self.overall_start);
118        let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
119        let utilization = 1.0 - wait_ratio;
120
121        self.ewma.update(utilization);
122        let avg = self.ewma.average().unwrap_or(f64::NAN);
123        debug!(utilization = %avg);
124        self.gauge.set(avg);
125
126        // Reset overall statistics for the next reporting period.
127        self.overall_start = self.span_start;
128        self.total_wait = Duration::new(0, 0);
129    }
130
131    fn end_span(&mut self, at: Instant) -> Instant {
132        if self.waiting {
133            self.total_wait += at - self.span_start;
134        }
135        self.span_start = at;
136        self.span_start
137    }
138}
139
140#[derive(Debug)]
141enum UtilizationTimerMessage {
142    StartWait(ComponentKey, Instant),
143    StopWait(ComponentKey, Instant),
144}
145
146pub(crate) struct UtilizationComponentSender {
147    component_key: ComponentKey,
148    timer_tx: Sender<UtilizationTimerMessage>,
149}
150
151impl UtilizationComponentSender {
152    pub(crate) fn try_send_start_wait(&self) {
153        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
154            self.component_key.clone(),
155            Instant::now(),
156        )) {
157            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
158        }
159    }
160
161    pub(crate) fn try_send_stop_wait(&self) {
162        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
163            self.component_key.clone(),
164            Instant::now(),
165        )) {
166            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
167        }
168    }
169}
170
171pub(crate) struct UtilizationEmitter {
172    timers: HashMap<ComponentKey, Timer>,
173    timer_rx: Receiver<UtilizationTimerMessage>,
174    timer_tx: Sender<UtilizationTimerMessage>,
175    intervals: IntervalStream,
176}
177
178impl UtilizationEmitter {
179    pub(crate) fn new() -> Self {
180        let (timer_tx, timer_rx) = channel(4096);
181        Self {
182            timers: HashMap::default(),
183            intervals: IntervalStream::new(interval(Duration::from_secs(5))),
184            timer_tx,
185            timer_rx,
186        }
187    }
188
189    /// Adds a new component to this utilization metric emitter
190    ///
191    /// Returns a sender which can be used to send utilization information back to the emitter
192    pub(crate) fn add_component(
193        &mut self,
194        key: ComponentKey,
195        gauge: Gauge,
196    ) -> UtilizationComponentSender {
197        self.timers.insert(key.clone(), Timer::new(gauge));
198        UtilizationComponentSender {
199            timer_tx: self.timer_tx.clone(),
200            component_key: key,
201        }
202    }
203
204    pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
205        loop {
206            tokio::select! {
207                message = self.timer_rx.recv() => {
208                    match message {
209                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
210                            self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
211                        }
212                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
213                            self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
214                        }
215                        None => break,
216                    }
217                },
218
219                Some(_) = self.intervals.next() => {
220                    for timer in self.timers.values_mut() {
221                        timer.report();
222                    }
223                },
224
225                _ = &mut shutdown => {
226                    break
227                }
228            }
229        }
230    }
231}
232
233/// Wrap a stream to emit stats about utilization. This is designed for use with
234/// the input channels of transform and sinks components, and measures the
235/// amount of time that the stream is waiting for input from upstream. We make
236/// the simplifying assumption that this wait time is when the component is idle
237/// and the rest of the time it is doing useful work. This is more true for
238/// sinks than transforms, which can be blocked by downstream components, but
239/// with knowledge of the config the data is still useful.
240pub(crate) fn wrap<S>(
241    timer_tx: UtilizationComponentSender,
242    component_key: ComponentKey,
243    inner: S,
244) -> Utilization<S> {
245    Utilization {
246        intervals: IntervalStream::new(interval(Duration::from_secs(5))),
247        timer_tx,
248        component_key,
249        inner,
250    }
251}