// vector/sinks/util/adaptive_concurrency/mod.rs

//! Adaptively limit the maximum number of requests being processed concurrently.
2
3mod controller;
4mod future;
5mod layer;
6mod semaphore;
7mod service;
8
9#[cfg(test)]
10pub mod tests;
11
12pub(crate) use layer::AdaptiveConcurrencyLimitLayer;
13pub(crate) use service::AdaptiveConcurrencyLimit;
14use vector_lib::configurable::configurable_component;
15
16fn instant_now() -> std::time::Instant {
17    tokio::time::Instant::now().into()
18}
19
/// Configuration of adaptive concurrency parameters.
///
/// These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or
/// unstable performance and sink behavior. Proceed with caution.
// The defaults for these values were chosen after running several simulations on a test service that had
// various responses to load. The values are the best balances found between competing outcomes.
//
// Every field has a `#[serde(default = ...)]` helper, and the `Default` impl in this module calls the same
// helpers, so an empty config block and `AdaptiveConcurrencySettings::default()` produce identical values.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(deny_unknown_fields)]
pub struct AdaptiveConcurrencySettings {
    /// The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency).
    ///
    /// Datadog recommends setting this value to your service's average limit if you're seeing that it takes a
    /// long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
    /// `adaptive_concurrency_limit` metric.
    // NOTE(review): nothing in this file checks `initial_concurrency <= max_concurrency_limit`;
    // confirm whether the controller clamps it or whether a larger value is accepted as-is.
    #[configurable(validation(range(min = 1)))]
    #[serde(default = "default_initial_concurrency")]
    pub(super) initial_concurrency: usize,

    /// The fraction of the current value to set the new concurrency limit when decreasing the limit.
    ///
    /// Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly
    /// when latency increases.
    ///
    /// **Note**: The new limit is rounded down after applying this ratio.
    // NOTE(review): the validation range below names 0.0 and 1.0 as its bounds, while the doc above says the
    // endpoints are excluded. If the range check is inclusive, 0.0 and 1.0 would pass validation — confirm
    // whether the endpoints are rejected elsewhere.
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[serde(default = "default_decrease_ratio")]
    pub(super) decrease_ratio: f64,

    /// The weighting of new measurements compared to older measurements.
    ///
    /// Valid values are greater than `0` and less than `1`.
    ///
    /// ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with
    /// the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has
    /// unusually high response variability.
    // NOTE(review): same bound mismatch as `decrease_ratio`: the range names 0.0 and 1.0 while the doc
    // excludes them — confirm intended handling of the endpoints.
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[serde(default = "default_ewma_alpha")]
    pub(super) ewma_alpha: f64,

    /// Scale of RTT deviations which are not considered anomalous.
    ///
    /// Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`.
    ///
    /// When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable
    /// those values are. We use that deviation when comparing the past RTT average to the current measurements, so we
    /// can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to
    /// an appropriate range.  Larger values cause the algorithm to ignore larger increases in the RTT.
    // Only a lower bound is validated; the 1.0–3.0 band above is guidance, not enforced here.
    #[configurable(validation(range(min = 0.0)))]
    #[serde(default = "default_rtt_deviation_scale")]
    pub(super) rtt_deviation_scale: f64,

    /// The maximum concurrency limit.
    ///
    /// The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard.
    #[configurable(validation(range(min = 1)))]
    #[serde(default = "default_max_concurrency_limit")]
    pub(super) max_concurrency_limit: usize,
}
79
/// Serde default for `initial_concurrency`: start with a single in-flight request (no concurrency).
const fn default_initial_concurrency() -> usize {
    const NO_CONCURRENCY: usize = 1;
    NO_CONCURRENCY
}
83
/// Serde default for `decrease_ratio`: on a decrease, the new limit is 90% of the current one
/// (before rounding down).
const fn default_decrease_ratio() -> f64 {
    const RATIO: f64 = 0.9;
    RATIO
}
87
/// Serde default for `ewma_alpha`: the weight given to each new RTT measurement in the EWMA.
const fn default_ewma_alpha() -> f64 {
    const ALPHA: f64 = 0.4;
    ALPHA
}
91
/// Serde default for `rtt_deviation_scale`: the multiplier applied to the RTT deviation before
/// an RTT increase is treated as anomalous.
const fn default_rtt_deviation_scale() -> f64 {
    const SCALE: f64 = 2.5;
    SCALE
}
95
/// Serde default for `max_concurrency_limit`: the safeguard ceiling the adaptive limit never exceeds.
const fn default_max_concurrency_limit() -> usize {
    const MAX_LIMIT: usize = 200;
    MAX_LIMIT
}
99
100impl Default for AdaptiveConcurrencySettings {
101    fn default() -> Self {
102        Self {
103            initial_concurrency: default_initial_concurrency(),
104            decrease_ratio: default_decrease_ratio(),
105            ewma_alpha: default_ewma_alpha(),
106            rtt_deviation_scale: default_rtt_deviation_scale(),
107            max_concurrency_limit: default_max_concurrency_limit(),
108        }
109    }
110}