// vector/sinks/util/adaptive_concurrency/mod.rs
//! Limit the max number of requests being concurrently processed.
mod controller;
mod future;
mod layer;
mod semaphore;
mod service;
#[cfg(test)]
pub mod tests;
pub(crate) use layer::AdaptiveConcurrencyLimitLayer;
pub(crate) use service::AdaptiveConcurrencyLimit;
use vector_lib::configurable::configurable_component;
/// Returns the current time as a `std::time::Instant`, read through Tokio's
/// `Instant` rather than directly from the standard library.
// NOTE(review): presumably Tokio's clock is used so paused/mocked time in tests
// is honoured — confirm against the callers.
fn instant_now() -> std::time::Instant {
    let tokio_now = tokio::time::Instant::now();
    std::time::Instant::from(tokio_now)
}
/// Configuration of adaptive concurrency parameters.
///
/// These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or
/// unstable performance and sink behavior. Proceed with caution.
// The defaults for these values were chosen after running several simulations on a test service that had
// various responses to load. The values are the best balances found between competing outcomes.
//
// Every field carries a `serde(default = "...")` attribute naming one of the `default_*` functions in this
// module, so any field omitted from the configuration falls back to that function's value. Unknown keys are
// rejected via `deny_unknown_fields`. All fields are `pub(super)`: readable from the parent module only.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(deny_unknown_fields)]
pub struct AdaptiveConcurrencySettings {
/// The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
///
/// It is recommended to set this value to your service's average limit if you're seeing that it takes a
/// long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
/// `adaptive_concurrency_limit` metric.
#[configurable(validation(range(min = 1)))]
#[serde(default = "default_initial_concurrency")]
pub(super) initial_concurrency: usize,
/// The fraction of the current value to set the new concurrency limit when decreasing the limit.
///
/// Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly
/// when latency increases.
///
/// Note that the new limit is rounded down after applying this ratio.
// NOTE(review): the doc above describes an open interval (0, 1), while the range below is written with
// `min = 0.0, max = 1.0`. Confirm whether the validator treats these bounds as inclusive; if so, `0.0`
// and `1.0` would be accepted despite the documented contract.
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[serde(default = "default_decrease_ratio")]
pub(super) decrease_ratio: f64,
/// The weighting of new measurements compared to older measurements.
///
/// Valid values are greater than `0` and less than `1`.
///
/// ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with
/// the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has
/// unusually high response variability.
// NOTE(review): same open-vs-closed interval question as `decrease_ratio` — the doc excludes `0` and `1`
// but the range is written `min = 0.0, max = 1.0`. Confirm the validator's bound semantics.
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[serde(default = "default_ewma_alpha")]
pub(super) ewma_alpha: f64,
/// Scale of RTT deviations which are not considered anomalous.
///
/// Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`.
///
/// When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable
/// those values are. We use that deviation when comparing the past RTT average to the current measurements, so we
/// can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to
/// an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT.
// Only a lower bound is validated here; the "1.0 to 3.0" guidance above is advisory, not enforced.
#[configurable(validation(range(min = 0.0)))]
#[serde(default = "default_rtt_deviation_scale")]
pub(super) rtt_deviation_scale: f64,
/// The maximum concurrency limit.
///
/// The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard.
// NOTE(review): nothing in this struct checks that `initial_concurrency <= max_concurrency_limit`;
// verify whether the consumer of these settings clamps or validates the pair.
#[configurable(validation(range(min = 1)))]
#[serde(default = "default_max_concurrency_limit")]
pub(super) max_concurrency_limit: usize,
}
/// Default for `initial_concurrency`: start with a single in-flight request.
const fn default_initial_concurrency() -> usize {
    const INITIAL_CONCURRENCY: usize = 1;
    INITIAL_CONCURRENCY
}
/// Default for `decrease_ratio`: shrink the limit to 90% of its current value.
const fn default_decrease_ratio() -> f64 {
    const DECREASE_RATIO: f64 = 0.9;
    DECREASE_RATIO
}
/// Default for `ewma_alpha`: weight applied to new RTT measurements in the moving average.
const fn default_ewma_alpha() -> f64 {
    const EWMA_ALPHA: f64 = 0.4;
    EWMA_ALPHA
}
/// Default for `rtt_deviation_scale`: multiplier applied to the RTT deviation.
const fn default_rtt_deviation_scale() -> f64 {
    const RTT_DEVIATION_SCALE: f64 = 2.5;
    RTT_DEVIATION_SCALE
}
/// Default for `max_concurrency_limit`: upper bound on the adaptive limit.
const fn default_max_concurrency_limit() -> usize {
    const MAX_CONCURRENCY_LIMIT: usize = 200;
    MAX_CONCURRENCY_LIMIT
}
impl Default for AdaptiveConcurrencySettings {
    /// Builds the settings from the per-field `default_*` functions, i.e. the
    /// same functions named in the struct's `serde(default = "...")` attributes,
    /// so a programmatic default matches an empty configuration.
    fn default() -> Self {
        let initial_concurrency = default_initial_concurrency();
        let decrease_ratio = default_decrease_ratio();
        let ewma_alpha = default_ewma_alpha();
        let rtt_deviation_scale = default_rtt_deviation_scale();
        let max_concurrency_limit = default_max_concurrency_limit();

        Self {
            initial_concurrency,
            decrease_ratio,
            ewma_alpha,
            rtt_deviation_scale,
            max_concurrency_limit,
        }
    }
}