vector/internal_events/
adaptive_concurrency.rs1use std::time::Duration;
2
3use metrics::{histogram, Histogram};
4
5#[derive(Clone, Copy)]
6pub struct AdaptiveConcurrencyLimitData {
7 pub concurrency: u64,
8 pub reached_limit: bool,
9 pub had_back_pressure: bool,
10 pub current_rtt: Option<Duration>,
11 pub past_rtt: Duration,
12 pub past_rtt_deviation: Duration,
13}
14
15registered_event! {
16 AdaptiveConcurrencyLimit => {
17 limit: Histogram = histogram!("adaptive_concurrency_limit"),
21 reached_limit: Histogram = histogram!("adaptive_concurrency_reached_limit"),
22 back_pressure: Histogram = histogram!("adaptive_concurrency_back_pressure"),
23 past_rtt_mean: Histogram = histogram!("adaptive_concurrency_past_rtt_mean"),
24 }
25
26 fn emit(&self, data: AdaptiveConcurrencyLimitData) {
27 self.limit.record(data.concurrency as f64);
28 let reached_limit = data.reached_limit.then_some(1.0).unwrap_or_default();
29 self.reached_limit.record(reached_limit);
30 let back_pressure = data.had_back_pressure.then_some(1.0).unwrap_or_default();
31 self.back_pressure.record(back_pressure);
32 self.past_rtt_mean.record(data.past_rtt);
33 }
35}
36
37registered_event! {
38 AdaptiveConcurrencyInFlight => {
39 in_flight: Histogram = histogram!("adaptive_concurrency_in_flight"),
40 }
41
42 fn emit(&self, in_flight: u64) {
43 self.in_flight.record(in_flight as f64);
44 }
45}
46
47registered_event! {
48 AdaptiveConcurrencyObservedRtt => {
49 observed_rtt: Histogram = histogram!("adaptive_concurrency_observed_rtt"),
50 }
51
52 fn emit(&self, rtt: Duration) {
53 self.observed_rtt.record(rtt);
54 }
55}
56
57registered_event! {
58 AdaptiveConcurrencyAveragedRtt => {
59 averaged_rtt: Histogram = histogram!("adaptive_concurrency_averaged_rtt"),
60 }
61
62 fn emit(&self, rtt: Duration) {
63 self.averaged_rtt.record(rtt);
64 }
65}