// vector/sinks/util/adaptive_concurrency/controller.rs

1use std::{
2    future::Future,
3    sync::{Arc, Mutex, MutexGuard},
4    time::{Duration, Instant},
5};
6
7use tokio::sync::OwnedSemaphorePermit;
8use tower::timeout::error::Elapsed;
9use vector_lib::internal_event::{InternalEventHandle as _, Registered};
10
11use super::{instant_now, semaphore::ShrinkableSemaphore, AdaptiveConcurrencySettings};
12#[cfg(test)]
13use crate::test_util::stats::{TimeHistogram, TimeWeightedSum};
14use crate::{
15    http::HttpError,
16    internal_events::{
17        AdaptiveConcurrencyAveragedRtt, AdaptiveConcurrencyInFlight, AdaptiveConcurrencyLimit,
18        AdaptiveConcurrencyLimitData, AdaptiveConcurrencyObservedRtt,
19    },
20    sinks::util::retries::{RetryAction, RetryLogic},
21    stats::{EwmaVar, Mean, MeanVariance},
22};
23
/// Shared class for `tokio::sync::Semaphore` that manages adjusting the
/// semaphore size and other associated data.
///
/// Cloning is cheap: all mutable state lives behind `Arc`s, so every clone
/// observes and adjusts the same limit.
#[derive(Clone)]
pub(super) struct Controller<L> {
    // Semaphore whose permit count is grown (`add_permits`) or shrunk
    // (`forget_permits`) to track `Inner::current_limit`.
    semaphore: Arc<ShrinkableSemaphore>,
    // Fixed concurrency if configured; `None` means the limit is adaptive.
    concurrency: Option<usize>,
    settings: AdaptiveConcurrencySettings,
    // Retry logic used to classify responses as back pressure or success.
    logic: L,
    // Mutable algorithm state, shared across clones.
    pub(super) inner: Arc<Mutex<Inner>>,
    // Aggregate statistics, recorded only in tests for verification.
    #[cfg(test)]
    pub(super) stats: Arc<Mutex<ControllerStatistics>>,

    // Pre-registered internal event handles emitted as the controller runs.
    limit: Registered<AdaptiveConcurrencyLimit>,
    in_flight: Registered<AdaptiveConcurrencyInFlight>,
    observed_rtt: Registered<AdaptiveConcurrencyObservedRtt>,
    averaged_rtt: Registered<AdaptiveConcurrencyAveragedRtt>,
}
41
/// Mutable state for the adaptive concurrency algorithm, guarded by the
/// `Controller::inner` mutex.
#[derive(Debug)]
pub(super) struct Inner {
    // Current concurrency limit; the semaphore's permit count tracks this.
    pub(super) current_limit: usize,
    // Number of requests currently outstanding.
    in_flight: usize,
    // Exponentially weighted moving mean/variance of RTTs from past intervals.
    past_rtt: EwmaVar,
    // Earliest time at which the next limit adjustment may happen.
    next_update: Instant,
    // Mean RTT accumulated over the current measurement interval.
    current_rtt: Mean,
    // True if any response in the current interval signaled back pressure.
    had_back_pressure: bool,
    // True if `in_flight` reached `current_limit` during the current
    // interval; gates limit increases so we only grow on real demand.
    reached_limit: bool,
}
52
/// Test-only aggregate statistics used to validate controller behavior in
/// the simulation tests.
#[cfg(test)]
#[derive(Debug, Default)]
pub(super) struct ControllerStatistics {
    // Time-weighted histogram of the in-flight request count.
    pub(super) in_flight: TimeHistogram,
    // Time-weighted histogram of the concurrency limit.
    pub(super) concurrency_limit: TimeHistogram,
    // Time-weighted sum of per-response observed RTTs.
    pub(super) observed_rtt: TimeWeightedSum,
    // Time-weighted sum of per-interval averaged RTTs.
    pub(super) averaged_rtt: TimeWeightedSum,
}
61
62impl<L> Controller<L> {
    /// Creates a new controller.
    ///
    /// If `concurrency` is `Some`, that value is the permanent limit and the
    /// adaptive mechanism is effectively bypassed. Otherwise the limit starts
    /// at `settings.initial_concurrency` and is adjusted adaptively, bounded
    /// above by `settings.max_concurrency_limit`.
    pub(super) fn new(
        concurrency: Option<usize>,
        settings: AdaptiveConcurrencySettings,
        logic: L,
    ) -> Self {
        // If a `concurrency` is specified, it becomes both the
        // current limit and the maximum, effectively bypassing all the
        // mechanisms. Otherwise, the current limit starts at
        // `settings.initial_concurrency` and the maximum is
        // `settings.max_concurrency_limit`.
        let current_limit = concurrency.unwrap_or(settings.initial_concurrency);
        Self {
            semaphore: Arc::new(ShrinkableSemaphore::new(current_limit)),
            concurrency,
            settings,
            logic,
            inner: Arc::new(Mutex::new(Inner {
                current_limit,
                in_flight: 0,
                past_rtt: EwmaVar::new(settings.ewma_alpha),
                next_update: instant_now(),
                current_rtt: Default::default(),
                had_back_pressure: false,
                reached_limit: false,
            })),
            #[cfg(test)]
            stats: Arc::new(Mutex::new(ControllerStatistics::default())),
            limit: register!(AdaptiveConcurrencyLimit),
            in_flight: register!(AdaptiveConcurrencyInFlight),
            observed_rtt: register!(AdaptiveConcurrencyObservedRtt),
            averaged_rtt: register!(AdaptiveConcurrencyAveragedRtt),
        }
    }
95
96    /// An estimate of current load on service managed by this controller.
97    ///
98    /// 0.0 is no load, while 1.0 is max load.
99    pub(super) fn load(&self) -> f64 {
100        let inner = self.inner.lock().expect("Controller mutex is poisoned");
101        if inner.current_limit > 0 {
102            inner.in_flight as f64 / inner.current_limit as f64
103        } else {
104            1.0
105        }
106    }
107
    /// Returns a future resolving to a permit from the shared semaphore,
    /// which bounds concurrent requests to the current limit.
    ///
    /// The returned future owns its own `Arc` to the semaphore, so it is
    /// `'static` and may outlive `self`.
    pub(super) fn acquire(
        &self,
    ) -> impl Future<Output = OwnedSemaphorePermit> + Send + 'static + use<L> {
        Arc::clone(&self.semaphore).acquire()
    }
