1use std::{
2 collections::HashMap,
3 pin::Pin,
4 task::{ready, Context, Poll},
5 time::{Duration, Instant},
6};
7use tokio::sync::mpsc::{channel, Receiver, Sender};
8
9use futures::{Stream, StreamExt};
10use metrics::Gauge;
11use pin_project::pin_project;
12use tokio::time::interval;
13use tokio_stream::wrappers::IntervalStream;
14use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
15
16use crate::stats;
17
/// Stream adapter that reports when the wrapped stream is being waited on.
///
/// Each poll sends a "start wait" message through `timer_tx`, and a
/// "stop wait" message once the inner stream yields a result.
#[pin_project]
pub(crate) struct Utilization<S> {
    /// Periodic ticker that is polled alongside the inner stream; the tick
    /// values themselves are discarded.
    intervals: IntervalStream,
    /// Handle used to send start/stop wait messages for this component.
    timer_tx: UtilizationComponentSender,
    /// Key of the component this stream belongs to.
    component_key: ComponentKey,
    /// The wrapped stream whose items are forwarded unchanged.
    inner: S,
}
25
26impl<S> Utilization<S> {
27 #[allow(clippy::missing_const_for_fn)]
32 pub(crate) fn into_inner(self) -> S {
33 self.inner
34 }
35}
36
37impl<S> Stream for Utilization<S>
38where
39 S: Stream + Unpin,
40{
41 type Item = S::Item;
42
43 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44 let this = self.project();
52 this.timer_tx.try_send_start_wait();
53 let _ = this.intervals.poll_next_unpin(cx);
54 let result = ready!(this.inner.poll_next_unpin(cx));
55 this.timer_tx.try_send_stop_wait();
56 Poll::Ready(result)
57 }
58}
59
/// Tracks how much of a reporting period a component spent waiting and
/// publishes the resulting utilization to a gauge.
pub(crate) struct Timer {
    /// Start of the current reporting period; reset by `report`.
    overall_start: Instant,
    /// Start of the current span (the time of the last state change or report).
    span_start: Instant,
    /// Whether the current span counts as waiting.
    waiting: bool,
    /// Wait time accumulated during the current reporting period.
    total_wait: Duration,
    /// Moving average fed with the utilization of each reporting period.
    ewma: stats::Ewma,
    /// Gauge that receives the averaged utilization on every report.
    gauge: Gauge,
}
68
69impl Timer {
78 pub(crate) fn new(gauge: Gauge) -> Self {
79 Self {
80 overall_start: Instant::now(),
81 span_start: Instant::now(),
82 waiting: false,
83 total_wait: Duration::new(0, 0),
84 ewma: stats::Ewma::new(0.9),
85 gauge,
86 }
87 }
88
89 pub(crate) fn start_wait(&mut self, at: Instant) {
91 if !self.waiting {
92 self.end_span(at);
93 self.waiting = true;
94 }
95 }
96
97 pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant {
99 if self.waiting {
100 let now = self.end_span(at);
101 self.waiting = false;
102 now
103 } else {
104 at
105 }
106 }
107
108 pub(crate) fn report(&mut self) {
112 let now = self.end_span(Instant::now());
116
117 let total_duration = now.duration_since(self.overall_start);
118 let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
119 let utilization = 1.0 - wait_ratio;
120
121 self.ewma.update(utilization);
122 let avg = self.ewma.average().unwrap_or(f64::NAN);
123 debug!(utilization = %avg);
124 self.gauge.set(avg);
125
126 self.overall_start = self.span_start;
128 self.total_wait = Duration::new(0, 0);
129 }
130
131 fn end_span(&mut self, at: Instant) -> Instant {
132 if self.waiting {
133 self.total_wait += at - self.span_start;
134 }
135 self.span_start = at;
136 self.span_start
137 }
138}
139
/// Message sent by a `UtilizationComponentSender` and consumed by
/// `UtilizationEmitter::run_utilization`.
///
/// Each variant carries the key of the component and the instant captured
/// when the message was created.
#[derive(Debug)]
enum UtilizationTimerMessage {
    /// The component started waiting at the given instant.
    StartWait(ComponentKey, Instant),
    /// The component stopped waiting at the given instant.
    StopWait(ComponentKey, Instant),
}
145
/// Per-component handle for sending wait start/stop messages to the
/// utilization emitter.
pub(crate) struct UtilizationComponentSender {
    /// Key attached to every message sent through this handle.
    component_key: ComponentKey,
    /// Sending half of the channel read by the emitter.
    timer_tx: Sender<UtilizationTimerMessage>,
}
150
151impl UtilizationComponentSender {
152 pub(crate) fn try_send_start_wait(&self) {
153 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
154 self.component_key.clone(),
155 Instant::now(),
156 )) {
157 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
158 }
159 }
160
161 pub(crate) fn try_send_stop_wait(&self) {
162 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
163 self.component_key.clone(),
164 Instant::now(),
165 )) {
166 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
167 }
168 }
169}
170
/// Owns one `Timer` per registered component, applies the wait messages it
/// receives to them and periodically makes every timer report.
pub(crate) struct UtilizationEmitter {
    /// Timers keyed by the component they measure.
    timers: HashMap<ComponentKey, Timer>,
    /// Receiving half of the message channel.
    timer_rx: Receiver<UtilizationTimerMessage>,
    /// Sending half of the message channel; cloned for each added component.
    timer_tx: Sender<UtilizationTimerMessage>,
    /// Ticker that triggers the periodic reports.
    intervals: IntervalStream,
}
177
178impl UtilizationEmitter {
179 pub(crate) fn new() -> Self {
180 let (timer_tx, timer_rx) = channel(4096);
181 Self {
182 timers: HashMap::default(),
183 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
184 timer_tx,
185 timer_rx,
186 }
187 }
188
189 pub(crate) fn add_component(
193 &mut self,
194 key: ComponentKey,
195 gauge: Gauge,
196 ) -> UtilizationComponentSender {
197 self.timers.insert(key.clone(), Timer::new(gauge));
198 UtilizationComponentSender {
199 timer_tx: self.timer_tx.clone(),
200 component_key: key,
201 }
202 }
203
204 pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
205 loop {
206 tokio::select! {
207 message = self.timer_rx.recv() => {
208 match message {
209 Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
210 self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
211 }
212 Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
213 self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
214 }
215 None => break,
216 }
217 },
218
219 Some(_) = self.intervals.next() => {
220 for timer in self.timers.values_mut() {
221 timer.report();
222 }
223 },
224
225 _ = &mut shutdown => {
226 break
227 }
228 }
229 }
230 }
231}
232
233pub(crate) fn wrap<S>(
241 timer_tx: UtilizationComponentSender,
242 component_key: ComponentKey,
243 inner: S,
244) -> Utilization<S> {
245 Utilization {
246 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
247 timer_tx,
248 component_key,
249 inner,
250 }
251}