vector/sinks/util/adaptive_concurrency/mod.rs
1//! Limit the max number of requests being concurrently processed.
2
3mod controller;
4mod future;
5mod layer;
6mod semaphore;
7mod service;
8
9#[cfg(test)]
10pub mod tests;
11
12pub(crate) use layer::AdaptiveConcurrencyLimitLayer;
13pub(crate) use service::AdaptiveConcurrencyLimit;
14use vector_lib::configurable::configurable_component;
15
16fn instant_now() -> std::time::Instant {
17 tokio::time::Instant::now().into()
18}
19
/// Configuration of adaptive concurrency parameters.
///
/// These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or
/// unstable performance and sink behavior. Proceed with caution.
// The defaults for these values were chosen after running several simulations on a test service that had
// various responses to load. The values are the best balances found between competing outcomes.
//
// Every field is `pub(super)`, so only the parent module can read or set it directly. Each field
// also names a `default_*` function via `#[serde(default = "...")]`; the same functions are used
// by the `Default` impl for this struct, so both paths yield the same values.
//
// NOTE(review): the `///` text on this struct and its fields is presumably picked up by
// `configurable_component` for generated configuration docs — confirm before rewording any of it.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(deny_unknown_fields)]
pub struct AdaptiveConcurrencySettings {
    /// The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency).
    ///
    /// Datadog recommends setting this value to your service's average limit if you're seeing that it takes a
    /// long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
    /// `adaptive_concurrency_limit` metric.
    // NOTE(review): nothing declared on this struct relates this value to `max_concurrency_limit`;
    // check where (or whether) an initial value above the maximum is handled.
    #[configurable(validation(range(min = 1)))]
    #[serde(default = "default_initial_concurrency")]
    pub(super) initial_concurrency: usize,

    /// The fraction of the current value to set the new concurrency limit when decreasing the limit.
    ///
    /// Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly
    /// when latency increases.
    ///
    /// **Note**: The new limit is rounded down after applying this ratio.
    // NOTE(review): the text above describes an open interval (0, 1), but the declared range is
    // `min = 0.0, max = 1.0`. If that range is inclusive, the endpoints pass validation — confirm.
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[serde(default = "default_decrease_ratio")]
    pub(super) decrease_ratio: f64,

    /// The weighting of new measurements compared to older measurements.
    ///
    /// Valid values are greater than `0` and less than `1`.
    ///
    /// ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with
    /// the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has
    /// unusually high response variability.
    // NOTE(review): same open-interval wording versus `min = 0.0, max = 1.0` range as on
    // `decrease_ratio` — confirm whether the endpoints are meant to be accepted.
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[serde(default = "default_ewma_alpha")]
    pub(super) ewma_alpha: f64,

    /// Scale of RTT deviations which are not considered anomalous.
    ///
    /// Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`.
    ///
    /// When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable
    /// those values are. We use that deviation when comparing the past RTT average to the current measurements, so we
    /// can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to
    /// an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT.
    // Only a lower bound is declared here; the `1.0` to `3.0` range above is guidance, not validation.
    #[configurable(validation(range(min = 0.0)))]
    #[serde(default = "default_rtt_deviation_scale")]
    pub(super) rtt_deviation_scale: f64,

    /// The maximum concurrency limit.
    ///
    /// The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard.
    #[configurable(validation(range(min = 1)))]
    #[serde(default = "default_max_concurrency_limit")]
    pub(super) max_concurrency_limit: usize,
}
79
/// Default for `initial_concurrency`: a limit of one request at a time.
const fn default_initial_concurrency() -> usize {
    const INITIAL_CONCURRENCY: usize = 1;
    INITIAL_CONCURRENCY
}
83
/// Default for `decrease_ratio`: the multiplier applied to the current limit
/// when it is lowered.
const fn default_decrease_ratio() -> f64 {
    const DECREASE_RATIO: f64 = 0.9;
    DECREASE_RATIO
}
87
/// Default for `ewma_alpha`: the weight given to new measurements in the
/// moving average.
const fn default_ewma_alpha() -> f64 {
    const EWMA_ALPHA: f64 = 0.4;
    EWMA_ALPHA
}
91
/// Default for `rtt_deviation_scale`: the factor applied to the RTT deviation.
const fn default_rtt_deviation_scale() -> f64 {
    const RTT_DEVIATION_SCALE: f64 = 2.5;
    RTT_DEVIATION_SCALE
}
95
/// Default for `max_concurrency_limit`: the upper bound on the concurrency limit.
const fn default_max_concurrency_limit() -> usize {
    const MAX_CONCURRENCY_LIMIT: usize = 200;
    MAX_CONCURRENCY_LIMIT
}
99
100impl Default for AdaptiveConcurrencySettings {
101 fn default() -> Self {
102 Self {
103 initial_concurrency: default_initial_concurrency(),
104 decrease_ratio: default_decrease_ratio(),
105 ewma_alpha: default_ewma_alpha(),
106 rtt_deviation_scale: default_rtt_deviation_scale(),
107 max_concurrency_limit: default_max_concurrency_limit(),
108 }
109 }
110}