113
114    pub(super) fn start_request(&self) {
115        let mut inner = self.inner.lock().expect("Controller mutex is poisoned");
116
117        #[cfg(test)]
118        {
119            let mut stats = self.stats.lock().expect("Stats mutex is poisoned");
120            stats.in_flight.add(inner.in_flight, instant_now());
121        }
122
123        inner.in_flight += 1;
124        if inner.in_flight >= inner.current_limit {
125            inner.reached_limit = true;
126        }
127
128        self.in_flight.emit(inner.in_flight as u64);
129    }
130
    /// Adjust the controller to a response, based on type of response
    /// given (backpressure or not) and if it should be used as a valid
    /// RTT measurement.
    ///
    /// Decrements the in-flight count, folds the RTT into the current
    /// interval's mean (when `use_rtt`), and — at most once per update
    /// interval — feeds the averaged RTT into the limit management logic.
    fn adjust_to_response_inner(&self, start: Instant, is_back_pressure: bool, use_rtt: bool) {
        let now = instant_now();
        let mut inner = self.inner.lock().expect("Controller mutex is poisoned");

        // Saturates to zero if `start` is somehow later than `now`
        // (e.g. under a mocked clock), rather than producing a bogus RTT.
        let rtt = now.saturating_duration_since(start);
        if use_rtt {
            self.observed_rtt.emit(rtt);
        }
        let rtt = rtt.as_secs_f64();

        if is_back_pressure {
            inner.had_back_pressure = true;
        }

        // Lock order is `inner` then `stats`, matching `start_request`.
        #[cfg(test)]
        let mut stats = self.stats.lock().expect("Stats mutex is poisoned");

        #[cfg(test)]
        {
            if use_rtt {
                stats.observed_rtt.add(rtt, now);
            }
            stats.in_flight.add(inner.in_flight, now);
        }

        inner.in_flight -= 1;
        self.in_flight.emit(inner.in_flight as u64);

        if use_rtt {
            inner.current_rtt.update(rtt);
        }
        let current_rtt = inner.current_rtt.average();

        // When the RTT values are all exactly the same, as for the
        // "constant link" test, the average calculation above produces
        // results either the exact value or that value plus epsilon,
        // depending on the number of samples. This ends up throttling
        // aggressively due to the high side falling outside of the
        // calculated deviance. Rounding these values forces the
        // differences to zero.
        #[cfg(test)]
        let current_rtt = current_rtt.map(|c| (c * 1000000.0).round() / 1000000.0);

        match inner.past_rtt.state() {
            None => {
                // No past measurements, set up initial values.
                if let Some(current_rtt) = current_rtt {
                    inner.past_rtt.update(current_rtt);
                    // First update interval is one observed RTT long.
                    inner.next_update = now + Duration::from_secs_f64(current_rtt);
                }
            }
            Some(mut past_rtt) => {
                // Adjust the limit at most once per update interval, using
                // the RTT averaged over that interval.
                if now >= inner.next_update {
                    #[cfg(test)]
                    {
                        if let Some(current_rtt) = current_rtt {
                            stats.averaged_rtt.add(current_rtt, now);
                        }
                        stats.concurrency_limit.add(inner.current_limit, now);
                        drop(stats); // Drop the stats lock a little earlier on this path
                    }

                    if let Some(current_rtt) = current_rtt {
                        self.averaged_rtt.emit(Duration::from_secs_f64(current_rtt));
                    }

                    // Only manage the concurrency if `concurrency` was set to "adaptive"
                    if self.concurrency.is_none() {
                        self.manage_limit(&mut inner, past_rtt, current_rtt);
                    }

                    // Reset values for next interval
                    if let Some(current_rtt) = current_rtt {
                        past_rtt = inner.past_rtt.update(current_rtt);
                    }
                    // Next interval lasts one (EWMA) mean RTT.
                    inner.next_update = now + Duration::from_secs_f64(past_rtt.mean);
                    inner.current_rtt = Default::default();
                    inner.had_back_pressure = false;
                    inner.reached_limit = false;
                }
            }
        }
    }
