1use std::{
2 collections::HashMap,
3 pin::Pin,
4 sync::{Arc, Mutex},
5 task::{Context, Poll, ready},
6 time::Duration,
7};
8
9#[cfg(not(test))]
10use std::time::Instant;
11
12#[cfg(test)]
13use mock_instant::global::Instant;
14
15use futures::{Stream, StreamExt};
16use metrics::Gauge;
17use pin_project::pin_project;
18use tokio::{
19 sync::mpsc::{Receiver, Sender, channel},
20 time::interval,
21};
22use tokio_stream::wrappers::IntervalStream;
23use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal, stats};
24
/// Reporting period for utilization: `UtilizationEmitter::run_utilization` ticks at this
/// interval and calls `Timer::update_utilization` on every registered timer.
const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_secs(5);
26
/// Stream wrapper that reports, through `timer_tx`, how long its consumer waits on the
/// wrapped stream.
///
/// Every poll sends a start-wait message before polling `inner`, and a stop-wait message
/// once `inner` is ready (see the `Stream` impl).
#[pin_project]
pub(crate) struct Utilization<S> {
    /// Ticker that is polled on every `poll_next`; its output is discarded.
    intervals: IntervalStream,
    /// Handle used to send the wait start/stop messages.
    timer_tx: UtilizationComponentSender,
    /// Component key supplied at construction.
    component_key: ComponentKey,
    /// The wrapped stream.
    inner: S,
}
39
40impl<S> Utilization<S> {
41 pub(crate) fn new(
46 timer_tx: UtilizationComponentSender,
47 component_key: ComponentKey,
48 inner: S,
49 ) -> Self {
50 Self {
51 intervals: IntervalStream::new(interval(Duration::from_secs(5))),
52 timer_tx,
53 component_key,
54 inner,
55 }
56 }
57
58 #[allow(clippy::missing_const_for_fn)]
63 pub(crate) fn into_inner(self) -> S {
64 self.inner
65 }
66}
67
68impl<S> Stream for Utilization<S>
69where
70 S: Stream + Unpin,
71{
72 type Item = S::Item;
73
74 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75 let this = self.project();
83 this.timer_tx.try_send_start_wait();
84 let _ = this.intervals.poll_next_unpin(cx);
85 let result = ready!(this.inner.poll_next_unpin(cx));
86 this.timer_tx.try_send_stop_wait();
87 Poll::Ready(result)
88 }
89}
90
/// Tracks waiting versus non-waiting time for one component and publishes a smoothed
/// utilization value to a gauge.
pub(crate) struct Timer {
    /// Start of the current reporting period (reset by `update_utilization`).
    overall_start: Instant,
    /// Start of the current span, i.e. the last state change or report.
    span_start: Instant,
    /// Whether the timer is currently in the waiting state.
    waiting: bool,
    /// Waiting time accumulated during the current reporting period.
    total_wait: Duration,
    /// Moving average fed with one utilization sample per reporting period.
    ewma: stats::Ewma,
    /// Gauge that receives the rounded average.
    gauge: Gauge,
    /// Component identifier printed in the debug log line (debug builds only).
    #[cfg(debug_assertions)]
    component_id: Arc<str>,
}
101
102impl Timer {
111 pub(crate) fn new(gauge: Gauge, #[cfg(debug_assertions)] component_id: Arc<str>) -> Self {
112 Self {
113 overall_start: Instant::now(),
114 span_start: Instant::now(),
115 waiting: false,
116 total_wait: Duration::new(0, 0),
117 ewma: stats::Ewma::new(0.9),
118 gauge,
119 #[cfg(debug_assertions)]
120 component_id,
121 }
122 }
123
124 pub(crate) fn start_wait(&mut self, at: Instant) {
126 if !self.waiting {
127 self.end_span(at.max(self.overall_start));
129 self.waiting = true;
130 }
131 }
132
133 pub(crate) fn stop_wait(&mut self, at: Instant) {
135 if self.waiting {
136 self.end_span(at.max(self.overall_start));
138 self.waiting = false;
139 }
140 }
141
142 pub(crate) fn update_utilization(&mut self) {
146 let now = Instant::now();
150 self.end_span(now);
151
152 let total_duration = now.duration_since(self.overall_start);
153 let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
154 let utilization = 1.0 - wait_ratio;
155
156 self.ewma.update(utilization);
157 let avg = self.ewma.average().unwrap_or(f64::NAN);
158 let avg_rounded = (avg * 10000.0).round() / 10000.0; self.gauge.set(avg_rounded);
160
161 self.overall_start = now;
163 self.total_wait = Duration::new(0, 0);
164
165 #[cfg(debug_assertions)]
166 debug!(component_id = %self.component_id, utilization = %avg_rounded, internal_log_rate_limit = false);
167 }
168
169 fn end_span(&mut self, at: Instant) {
170 if self.waiting {
171 self.total_wait += at.duration_since(self.span_start);
174 }
175 self.span_start = at;
176 }
177}
178
/// Messages sent from the stream wrappers to the `UtilizationEmitter`.
///
/// Each variant carries the key of the component whose timer should be updated and the
/// instant at which the sender observed the state change.
#[derive(Debug)]
enum UtilizationTimerMessage {
    /// The component started waiting at the given instant.
    StartWait(ComponentKey, Instant),
    /// The component stopped waiting at the given instant.
    StopWait(ComponentKey, Instant),
}
184
/// Cloneable handle that sends wait start/stop messages for one component.
#[derive(Clone)]
pub(crate) struct UtilizationComponentSender {
    /// Key attached to every message sent through this handle.
    component_key: ComponentKey,
    /// Sending half of the channel read by the `UtilizationEmitter`.
    timer_tx: Sender<UtilizationTimerMessage>,
}
190
191impl UtilizationComponentSender {
192 pub(crate) fn try_send_start_wait(&self) {
193 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait(
194 self.component_key.clone(),
195 Instant::now(),
196 )) {
197 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message.");
198 }
199 }
200
201 pub(crate) fn try_send_stop_wait(&self) {
202 if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait(
203 self.component_key.clone(),
204 Instant::now(),
205 )) {
206 debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message.");
207 }
208 }
209}
210
/// Cloneable handle for registering and removing per-component timers.
///
/// Created by `UtilizationEmitter::new`; it shares the timer map with that emitter and
/// holds the sending half of the emitter's message channel.
#[derive(Clone)]
pub struct UtilizationRegistry {
    /// Timer map shared with the emitter.
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    /// Sender cloned into every `UtilizationComponentSender` handed out.
    timer_tx: Sender<UtilizationTimerMessage>,
}
219
220impl UtilizationRegistry {
221 pub(crate) fn add_component(
225 &self,
226 key: ComponentKey,
227 gauge: Gauge,
228 ) -> UtilizationComponentSender {
229 self.timers.lock().expect("mutex poisoned").insert(
230 key.clone(),
231 Timer::new(
232 gauge,
233 #[cfg(debug_assertions)]
234 key.id().into(),
235 ),
236 );
237 UtilizationComponentSender {
238 timer_tx: self.timer_tx.clone(),
239 component_key: key,
240 }
241 }
242
243 pub(crate) fn remove_component(&self, key: &ComponentKey) {
245 self.timers.lock().expect("mutex poisoned").remove(key);
246 }
247}
248
/// Owns the receiving side of the utilization channel: applies incoming wait start/stop
/// messages to the matching timers and periodically makes every timer report.
pub(crate) struct UtilizationEmitter {
    /// Timer map shared with the `UtilizationRegistry`.
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    /// Receiving half of the message channel.
    timer_rx: Receiver<UtilizationTimerMessage>,
}
253
254impl UtilizationEmitter {
255 pub(crate) fn new() -> (Self, UtilizationRegistry) {
256 let (timer_tx, timer_rx) = channel(4096);
257 let timers = Arc::new(Mutex::new(HashMap::default()));
258 (
259 Self {
260 timers: Arc::clone(&timers),
261 timer_rx,
262 },
263 UtilizationRegistry { timers, timer_tx },
264 )
265 }
266
267 pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
268 let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
269 loop {
270 tokio::select! {
271 message = self.timer_rx.recv() => {
272 match message {
273 Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
274 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
276 timer.start_wait(start_time);
277 }
278 }
279 Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
280 if let Some(timer) = self.timers.lock().expect("mutex poisoned").get_mut(&key) {
282 timer.stop_wait(stop_time);
283 }
284 }
285 None => break,
286 }
287 },
288
289 Some(_) = intervals.next() => {
290 for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
291 timer.update_utilization();
292 }
293 },
294
295 _ = &mut shutdown => {
296 break
297 }
298 }
299 }
300 }
301}
302
/// Stream wrapper for a component's output side.
///
/// It sends a stop-wait message each time it is polled and a start-wait message each
/// time it yields an item, so the gap between yielding an item and being polled again is
/// recorded as waiting (see the `Stream` impl).
pub(crate) struct OutputUtilization<S> {
    /// Handle used to send the wait start/stop messages.
    timer_tx: UtilizationComponentSender,
    /// The wrapped stream.
    inner: S,
}
315
316impl<S> Stream for OutputUtilization<S>
317where
318 S: Stream + Unpin,
319{
320 type Item = S::Item;
321
322 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
323 let this = self.get_mut();
324 this.timer_tx.try_send_stop_wait();
325 let result = ready!(this.inner.poll_next_unpin(cx));
326 if result.is_some() {
327 this.timer_tx.try_send_start_wait();
328 }
329 Poll::Ready(result)
330 }
331}
332
333impl<S> OutputUtilization<S> {
334 pub(crate) const fn new(timer_tx: UtilizationComponentSender, inner: S) -> Self {
339 Self { timer_tx, inner }
340 }
341}
342
// Tests for `Timer` and for the stream wrappers. They run against the mock clock
// (`Instant` is `mock_instant::global::Instant` under `cfg(test)`) and are marked
// `#[serial]` because they set and advance `MockClock`.
#[cfg(test)]
mod tests {
    use mock_instant::global::MockClock;
    use serial_test::serial;
    use strum::IntoEnumIterator;
    use vector_lib::gauge;
    use vector_lib::internal_event::GaugeName;

