1use std::time::Instant;
2
3use metrics::{Histogram, gauge, histogram};
4use vector_common::stats::EwmaGauge;
5
6use crate::event::EventArray;
7
/// Name of the histogram metric that receives one sample per measured event latency, in seconds.
const COMPONENT_LATENCY: &str = "component_latency_seconds";

/// Name of the gauge metric that receives the mean latency of each processed event array,
/// in seconds.
const COMPONENT_LATENCY_MEAN: &str = "component_latency_mean_seconds";

/// EWMA alpha handed to the mean-latency gauge when the caller does not supply one.
const DEFAULT_LATENCY_EWMA_ALPHA: f64 = 0.9;
11
12#[derive(Debug)]
13pub struct LatencyRecorder {
14 histogram: Histogram,
15 gauge: EwmaGauge,
16}
17
18impl LatencyRecorder {
19 pub fn new(ewma_alpha: Option<f64>) -> Self {
20 Self {
21 histogram: histogram!(COMPONENT_LATENCY),
22 gauge: EwmaGauge::new(
23 gauge!(COMPONENT_LATENCY_MEAN),
24 ewma_alpha.or(Some(DEFAULT_LATENCY_EWMA_ALPHA)),
25 ),
26 }
27 }
28
29 pub fn on_send(&self, events: &mut EventArray, now: Instant) {
30 let mut sum = 0.0;
31 let mut count = 0usize;
32
33 for mut event in events.iter_events_mut() {
40 let metadata = event.metadata_mut();
41 if let Some(previous) = metadata.last_transform_timestamp() {
42 let latency = now.saturating_duration_since(previous).as_secs_f64();
43 sum += latency;
44 count += 1;
45 self.histogram.record(latency);
46 }
47
48 metadata.set_last_transform_timestamp(now);
49 }
50 if count > 0 {
51 #[expect(
52 clippy::cast_precision_loss,
53 reason = "losing precision is acceptable here"
54 )]
55 let mean = sum / count as f64;
56 self.gauge.record(mean);
57 }
58 }
59}