217
218    fn manage_limit(
219        &self,
220        inner: &mut MutexGuard<Inner>,
221        past_rtt: MeanVariance,
222        current_rtt: Option<f64>,
223    ) {
224        let past_rtt_deviation = past_rtt.variance.sqrt();
225        let threshold = past_rtt_deviation * self.settings.rtt_deviation_scale;
226
227        // Normal quick responses trigger an increase in the
228        // concurrency limit. Note that we only check this if we had
229        // requests to go beyond the current limit to prevent
230        // increasing the limit beyond what we have evidence for.
231        if inner.current_limit < self.settings.max_concurrency_limit
232            && inner.reached_limit
233            && !inner.had_back_pressure
234            && current_rtt.is_some()
235            && current_rtt.unwrap() <= past_rtt.mean
236        {
237            // Increase (additive) the current concurrency limit
238            self.semaphore.add_permits(1);
239            inner.current_limit += 1;
240        }
241        // Back pressure responses, either explicit or implicit due
242        // to increasing response times, trigger a decrease in the
243        // concurrency limit.
244        else if inner.current_limit > 1
245            && (inner.had_back_pressure || current_rtt.unwrap_or(0.0) >= past_rtt.mean + threshold)
246        {
247            // Decrease (multiplicative) the current concurrency limit. The floor rounding in the
248            // `usize` conversion guarantees the new limit is smaller than the current limit, and
249            // the `.max` ensures the new limit is above zero.
250            let new_limit =
251                ((inner.current_limit as f64 * self.settings.decrease_ratio) as usize).max(1);
252            self.semaphore
253                .forget_permits(inner.current_limit - new_limit);
254            inner.current_limit = new_limit;
255        }
256        self.limit.emit(AdaptiveConcurrencyLimitData {
257            concurrency: inner.current_limit as u64,
258            reached_limit: inner.reached_limit,
259            had_back_pressure: inner.had_back_pressure,
260            current_rtt: current_rtt.map(Duration::from_secs_f64),
261            past_rtt: Duration::from_secs_f64(past_rtt.mean),
262            past_rtt_deviation: Duration::from_secs_f64(past_rtt_deviation),
263        });
264    }
265}
266
267impl<L> Controller<L>
268where
269    L: RetryLogic,
270{
    /// Interprets a completed request's outcome and feeds it into the
    /// controller.
    ///
    /// A result counts as back pressure when the retry logic says the
    /// response should be retried, when the error downcasts to a retriable
    /// `L::Error`, or when it is a `tower` timeout (`Elapsed`). HTTP
    /// protocol-level errors and unrecognized errors are not back pressure.
    /// Only successfully processed responses contribute an RTT measurement.
    pub(super) fn adjust_to_response(
        &self,
        start: Instant,
        response: &Result<L::Response, crate::Error>,
    ) {
        // It would be better to avoid generating the string in Retry(_)
        // just to throw it away here, but it's probably not worth the
        // effort.
        let response_action = response
            .as_ref()
            .map(|resp| self.logic.should_retry_response(resp));
        let is_back_pressure = match &response_action {
            Ok(action) => matches!(action, RetryAction::Retry(_)),
            Err(error) => {
                // Downcast order matters: the sink-specific error type is
                // checked before the generic timeout and HTTP error types.
                if let Some(error) = error.downcast_ref::<L::Error>() {
                    self.logic.is_retriable_error(error)
                } else if error.downcast_ref::<Elapsed>().is_some() {
                    true
                } else if error.downcast_ref::<HttpError>().is_some() {
                    // HTTP protocol-level errors are not backpressure
                    false
                } else {
                    warn!(
                        message = "Unhandled error response.",
                        %error,
                        internal_log_rate_limit = true
                    );
                    false
                }
            }
        };
        // Only adjust to the RTT when the request was successfully processed.
        let use_rtt = matches!(response_action, Ok(RetryAction::Successful));
        self.adjust_to_response_inner(start, is_back_pressure, use_rtt)
    }
306}