1use std::{
2 collections::HashMap,
3 pin::Pin,
4 task::{Context, Poll, ready},
5 time::{Duration, Instant},
6};
7
8use futures::{Stream, StreamExt};
9use metrics::Gauge;
10use pin_project::pin_project;
11use tokio::{
12 sync::mpsc::{Receiver, Sender, channel},
13 time::interval,
14};
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
17
18use crate::stats;
19
20#[pin_project]
21pub(crate) struct Utilization<S> {
22 intervals: IntervalStream,
23 timer_tx: UtilizationComponentSender,
24 component_key: ComponentKey,
25 inner: S,
26}
27
28impl<S> Utilization<S> {
29 #[allow(clippy::missing_const_for_fn)]
34 pub(crate) fn into_inner(self) -> S {
35 self.inner
36 }
37}
38
39impl<S> Stream for Utilization<S>
40where
41 S: Stream + Unpin,
42{
43 type Item = S::Item;
44
45 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 let this = self.project();
54 this.timer_tx.try_send_start_wait();
55 let _ = this.intervals.poll_next_unpin(cx);
56 let result = ready!(this.inner.poll_next_unpin(cx));
57 this.timer_tx.try_send_stop_wait();
58 Poll::Ready(result)
59 }
60}
61
62pub(crate) struct Timer {
63 overall_start: Instant,
64 span_start: Instant,
65 waiting: bool,
66 total_wait: Duration,
67 ewma: stats::Ewma,
68 gauge: Gauge,
69}
70
71impl Timer {
80 pub(crate) fn new(gauge: Gauge) -> Self {
81 Self {
82 overall_start: Instant::now(),
83 span_start: Instant::now(),
84 waiting: false,
85 total_wait: Duration::new(0, 0),
86 ewma: stats::Ewma::new(0.9),
87 gauge,
88 }
89 }
90
91 pub(crate) fn start_wait(&mut self, at: Instant) {
93 if !self.waiting {
94 self.end_span(at);
95 self.waiting = true;
96 }
97 }
98
99 pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant {
101 if self.waiting {
102 let now = self.end_span(at);
103 self.waiting = false;
104 now
105 } else {
106 at
107 }
108 }
109
110 pub(crate) fn report(&mut self) {
114 let now = self.end_span(Instant::now());
118
119 let total_duration = now.duration_since(self.overall_start);
120 let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
121 let utilization = 1.0 - wait_ratio;
122
123 self.ewma.update(utilization);
124 let avg = self.ewma.average().unwrap_or(f64::NAN);
125 debug!(utilization = %avg);
126 self.gauge.set(avg);
127
128 self.overall_start = self.span_start;
130 self.total_wait = Duration::new(0, 0);
131 }
132
133 fn end_span(&mut self, at: Instant) -> Instant {
134 if self.waiting {
135 self.total_wait += at - self.span_start;
136 }
137 self.span_start = at;
138 self.span_start
139 }
140}
141
142#[derive(Debug)]
143enum UtilizationTimerMessage {
144 StartWait(ComponentKey, Instant),
145 StopWait(ComponentKey, Instant),
146}
147
148pub(crate) struct UtilizationComponentSender {
149 component_key: ComponentKey,
150 timer_tx: Sender<UtilizationTimerMessage>,
151}
152
153impl UtilizationComponentSender {
154 pub(crate) fn try_send_start_wait(&self) {
155 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
156 self.component_key.clone(),
157 Instant::now(),
158 )) {
159 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
160 }
161 }
162
163 pub(crate) fn try_send_stop_wait(&self) {
164 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
165 self.component_key.clone(),
166 Instant::now(),
167 )) {
168 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
169 }
170 }
171}
172
173pub(crate) struct UtilizationEmitter {
174 timers: HashMap<ComponentKey, Timer>,
175 timer_rx: Receiver<UtilizationTimerMessage>,
176 timer_tx: Sender<UtilizationTimerMessage>,
177 intervals: IntervalStream,
178}
179
180impl UtilizationEmitter {
181 pub(crate) fn new() -> Self {
182 let (timer_tx, timer_rx) = channel(4096);
183 Self {
184 timers: HashMap::default(),
185 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
186 timer_tx,
187 timer_rx,
188 }
189 }
190
191 pub(crate) fn add_component(
195 &mut self,
196 key: ComponentKey,
197 gauge: Gauge,
198 ) -> UtilizationComponentSender {
199 self.timers.insert(key.clone(), Timer::new(gauge));
200 UtilizationComponentSender {
201 timer_tx: self.timer_tx.clone(),
202 component_key: key,
203 }
204 }
205
206 pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
207 loop {
208 tokio::select! {
209 message = self.timer_rx.recv() => {
210 match message {
211 Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
212 self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
213 }
214 Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
215 self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
216 }
217 None => break,
218 }
219 },
220
221 Some(_) = self.intervals.next() => {
222 for timer in self.timers.values_mut() {
223 timer.report();
224 }
225 },
226
227 _ = &mut shutdown => {
228 break
229 }
230 }
231 }
232 }
233}
234
235pub(crate) fn wrap<S>(
243 timer_tx: UtilizationComponentSender,
244 component_key: ComponentKey,
245 inner: S,
246) -> Utilization<S> {
247 Utilization {
248 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
249 timer_tx,
250 component_key,
251 inner,
252 }
253}