vector_core/
latency.rs

1use std::time::Instant;
2
3use metrics::{Histogram, gauge, histogram};
4use vector_common::stats::EwmaGauge;
5
6use crate::event::EventArray;
7
8const COMPONENT_LATENCY: &str = "component_latency_seconds";
9const COMPONENT_LATENCY_MEAN: &str = "component_latency_mean_seconds";
10const DEFAULT_LATENCY_EWMA_ALPHA: f64 = 0.9;
11
12#[derive(Debug)]
13pub struct LatencyRecorder {
14    histogram: Histogram,
15    gauge: EwmaGauge,
16}
17
18impl LatencyRecorder {
19    pub fn new(ewma_alpha: Option<f64>) -> Self {
20        Self {
21            histogram: histogram!(COMPONENT_LATENCY),
22            gauge: EwmaGauge::new(
23                gauge!(COMPONENT_LATENCY_MEAN),
24                ewma_alpha.or(Some(DEFAULT_LATENCY_EWMA_ALPHA)),
25            ),
26        }
27    }
28
29    pub fn on_send(&self, events: &mut EventArray, now: Instant) {
30        let mut sum = 0.0;
31        let mut count = 0usize;
32
33        // Since all of the events in the array will most likely have entered and exited the
34        // component at close to the same time, we average all the latencies over the entire array
35        // and record it just once in the EWMA-backed gauge. If we were to record each latency
36        // individually, the gauge would effectively just reflect the latest array's latency,
37        // eliminating the utility of the EWMA averaging. However, we record the individual
38        // latencies in the histogram to get a more granular view of the latency distribution.
39        for mut event in events.iter_events_mut() {
40            let metadata = event.metadata_mut();
41            if let Some(previous) = metadata.last_transform_timestamp() {
42                let latency = now.saturating_duration_since(previous).as_secs_f64();
43                sum += latency;
44                count += 1;
45                self.histogram.record(latency);
46            }
47
48            metadata.set_last_transform_timestamp(now);
49        }
50        if count > 0 {
51            #[expect(
52                clippy::cast_precision_loss,
53                reason = "losing precision is acceptable here"
54            )]
55            let mean = sum / count as f64;
56            self.gauge.record(mean);
57        }
58    }
59}