    use super::*;

    /// Sets the mock clock to 100 s and returns a fresh timer created at that time.
    fn setup_timer() -> Timer {
        MockClock::set_time(Duration::from_secs(100));

        Timer::new(
            gauge!(GaugeName::iter().next().unwrap()),
            #[cfg(debug_assertions)]
            "test_component".into(),
        )
    }

    /// Maximum absolute difference accepted by `assert_approx_eq`.
    const TOLERANCE: f64 = 0.01;

    /// Asserts that `actual` lies in `[0, 1]` and within `TOLERANCE` of `expected`.
    fn assert_approx_eq(actual: f64, expected: f64, description: &str) {
        assert!(
            (0.0..=1.0).contains(&actual),
            "Utilization {actual} is outside [0, 1]"
        );
        assert!(
            (actual - expected).abs() < TOLERANCE,
            "Expected utilization {description}, got {actual}"
        );
    }

    /// A start-wait timestamp from before the current reporting period is clamped to the
    /// period start, so the reported value stays within bounds.
    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_start() {
        let mut timer = setup_timer();

        MockClock::advance(Duration::from_secs(5));

        // First period: never waiting, so the sample (and the average) is 1.0.
        timer.update_utilization();

        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 1.0, "near 1.0 (never waiting)");

        // Timestamp is one second before the period that has just started.
        timer.start_wait(Instant::now() - Duration::from_secs(1));
        MockClock::advance(Duration::from_secs(5));

