vector/sinks/util/adaptive_concurrency/
controller.rs1use std::{
2 future::Future,
3 sync::{Arc, Mutex, MutexGuard},
4 time::{Duration, Instant},
5};
6
7use tokio::sync::OwnedSemaphorePermit;
8use tower::timeout::error::Elapsed;
9use vector_lib::internal_event::{InternalEventHandle as _, Registered};
10
11use super::{instant_now, semaphore::ShrinkableSemaphore, AdaptiveConcurrencySettings};
12#[cfg(test)]
13use crate::test_util::stats::{TimeHistogram, TimeWeightedSum};
14use crate::{
15 http::HttpError,
16 internal_events::{
17 AdaptiveConcurrencyAveragedRtt, AdaptiveConcurrencyInFlight, AdaptiveConcurrencyLimit,
18 AdaptiveConcurrencyLimitData, AdaptiveConcurrencyObservedRtt,
19 },
20 sinks::util::retries::{RetryAction, RetryLogic},
21 stats::{EwmaVar, Mean, MeanVariance},
22};
23
24#[derive(Clone)]
27pub(super) struct Controller<L> {
28 semaphore: Arc<ShrinkableSemaphore>,
29 concurrency: Option<usize>,
30 settings: AdaptiveConcurrencySettings,
31 logic: L,
32 pub(super) inner: Arc<Mutex<Inner>>,
33 #[cfg(test)]
34 pub(super) stats: Arc<Mutex<ControllerStatistics>>,
35
36 limit: Registered<AdaptiveConcurrencyLimit>,
37 in_flight: Registered<AdaptiveConcurrencyInFlight>,
38 observed_rtt: Registered<AdaptiveConcurrencyObservedRtt>,
39 averaged_rtt: Registered<AdaptiveConcurrencyAveragedRtt>,
40}
41
42#[derive(Debug)]
43pub(super) struct Inner {
44 pub(super) current_limit: usize,
45 in_flight: usize,
46 past_rtt: EwmaVar,
47 next_update: Instant,
48 current_rtt: Mean,
49 had_back_pressure: bool,
50 reached_limit: bool,
51}
52
/// Test-only record of how the controller behaved over time.
#[cfg(test)]
#[derive(Debug, Default)]
pub(super) struct ControllerStatistics {
    /// Histogram of the in-flight request count, sampled whenever it changes.
    pub(super) in_flight: TimeHistogram,
    /// Histogram of the concurrency limit, sampled at each limit evaluation.
    pub(super) concurrency_limit: TimeHistogram,
    /// Time-weighted sum of the individual RTTs that were used.
    pub(super) observed_rtt: TimeWeightedSum,
    /// Time-weighted sum of the per-window averaged RTTs.
    pub(super) averaged_rtt: TimeWeightedSum,
}
61
62impl<L> Controller<L> {
63 pub(super) fn new(
64 concurrency: Option<usize>,
65 settings: AdaptiveConcurrencySettings,
66 logic: L,
67 ) -> Self {
68 let current_limit = concurrency.unwrap_or(settings.initial_concurrency);
73 Self {
74 semaphore: Arc::new(ShrinkableSemaphore::new(current_limit)),
75 concurrency,
76 settings,
77 logic,
78 inner: Arc::new(Mutex::new(Inner {
79 current_limit,
80 in_flight: 0,
81 past_rtt: EwmaVar::new(settings.ewma_alpha),
82 next_update: instant_now(),
83 current_rtt: Default::default(),
84 had_back_pressure: false,
85 reached_limit: false,
86 })),
87 #[cfg(test)]
88 stats: Arc::new(Mutex::new(ControllerStatistics::default())),
89 limit: register!(AdaptiveConcurrencyLimit),
90 in_flight: register!(AdaptiveConcurrencyInFlight),
91 observed_rtt: register!(AdaptiveConcurrencyObservedRtt),
92 averaged_rtt: register!(AdaptiveConcurrencyAveragedRtt),
93 }
94 }
95
96 pub(super) fn load(&self) -> f64 {
100 let inner = self.inner.lock().expect("Controller mutex is poisoned");
101 if inner.current_limit > 0 {
102 inner.in_flight as f64 / inner.current_limit as f64
103 } else {
104 1.0
105 }
106 }
107
108 pub(super) fn acquire(
109 &self,
110 ) -> impl Future<Output = OwnedSemaphorePermit> + Send + 'static + use<L> {
111 Arc::clone(&self.semaphore).acquire()
112 }
113
114 pub(super) fn start_request(&self) {
115 let mut inner = self.inner.lock().expect("Controller mutex is poisoned");
116
117 #[cfg(test)]
118 {
119 let mut stats = self.stats.lock().expect("Stats mutex is poisoned");
120 stats.in_flight.add(inner.in_flight, instant_now());
121 }
122
123 inner.in_flight += 1;
124 if inner.in_flight >= inner.current_limit {
125 inner.reached_limit = true;
126 }
127
128 self.in_flight.emit(inner.in_flight as u64);
129 }
130
131 fn adjust_to_response_inner(&self, start: Instant, is_back_pressure: bool, use_rtt: bool) {
135 let now = instant_now();
136 let mut inner = self.inner.lock().expect("Controller mutex is poisoned");
137
138 let rtt = now.saturating_duration_since(start);
139 if use_rtt {
140 self.observed_rtt.emit(rtt);
141 }
142 let rtt = rtt.as_secs_f64();
143
144 if is_back_pressure {
145 inner.had_back_pressure = true;
146 }
147
148 #[cfg(test)]
149 let mut stats = self.stats.lock().expect("Stats mutex is poisoned");
150
151 #[cfg(test)]
152 {
153 if use_rtt {
154 stats.observed_rtt.add(rtt, now);
155 }
156 stats.in_flight.add(inner.in_flight, now);
157 }
158
159 inner.in_flight -= 1;
160 self.in_flight.emit(inner.in_flight as u64);
161
162 if use_rtt {
163 inner.current_rtt.update(rtt);
164 }
165 let current_rtt = inner.current_rtt.average();
166
167 #[cfg(test)]
175 let current_rtt = current_rtt.map(|c| (c * 1000000.0).round() / 1000000.0);
176
177 match inner.past_rtt.state() {
178 None => {
179 if let Some(current_rtt) = current_rtt {
181 inner.past_rtt.update(current_rtt);
182 inner.next_update = now + Duration::from_secs_f64(current_rtt);
183 }
184 }
185 Some(mut past_rtt) => {
186 if now >= inner.next_update {
187 #[cfg(test)]
188 {
189 if let Some(current_rtt) = current_rtt {
190 stats.averaged_rtt.add(current_rtt, now);
191 }
192 stats.concurrency_limit.add(inner.current_limit, now);
193 drop(stats); }
195
196 if let Some(current_rtt) = current_rtt {
197 self.averaged_rtt.emit(Duration::from_secs_f64(current_rtt));
198 }
199
200 if self.concurrency.is_none() {
202 self.manage_limit(&mut inner, past_rtt, current_rtt);
203 }
204
205 if let Some(current_rtt) = current_rtt {
207 past_rtt = inner.past_rtt.update(current_rtt);
208 }
209 inner.next_update = now + Duration::from_secs_f64(past_rtt.mean);
210 inner.current_rtt = Default::default();
211 inner.had_back_pressure = false;
212 inner.reached_limit = false;
213 }
214 }
215 }
216 }
217
218 fn manage_limit(
219 &self,
220 inner: &mut MutexGuard<Inner>,
221 past_rtt: MeanVariance,
222 current_rtt: Option<f64>,
223 ) {
224 let past_rtt_deviation = past_rtt.variance.sqrt();
225 let threshold = past_rtt_deviation * self.settings.rtt_deviation_scale;
226
227 if inner.current_limit < self.settings.max_concurrency_limit
232 && inner.reached_limit
233 && !inner.had_back_pressure
234 && current_rtt.is_some()
235 && current_rtt.unwrap() <= past_rtt.mean
236 {
237 self.semaphore.add_permits(1);
239 inner.current_limit += 1;
240 }
241 else if inner.current_limit > 1
245 && (inner.had_back_pressure || current_rtt.unwrap_or(0.0) >= past_rtt.mean + threshold)
246 {
247 let new_limit =
251 ((inner.current_limit as f64 * self.settings.decrease_ratio) as usize).max(1);
252 self.semaphore
253 .forget_permits(inner.current_limit - new_limit);
254 inner.current_limit = new_limit;
255 }
256 self.limit.emit(AdaptiveConcurrencyLimitData {
257 concurrency: inner.current_limit as u64,
258 reached_limit: inner.reached_limit,
259 had_back_pressure: inner.had_back_pressure,
260 current_rtt: current_rtt.map(Duration::from_secs_f64),
261 past_rtt: Duration::from_secs_f64(past_rtt.mean),
262 past_rtt_deviation: Duration::from_secs_f64(past_rtt_deviation),
263 });
264 }
265}
266
267impl<L> Controller<L>
268where
269 L: RetryLogic,
270{
271 pub(super) fn adjust_to_response(
272 &self,
273 start: Instant,
274 response: &Result<L::Response, crate::Error>,
275 ) {
276 let response_action = response
280 .as_ref()
281 .map(|resp| self.logic.should_retry_response(resp));
282 let is_back_pressure = match &response_action {
283 Ok(action) => matches!(action, RetryAction::Retry(_)),
284 Err(error) => {
285 if let Some(error) = error.downcast_ref::<L::Error>() {
286 self.logic.is_retriable_error(error)
287 } else if error.downcast_ref::<Elapsed>().is_some() {
288 true
289 } else if error.downcast_ref::<HttpError>().is_some() {
290 false
292 } else {
293 warn!(
294 message = "Unhandled error response.",
295 %error,
296 internal_log_rate_limit = true
297 );
298 false
299 }
300 }
301 };
302 let use_rtt = matches!(response_action, Ok(RetryAction::Successful));
304 self.adjust_to_response_inner(start, is_back_pressure, use_rtt)
305 }
306}