vector/
utilization.rs

1use std::{
2    collections::HashMap,
3    pin::Pin,
4    task::{Context, Poll, ready},
5    time::{Duration, Instant},
6};
7
8use futures::{Stream, StreamExt};
9use metrics::Gauge;
10use pin_project::pin_project;
11use tokio::{
12    sync::mpsc::{Receiver, Sender, channel},
13    time::interval,
14};
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
17
18use crate::stats;
19
20#[pin_project]
21pub(crate) struct Utilization<S> {
22    intervals: IntervalStream,
23    timer_tx: UtilizationComponentSender,
24    component_key: ComponentKey,
25    inner: S,
26}
27
28impl<S> Utilization<S> {
29    /// Consumes this wrapper and returns the inner stream.
30    ///
31    /// This can't be constant because destructors can't be run in a const context, and we're
32    /// discarding `IntervalStream`/`Timer` when we call this.
33    #[allow(clippy::missing_const_for_fn)]
34    pub(crate) fn into_inner(self) -> S {
35        self.inner
36    }
37}
38
39impl<S> Stream for Utilization<S>
40where
41    S: Stream + Unpin,
42{
43    type Item = S::Item;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        // The goal of this function is to measure the time between when the
47        // caller requests the next Event from the stream and before one is
48        // ready, with the side-effect of reporting every so often about how
49        // long the wait gap is.
50        //
51        // This will just measure the time, while UtilizationEmitter collects
52        // all the timers and emits utilization value periodically
53        let this = self.project();
54        this.timer_tx.try_send_start_wait();
55        let _ = this.intervals.poll_next_unpin(cx);
56        let result = ready!(this.inner.poll_next_unpin(cx));
57        this.timer_tx.try_send_stop_wait();
58        Poll::Ready(result)
59    }
60}
61
62pub(crate) struct Timer {
63    overall_start: Instant,
64    span_start: Instant,
65    waiting: bool,
66    total_wait: Duration,
67    ewma: stats::Ewma,
68    gauge: Gauge,
69}
70
71/// A simple, specialized timer for tracking spans of waiting vs not-waiting
72/// time and reporting a smoothed estimate of utilization.
73///
74/// This implementation uses the idea of spans and reporting periods. Spans are
75/// a period of time spent entirely in one state, aligning with state
76/// transitions but potentially more granular.  Reporting periods are expected
77/// to be of uniform length and used to aggregate span data into time-weighted
78/// averages.
79impl Timer {
80    pub(crate) fn new(gauge: Gauge) -> Self {
81        Self {
82            overall_start: Instant::now(),
83            span_start: Instant::now(),
84            waiting: false,
85            total_wait: Duration::new(0, 0),
86            ewma: stats::Ewma::new(0.9),
87            gauge,
88        }
89    }
90
91    /// Begin a new span representing time spent waiting
92    pub(crate) fn start_wait(&mut self, at: Instant) {
93        if !self.waiting {
94            self.end_span(at);
95            self.waiting = true;
96        }
97    }
98
99    /// Complete the current waiting span and begin a non-waiting span
100    pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant {
101        if self.waiting {
102            let now = self.end_span(at);
103            self.waiting = false;
104            now
105        } else {
106            at
107        }
108    }
109
110    /// Meant to be called on a regular interval, this method calculates wait
111    /// ratio since the last time it was called and reports the resulting
112    /// utilization average.
113    pub(crate) fn report(&mut self) {
114        // End the current span so it can be accounted for, but do not change
115        // whether or not we're in the waiting state. This way the next span
116        // inherits the correct status.
117        let now = self.end_span(Instant::now());
118
119        let total_duration = now.duration_since(self.overall_start);
120        let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
121        let utilization = 1.0 - wait_ratio;
122
123        self.ewma.update(utilization);
124        let avg = self.ewma.average().unwrap_or(f64::NAN);
125        debug!(utilization = %avg);
126        self.gauge.set(avg);
127
128        // Reset overall statistics for the next reporting period.
129        self.overall_start = self.span_start;
130        self.total_wait = Duration::new(0, 0);
131    }
132
133    fn end_span(&mut self, at: Instant) -> Instant {
134        if self.waiting {
135            self.total_wait += at - self.span_start;
136        }
137        self.span_start = at;
138        self.span_start
139    }
140}
141
/// Message sent over the utilization channel from a
/// `UtilizationComponentSender` to the `UtilizationEmitter`.
///
/// Each variant carries the key of the component the message is about and the
/// `Instant` captured by the sender when the message was created.
#[derive(Debug)]
enum UtilizationTimerMessage {
    /// Handled by calling `Timer::start_wait` on the component's timer.
    StartWait(ComponentKey, Instant),
    /// Handled by calling `Timer::stop_wait` on the component's timer.
    StopWait(ComponentKey, Instant),
}
147
148pub(crate) struct UtilizationComponentSender {
149    component_key: ComponentKey,
150    timer_tx: Sender<UtilizationTimerMessage>,
151}
152
153impl UtilizationComponentSender {
154    pub(crate) fn try_send_start_wait(&self) {
155        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
156            self.component_key.clone(),
157            Instant::now(),
158        )) {
159            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
160        }
161    }
162
163    pub(crate) fn try_send_stop_wait(&self) {
164        if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
165            self.component_key.clone(),
166            Instant::now(),
167        )) {
168            debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
169        }
170    }
171}
172
173pub(crate) struct UtilizationEmitter {
174    timers: HashMap<ComponentKey, Timer>,
175    timer_rx: Receiver<UtilizationTimerMessage>,
176    timer_tx: Sender<UtilizationTimerMessage>,
177    intervals: IntervalStream,
178}
179
180impl UtilizationEmitter {
181    pub(crate) fn new() -> Self {
182        let (timer_tx, timer_rx) = channel(4096);
183        Self {
184            timers: HashMap::default(),
185            intervals: IntervalStream::new(interval(Duration::from_secs(5))),
186            timer_tx,
187            timer_rx,
188        }
189    }
190
191    /// Adds a new component to this utilization metric emitter
192    ///
193    /// Returns a sender which can be used to send utilization information back to the emitter
194    pub(crate) fn add_component(
195        &mut self,
196        key: ComponentKey,
197        gauge: Gauge,
198    ) -> UtilizationComponentSender {
199        self.timers.insert(key.clone(), Timer::new(gauge));
200        UtilizationComponentSender {
201            timer_tx: self.timer_tx.clone(),
202            component_key: key,
203        }
204    }
205
206    pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
207        loop {
208            tokio::select! {
209                message = self.timer_rx.recv() => {
210                    match message {
211                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
212                            self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
213                        }
214                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
215                            self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
216                        }
217                        None => break,
218                    }
219                },
220
221                Some(_) = self.intervals.next() => {
222                    for timer in self.timers.values_mut() {
223                        timer.report();
224                    }
225                },
226
227                _ = &mut shutdown => {
228                    break
229                }
230            }
231        }
232    }
233}
234
235/// Wrap a stream to emit stats about utilization. This is designed for use with
236/// the input channels of transform and sinks components, and measures the
237/// amount of time that the stream is waiting for input from upstream. We make
238/// the simplifying assumption that this wait time is when the component is idle
239/// and the rest of the time it is doing useful work. This is more true for
240/// sinks than transforms, which can be blocked by downstream components, but
241/// with knowledge of the config the data is still useful.
242pub(crate) fn wrap<S>(
243    timer_tx: UtilizationComponentSender,
244    component_key: ComponentKey,
245    inner: S,
246) -> Utilization<S> {
247    Utilization {
248        intervals: IntervalStream::new(interval(Duration::from_secs(5))),
249        timer_tx,
250        component_key,
251        inner,
252    }
253}