vector/internal_events/
adaptive_concurrency.rs

1use std::time::Duration;
2
3use metrics::{histogram, Histogram};
4
/// Snapshot of values handed to the `AdaptiveConcurrencyLimit` event when it
/// is emitted.
///
/// The type is `Copy`, so it is passed to `emit` by value.
#[derive(Clone, Copy, Debug)]
pub struct AdaptiveConcurrencyLimitData {
    /// Value recorded on the `adaptive_concurrency_limit` histogram.
    pub concurrency: u64,
    /// Recorded on `adaptive_concurrency_reached_limit` as `1.0` when `true`
    /// and `0.0` when `false`.
    pub reached_limit: bool,
    /// Recorded on `adaptive_concurrency_back_pressure` as `1.0` when `true`
    /// and `0.0` when `false`.
    pub had_back_pressure: bool,
    /// Optional current RTT sample. Not recorded by the `emit` in this file.
    // NOTE(review): presumably `None` means no measurement was available for
    // the current interval; confirm against the code that builds this struct.
    pub current_rtt: Option<Duration>,
    /// Value recorded on the `adaptive_concurrency_past_rtt_mean` histogram.
    pub past_rtt: Duration,
    /// Deviation associated with `past_rtt`. Carried in the struct but not
    /// recorded by the `emit` in this file.
    pub past_rtt_deviation: Duration,
}
14
15registered_event! {
16    AdaptiveConcurrencyLimit => {
17        // These are histograms, as they may have a number of different
18        // values over each reporting interval, and each of those values
19        // is valuable for diagnosis.
20        limit: Histogram = histogram!("adaptive_concurrency_limit"),
21        reached_limit: Histogram = histogram!("adaptive_concurrency_reached_limit"),
22        back_pressure: Histogram = histogram!("adaptive_concurrency_back_pressure"),
23        past_rtt_mean: Histogram = histogram!("adaptive_concurrency_past_rtt_mean"),
24    }
25
26    fn emit(&self, data: AdaptiveConcurrencyLimitData) {
27        self.limit.record(data.concurrency as f64);
28        let reached_limit = data.reached_limit.then_some(1.0).unwrap_or_default();
29        self.reached_limit.record(reached_limit);
30        let back_pressure = data.had_back_pressure.then_some(1.0).unwrap_or_default();
31        self.back_pressure.record(back_pressure);
32        self.past_rtt_mean.record(data.past_rtt);
33        // past_rtt_deviation is unrecorded
34    }
35}
36
37registered_event! {
38    AdaptiveConcurrencyInFlight => {
39        in_flight: Histogram = histogram!("adaptive_concurrency_in_flight"),
40    }
41
42    fn emit(&self, in_flight: u64) {
43        self.in_flight.record(in_flight as f64);
44    }
45}
46
47registered_event! {
48    AdaptiveConcurrencyObservedRtt => {
49        observed_rtt: Histogram = histogram!("adaptive_concurrency_observed_rtt"),
50    }
51
52    fn emit(&self, rtt: Duration) {
53        self.observed_rtt.record(rtt);
54    }
55}
56
57registered_event! {
58    AdaptiveConcurrencyAveragedRtt => {
59        averaged_rtt: Histogram = histogram!("adaptive_concurrency_averaged_rtt"),
60    }
61
62    fn emit(&self, rtt: Duration) {
63        self.averaged_rtt.record(rtt);
64    }
65}