        // Second period: waiting all 5 s, sample 0.0; average = 0.9 * 0.0 + 0.1 * 1.0.
        timer.update_utilization();
        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 0.1, "~0.1");
    }

    /// A stop-wait timestamp from before the current reporting period is clamped to the
    /// period start, so the reported value stays within bounds.
    #[test]
    #[serial]
    fn test_utilization_in_bounds_on_late_stop() {
        let mut timer = setup_timer();

        MockClock::advance(Duration::from_secs(5));

        // Put the timer into the waiting state directly, without going through
        // `start_wait`.
        timer.waiting = true;
        timer.update_utilization();

        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 0.0, "near 0 (always waiting)");

        // Timestamp is four seconds before the period that has just started.
        timer.stop_wait(Instant::now() - Duration::from_secs(4));
        MockClock::advance(Duration::from_secs(5));

        // Second period: never waiting, sample 1.0; average = 0.9 * 1.0 + 0.1 * 0.0.
        timer.update_utilization();
        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 0.9, "~0.9");
    }

    /// 1 s working, 2 s waiting, 2 s working: 3 of 5 seconds are utilized.
    #[test]
    #[serial]
    fn test_normal_utilization_within_bounds() {
        let mut timer = setup_timer();

        MockClock::advance(Duration::from_secs(1));
        timer.start_wait(Instant::now());

        MockClock::advance(Duration::from_secs(2));
        timer.stop_wait(Instant::now());

        MockClock::advance(Duration::from_secs(2));
        timer.update_utilization();

        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 0.6, "~0.6");
    }

    /// Waiting for the whole period yields a utilization of 0.
    #[test]
    #[serial]
    fn test_always_waiting_utilization() {
        let mut timer = setup_timer();

        timer.start_wait(Instant::now());

        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 0.0, "near 0 (always waiting)");
    }

    /// Never waiting during the period yields a utilization of 1.
    #[test]
    #[serial]
    fn test_never_waiting_utilization() {
        let mut timer = setup_timer();

        MockClock::advance(Duration::from_secs(5));
        timer.update_utilization();

        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, 1.0, "near 1.0 (never waiting)");
    }

    /// Task transform that passes events through unchanged and simulates work by
    /// advancing the mock clock by `processing_time` for every item.
    struct MockTaskTransform {
        /// Mock time consumed per processed item.
        processing_time: Duration,
    }

    use crate::event::EventArray;
    use crate::transforms::TaskTransform;

    impl TaskTransform<EventArray> for MockTaskTransform {
        fn transform(
            self: Box<Self>,
            task: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
        ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
            let processing_time = self.processing_time;
            task.map(move |events| {
                MockClock::advance(processing_time);
                events
            })
            .boxed()
        }
    }

    /// Drives `Utilization` -> `MockTaskTransform` -> `OutputUtilization` by hand with a
    /// no-op waker and checks the smoothed utilization after each phase.
    #[tokio::test]
    #[serial]
    async fn test_task_transform_utilization_end_to_end() {
        use crate::event::{EventArray, LogEvent};
        use crate::transforms::TaskTransform;
        use futures::SinkExt;
        use futures::channel::mpsc as futures_mpsc;

        MockClock::set_time(Duration::from_secs(100));

        let (mut emitter, registry) = UtilizationEmitter::new();
        let key = ComponentKey::from("test_transform");

        // The input and output wrappers share one timer through clones of the sender.
        let sender = registry.add_component(key.clone(), gauge!(GaugeName::Utilization));
        let output_sender = sender.clone();

        let (mut input_tx, input_rx) = futures_mpsc::channel::<EventArray>(10);

        let input_wrapped = Utilization::new(sender, key.clone(), input_rx);
        let transform = Box::new(MockTaskTransform {
            processing_time: Duration::from_secs(2),
        });
        let transform_output = transform.transform(Box::pin(input_wrapped));
        let mut pipeline = OutputUtilization::new(output_sender, transform_output);

        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);

        // Phase 1: no input yet, so the poll is pending and the following 3 s are all
        // spent waiting on input. Sample 0.0.
        assert!(Pin::new(&mut pipeline).poll_next(&mut cx).is_pending());
        MockClock::advance(Duration::from_secs(3));
        check_timer_utilization(&mut emitter, &key, 0.0, "0.0 (phase 1: all input wait)");

        // Phase 2: one event is processed; the transform advances the clock by 2 s, all
        // of it counted as work. Sample 1.0, average 0.9 * 1.0 + 0.1 * 0.0.
        input_tx
            .send(EventArray::from(LogEvent::default()))
            .await
            .unwrap();
        assert!(Pin::new(&mut pipeline).poll_next(&mut cx).is_ready());
        check_timer_utilization(&mut emitter, &key, 0.9, "0.9 (phase 2: all work)");

        // Phase 3: an event is buffered, but the pipeline is not polled for 3 s. The
        // start-wait sent after the previous item makes this count as waiting.
        // Sample 0.0, average 0.1 * 0.9.
        input_tx
            .send(EventArray::from(LogEvent::default()))
            .await
            .unwrap();
        MockClock::advance(Duration::from_secs(3));
        check_timer_utilization(
            &mut emitter,
            &key,
            0.09,
            "0.09 (phase 3: downstream wait despite buffered input)",
        );

        // Phase 4: the buffered event is processed (2 s of work). Sample 1.0, average
        // 0.9 * 1.0 + 0.1 * 0.09.
        assert!(Pin::new(&mut pipeline).poll_next(&mut cx).is_ready());
        check_timer_utilization(
            &mut emitter,
            &key,
            0.909,
            "0.909 (phase 4: queued event processed)",
        );
    }

    /// Applies all pending messages, forces a report on the timer registered under
    /// `key`, and asserts that its smoothed utilization is approximately `expected`.
    fn check_timer_utilization(
        emitter: &mut UtilizationEmitter,
        key: &ComponentKey,
        expected: f64,
        description: &str,
    ) {
        drain_emitter_messages(emitter);
        let mut timers = emitter.timers.lock().expect("mutex poisoned");
        let timer = timers.get_mut(key).expect("timer should exist");
        timer.update_utilization();
        let avg = timer.ewma.average().unwrap();
        assert_approx_eq(avg, expected, description);
    }

    /// Synchronously applies every message currently queued in the emitter's channel to
    /// the matching timer, like the message branch of `run_utilization` does.
    fn drain_emitter_messages(emitter: &mut UtilizationEmitter) {
        while let Ok(message) = emitter.timer_rx.try_recv() {
            let mut timers = emitter.timers.lock().expect("mutex poisoned");
            match message {
                UtilizationTimerMessage::StartWait(key, at) => {
                    if let Some(timer) = timers.get_mut(&key) {
                        timer.start_wait(at);
                    }
                }
                UtilizationTimerMessage::StopWait(key, at) => {
                    if let Some(timer) = timers.get_mut(&key) {
                        timer.stop_wait(at);
                    }
                }
            }
        }
    }
}