1use std::{hash::Hash, marker::PhantomData, num::NonZeroU64, pin::Pin, sync::Arc, time::Duration};
2
3use futures_util::stream::{self, BoxStream};
4use serde_with::serde_as;
5use tower::{
6 balance::p2c::Balance,
7 buffer::{Buffer, BufferLayer},
8 discover::Change,
9 layer::{util::Stack, Layer},
10 limit::RateLimit,
11 retry::Retry,
12 timeout::Timeout,
13 Service, ServiceBuilder,
14};
15use vector_lib::configurable::configurable_component;
16
17pub use crate::sinks::util::service::{
18 concurrency::Concurrency,
19 health::{HealthConfig, HealthLogic, HealthService},
20 map::Map,
21};
22use crate::{
23 internal_events::OpenGauge,
24 sinks::util::{
25 adaptive_concurrency::{
26 AdaptiveConcurrencyLimit, AdaptiveConcurrencyLimitLayer, AdaptiveConcurrencySettings,
27 },
28 retries::{FibonacciRetryPolicy, JitterMode, RetryLogic},
29 service::map::MapLayer,
30 sink::Response,
31 Batch, BatchSink, Partition, PartitionBatchSink,
32 },
33};
34
mod concurrency;
mod health;
mod map;
pub mod net;

/// Middleware stack produced by [`TowerRequestLayer`] around a single service `S`.
///
/// From outermost to innermost: a rate limit, an adaptive concurrency limit, retries driven by
/// [`FibonacciRetryPolicy`], and a per-request [`Timeout`] around the inner service.
pub type Svc<S, L> =
    RateLimit<AdaptiveConcurrencyLimit<Retry<FibonacciRetryPolicy<L>, Timeout<S>>, L>>;
/// A [`BatchSink`] whose requests are sent through the [`Svc`] middleware stack.
pub type TowerBatchedSink<S, B, RL> = BatchSink<Svc<S, RL>, B>;
/// A [`PartitionBatchSink`] (batches keyed by `K`) whose requests go through [`Svc`].
pub type TowerPartitionSink<S, B, RL, K> = PartitionBatchSink<Svc<S, RL>, B, K>;

/// Service returned by [`TowerRequestSettings::distributed_service`].
///
/// A rate limit and retries sit in front of a [`Buffer`], which feeds a p2c [`Balance`] over
/// the endpoint services yielded by a [`DiscoveryService`].
pub type DistributedService<S, RL, HL, K, Req> = RateLimit<
    Retry<
        FibonacciRetryPolicy<RL>,
        Buffer<Req, <Balance<DiscoveryService<S, RL, HL, K>, Req> as Service<Req>>::Future>,
    >,
>;
/// Boxed stream of endpoint [`Change`]s (keyed by `K`) that drives the [`Balance`] in
/// [`DistributedService`]. Its error type is fixed to `crate::Error`.
pub type DiscoveryService<S, RL, HL, K> =
    BoxStream<'static, Result<Change<K, SingleDistributedService<S, RL, HL>>, crate::Error>>;
/// Per-endpoint service inside [`DistributedService`]: an adaptive concurrency limit around a
/// [`HealthService`] around a [`Timeout`] on the inner service `S`.
pub type SingleDistributedService<S, RL, HL> =
    AdaptiveConcurrencyLimit<HealthService<Timeout<S>, HL>, RL>;
56
57pub trait ServiceBuilderExt<L> {
58 fn map<R1, R2, F>(self, f: F) -> ServiceBuilder<Stack<MapLayer<R1, R2>, L>>
59 where
60 F: Fn(R1) -> R2 + Send + Sync + 'static;
61
62 fn settings<RL, Request>(
63 self,
64 settings: TowerRequestSettings,
65 retry_logic: RL,
66 ) -> ServiceBuilder<Stack<TowerRequestLayer<RL, Request>, L>>;
67}
68
69impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
70 fn map<R1, R2, F>(self, f: F) -> ServiceBuilder<Stack<MapLayer<R1, R2>, L>>
71 where
72 F: Fn(R1) -> R2 + Send + Sync + 'static,
73 {
74 self.layer(MapLayer::new(Arc::new(f)))
75 }
76
77 fn settings<RL, Request>(
78 self,
79 settings: TowerRequestSettings,
80 retry_logic: RL,
81 ) -> ServiceBuilder<Stack<TowerRequestLayer<RL, Request>, L>> {
82 self.layer(TowerRequestLayer {
83 settings,
84 retry_logic,
85 _pd: std::marker::PhantomData,
86 })
87 }
88}
89
90pub trait TowerRequestConfigDefaults {
91 const CONCURRENCY: Concurrency = Concurrency::Adaptive;
92 const TIMEOUT_SECS: u64 = 60;
93 const RATE_LIMIT_DURATION_SECS: u64 = 1;
94 const RATE_LIMIT_NUM: u64 = i64::MAX as u64; const RETRY_ATTEMPTS: usize = isize::MAX as usize; const RETRY_MAX_DURATION_SECS: NonZeroU64 = NonZeroU64::new(30).unwrap();
97 const RETRY_INITIAL_BACKOFF_SECS: NonZeroU64 = NonZeroU64::new(1).unwrap();
98}
99
100#[derive(Clone, Copy, Debug)]
101pub struct GlobalTowerRequestConfigDefaults;
102
103impl TowerRequestConfigDefaults for GlobalTowerRequestConfigDefaults {}
104
/// Middleware settings for outbound requests.
///
/// Various settings can be configured, such as concurrency and rate limits, timeouts, and retry
/// behavior.
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug)]
pub struct TowerRequestConfig<D: TowerRequestConfigDefaults = GlobalTowerRequestConfigDefaults> {
    #[configurable(derived)]
    #[serde(default = "default_concurrency::<D>")]
    #[serde(skip_serializing_if = "concurrency_is_default::<D>")]
    pub concurrency: Concurrency,

    /// The time a request can take before being aborted.
    ///
    /// Setting this below the downstream service's own timeout may cause requests that are still
    /// being processed to be retried, which can result in duplicate data.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Timeout"))]
    #[serde(default = "default_timeout_secs::<D>")]
    pub timeout_secs: u64,

    /// The time window used for the `rate_limit_num` option.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
    #[serde(default = "default_rate_limit_duration_secs::<D>")]
    pub rate_limit_duration_secs: u64,

    /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.
    #[configurable(metadata(docs::type_unit = "requests"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Number"))]
    #[serde(default = "default_rate_limit_num::<D>")]
    pub rate_limit_num: u64,

    /// The maximum number of retries to make for failed requests.
    #[configurable(metadata(docs::type_unit = "retries"))]
    #[serde(default = "default_retry_attempts::<D>")]
    pub retry_attempts: usize,

    /// The maximum amount of time to wait between retries.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Max Retry Duration"))]
    #[serde(default = "default_retry_max_duration_secs::<D>")]
    pub retry_max_duration_secs: NonZeroU64,

    /// The amount of time to wait before attempting the first retry for a failed request.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Retry Initial Backoff"))]
    #[serde(default = "default_retry_initial_backoff_secs::<D>")]
    pub retry_initial_backoff_secs: NonZeroU64,

    #[configurable(derived)]
    #[serde(default)]
    pub retry_jitter_mode: JitterMode,

    #[configurable(derived)]
    #[serde(default)]
    pub adaptive_concurrency: AdaptiveConcurrencySettings,

    // Carries the defaults type `D` used by the serde `default` functions above; never
    // (de)serialized.
    #[serde(skip)]
    pub _d: PhantomData<D>,
}
171
172const fn default_concurrency<D: TowerRequestConfigDefaults>() -> Concurrency {
173 D::CONCURRENCY
174}
175
176fn concurrency_is_default<D: TowerRequestConfigDefaults>(concurrency: &Concurrency) -> bool {
177 *concurrency == D::CONCURRENCY
178}
179
180const fn default_timeout_secs<D: TowerRequestConfigDefaults>() -> u64 {
181 D::TIMEOUT_SECS
182}
183
184const fn default_rate_limit_duration_secs<D: TowerRequestConfigDefaults>() -> u64 {
185 D::RATE_LIMIT_DURATION_SECS
186}
187
188const fn default_rate_limit_num<D: TowerRequestConfigDefaults>() -> u64 {
189 D::RATE_LIMIT_NUM
190}
191
192const fn default_retry_attempts<D: TowerRequestConfigDefaults>() -> usize {
193 D::RETRY_ATTEMPTS
194}
195
196const fn default_retry_max_duration_secs<D: TowerRequestConfigDefaults>() -> NonZeroU64 {
197 D::RETRY_MAX_DURATION_SECS
198}
199
200const fn default_retry_initial_backoff_secs<D: TowerRequestConfigDefaults>() -> NonZeroU64 {
201 D::RETRY_INITIAL_BACKOFF_SECS
202}
203
204impl<D: TowerRequestConfigDefaults> Default for TowerRequestConfig<D> {
205 fn default() -> Self {
206 Self {
207 concurrency: default_concurrency::<D>(),
208 timeout_secs: default_timeout_secs::<D>(),
209 rate_limit_duration_secs: default_rate_limit_duration_secs::<D>(),
210 rate_limit_num: default_rate_limit_num::<D>(),
211 retry_attempts: default_retry_attempts::<D>(),
212 retry_max_duration_secs: default_retry_max_duration_secs::<D>(),
213 retry_initial_backoff_secs: default_retry_initial_backoff_secs::<D>(),
214 adaptive_concurrency: AdaptiveConcurrencySettings::default(),
215 retry_jitter_mode: JitterMode::default(),
216
217 _d: PhantomData,
218 }
219 }
220}
221
222impl<D: TowerRequestConfigDefaults> TowerRequestConfig<D> {
223 pub const fn into_settings(&self) -> TowerRequestSettings {
224 TowerRequestSettings {
226 concurrency: self.concurrency.parse_concurrency(),
227 timeout: Duration::from_secs(self.timeout_secs),
228 rate_limit_duration: Duration::from_secs(self.rate_limit_duration_secs),
229 rate_limit_num: self.rate_limit_num,
230 retry_attempts: self.retry_attempts,
231 retry_max_duration: Duration::from_secs(self.retry_max_duration_secs.get()),
232 retry_initial_backoff: Duration::from_secs(self.retry_initial_backoff_secs.get()),
233 adaptive_concurrency: self.adaptive_concurrency,
234 retry_jitter_mode: self.retry_jitter_mode,
235 }
236 }
237}
238
/// Runtime request-middleware settings, produced by [`TowerRequestConfig::into_settings`].
#[derive(Debug, Clone)]
pub struct TowerRequestSettings {
    /// Concurrency limit from `Concurrency::parse_concurrency`, passed to
    /// [`AdaptiveConcurrencyLimitLayer::new`]. `None` corresponds to `Concurrency::Adaptive`.
    pub concurrency: Option<usize>,
    /// Per-request timeout.
    pub timeout: Duration,
    /// Window in which at most `rate_limit_num` requests are allowed.
    pub rate_limit_duration: Duration,
    /// Maximum number of requests per `rate_limit_duration` window.
    pub rate_limit_num: u64,
    /// Retry budget passed to [`FibonacciRetryPolicy::new`].
    pub retry_attempts: usize,
    /// Upper bound on the wait between retries.
    pub retry_max_duration: Duration,
    /// Wait before the first retry.
    pub retry_initial_backoff: Duration,
    /// Tuning for the adaptive concurrency limiter.
    pub adaptive_concurrency: AdaptiveConcurrencySettings,
    /// Jitter mode passed to the retry policy.
    pub retry_jitter_mode: JitterMode,
}
251
252impl TowerRequestSettings {
253 pub fn retry_policy<L: RetryLogic>(&self, logic: L) -> FibonacciRetryPolicy<L> {
254 FibonacciRetryPolicy::new(
255 self.retry_attempts,
256 self.retry_initial_backoff,
257 self.retry_max_duration,
258 logic,
259 self.retry_jitter_mode,
260 )
261 }
262
263 pub fn partition_sink<B, RL, S, K>(
265 &self,
266 retry_logic: RL,
267 service: S,
268 batch: B,
269 batch_timeout: Duration,
270 ) -> TowerPartitionSink<S, B, RL, K>
271 where
272 RL: RetryLogic<Request = <B as Batch>::Output, Response = S::Response>,
273 S: Service<B::Output> + Clone + Send + 'static,
274 S::Error: Into<crate::Error> + Send + Sync + 'static,
275 S::Response: Send + Response,
276 S::Future: Send + 'static,
277 B: Batch,
278 B::Input: Partition<K>,
279 B::Output: Send + Clone + 'static,
280 K: Hash + Eq + Clone + Send + 'static,
281 {
282 let service = ServiceBuilder::new()
283 .settings(self.clone(), retry_logic)
284 .service(service);
285 PartitionBatchSink::new(service, batch, batch_timeout)
286 }
287
288 pub fn batch_sink<B, RL, S>(
290 &self,
291 retry_logic: RL,
292 service: S,
293 batch: B,
294 batch_timeout: Duration,
295 ) -> TowerBatchedSink<S, B, RL>
296 where
297 RL: RetryLogic<Request = <B as Batch>::Output, Response = S::Response>,
298 S: Service<B::Output> + Clone + Send + 'static,
299 S::Error: Into<crate::Error> + Send + Sync + 'static,
300 S::Response: Send + Response,
301 S::Future: Send + 'static,
302 B: Batch,
303 B::Output: Send + Clone + 'static,
304 {
305 let service = ServiceBuilder::new()
306 .settings(self.clone(), retry_logic)
307 .service(service);
308 BatchSink::new(service, batch, batch_timeout)
309 }
310
311 pub fn distributed_service<Req, RL, HL, S>(
316 self,
317 retry_logic: RL,
318 services: Vec<(String, S)>,
319 health_config: HealthConfig,
320 health_logic: HL,
321 buffer_bound: usize,
322 ) -> DistributedService<S, RL, HL, usize, Req>
323 where
324 Req: Clone + Send + 'static,
325 RL: RetryLogic<Response = S::Response>,
326 HL: HealthLogic<Response = S::Response, Error = crate::Error>,
327 S: Service<Req> + Clone + Send + 'static,
328 S::Error: Into<crate::Error> + Send + Sync + 'static,
329 S::Response: Send,
330 S::Future: Send + 'static,
331 {
332 let policy = self.retry_policy(retry_logic.clone());
333
334 let open = OpenGauge::new();
336 let services = services
337 .into_iter()
338 .map(|(endpoint, inner)| {
339 ServiceBuilder::new()
341 .layer(AdaptiveConcurrencyLimitLayer::new(
342 self.concurrency,
343 self.adaptive_concurrency,
344 retry_logic.clone(),
345 ))
346 .service(
347 health_config.build(
348 health_logic.clone(),
349 ServiceBuilder::new().timeout(self.timeout).service(inner),
350 open.clone(),
351 endpoint,
352 ), )
356 })
357 .enumerate()
358 .map(|(i, service)| Ok::<_, S::Error>(Change::Insert(i, service)))
359 .collect::<Vec<_>>();
360
361 ServiceBuilder::new()
363 .rate_limit(self.rate_limit_num, self.rate_limit_duration)
364 .retry(policy)
365 .layer(BufferLayer::new(buffer_bound))
367 .service(Balance::new(Box::pin(stream::iter(services)) as Pin<Box<_>>))
368 }
369}
370
371#[derive(Debug, Clone)]
372pub struct TowerRequestLayer<L, Request> {
373 settings: TowerRequestSettings,
374 retry_logic: L,
375 _pd: PhantomData<Request>,
376}
377
378impl<S, RL, Request> Layer<S> for TowerRequestLayer<RL, Request>
379where
380 S: Service<Request> + Send + 'static,
381 S::Response: Send + 'static,
382 S::Error: Into<crate::Error> + Send + Sync + 'static,
383 S::Future: Send + 'static,
384 RL: RetryLogic<Response = S::Response> + Send + 'static,
385 Request: Clone + Send + 'static,
386{
387 type Service = Svc<S, RL>;
388
389 fn layer(&self, inner: S) -> Self::Service {
390 let policy = self.settings.retry_policy(self.retry_logic.clone());
391 ServiceBuilder::new()
392 .rate_limit(
393 self.settings.rate_limit_num,
394 self.settings.rate_limit_duration,
395 )
396 .layer(AdaptiveConcurrencyLimitLayer::new(
397 self.settings.concurrency,
398 self.settings.adaptive_concurrency,
399 self.retry_logic.clone(),
400 ))
401 .retry(policy)
402 .timeout(self.settings.timeout)
403 .service(inner)
404 }
405}
406
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, Ordering::AcqRel},
        Arc, Mutex,
    };

    use futures::{future, stream, FutureExt, SinkExt, StreamExt};
    use tokio::time::Duration;
    use vector_lib::json_size::JsonSize;

    use super::*;
    use crate::sinks::util::{
        retries::{RetryAction, RetryLogic},
        BatchSettings, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, VecBuffer,
    };

    /// Batch timeout passed to the sink under test.
    const TIMEOUT: Duration = Duration::from_secs(10);

    /// `concurrency` accepts an integer, `"adaptive"` or `"none"`, and defaults to adaptive. It
    /// rejects unknown strings, zero and negative numbers.
    #[test]
    fn concurrency_param_works() {
        // The default config must survive a TOML serialize/deserialize round trip.
        let cfg = TowerRequestConfig::<GlobalTowerRequestConfigDefaults>::default();
        let toml = toml::to_string(&cfg).unwrap();
        toml::from_str::<TowerRequestConfig>(&toml).expect("Default config failed");

        let cfg = toml::from_str::<TowerRequestConfig>("").expect("Empty config failed");
        assert_eq!(cfg.concurrency, Concurrency::Adaptive);

        let cfg = toml::from_str::<TowerRequestConfig>("concurrency = 10")
            .expect("Fixed concurrency failed");
        assert_eq!(cfg.concurrency, Concurrency::Fixed(10));

        let cfg = toml::from_str::<TowerRequestConfig>(r#"concurrency = "adaptive""#)
            .expect("Adaptive concurrency setting failed");
        assert_eq!(cfg.concurrency, Concurrency::Adaptive);

        let cfg = toml::from_str::<TowerRequestConfig>(r#"concurrency = "none""#)
            .expect("None concurrency setting failed");
        assert_eq!(cfg.concurrency, Concurrency::None);

        toml::from_str::<TowerRequestConfig>(r#"concurrency = "broken""#)
            .expect_err("Invalid concurrency setting didn't fail");

        toml::from_str::<TowerRequestConfig>(r"concurrency = 0")
            .expect_err("Invalid concurrency setting didn't fail on zero");

        toml::from_str::<TowerRequestConfig>(r"concurrency = -9")
            .expect_err("Invalid concurrency setting didn't fail on negative number");
    }

    /// With the global defaults, `into_settings` yields:
    /// - adaptive concurrency (`None`);
    /// - a 60s timeout;
    /// - the capped rate limit and retry count;
    /// - 30s/1s retry bounds.
    #[test]
    fn into_settings_with_global_defaults() {
        let cfg = TowerRequestConfig::<GlobalTowerRequestConfigDefaults>::default();
        let settings = cfg.into_settings();

        assert_eq!(settings.concurrency, None);
        assert_eq!(settings.timeout, Duration::from_secs(60));
        assert_eq!(settings.rate_limit_duration, Duration::from_secs(1));
        assert_eq!(settings.rate_limit_num, i64::MAX as u64);
        assert_eq!(settings.retry_attempts, isize::MAX as usize);
        assert_eq!(settings.retry_max_duration, Duration::from_secs(30));
        assert_eq!(settings.retry_initial_backoff, Duration::from_secs(1));
    }

    /// Defaults type that overrides every constant with a distinct value, so each one can be
    /// traced through `into_settings`.
    #[derive(Clone, Copy, Debug)]
    pub struct TestTowerRequestConfigDefaults;

    impl TowerRequestConfigDefaults for TestTowerRequestConfigDefaults {
        const CONCURRENCY: Concurrency = Concurrency::None;
        const TIMEOUT_SECS: u64 = 1;
        const RATE_LIMIT_DURATION_SECS: u64 = 2;
        const RATE_LIMIT_NUM: u64 = 3;
        const RETRY_ATTEMPTS: usize = 4;
        const RETRY_MAX_DURATION_SECS: NonZeroU64 = NonZeroU64::new(5).unwrap();
        const RETRY_INITIAL_BACKOFF_SECS: NonZeroU64 = NonZeroU64::new(6).unwrap();
    }

    /// Overridden defaults flow through `Default` and `into_settings`. `Concurrency::None` maps
    /// to a limit of one.
    #[test]
    fn into_settings_with_overridden_defaults() {
        let cfg = TowerRequestConfig::<TestTowerRequestConfigDefaults>::default();
        let settings = cfg.into_settings();

        assert_eq!(settings.concurrency, Some(1));
        assert_eq!(settings.timeout, Duration::from_secs(1));
        assert_eq!(settings.rate_limit_duration, Duration::from_secs(2));
        assert_eq!(settings.rate_limit_num, 3);
        assert_eq!(settings.retry_attempts, 4);
        assert_eq!(settings.retry_max_duration, Duration::from_secs(5));
        assert_eq!(settings.retry_initial_backoff, Duration::from_secs(6));
    }

    /// Explicit values in a parsed config are carried into the settings, with second-based
    /// fields converted to `Duration`s.
    #[test]
    fn into_settings_with_populated_config() {
        let cfg = toml::from_str::<TowerRequestConfig>(
            r" concurrency = 16
            timeout_secs = 1
            rate_limit_duration_secs = 2
            rate_limit_num = 3
            retry_attempts = 4
            retry_max_duration_secs = 5
            retry_initial_backoff_secs = 6
        ",
        )
        .expect("Config failed to parse");

        let settings = cfg.into_settings();
        assert_eq!(
            settings.concurrency,
            Concurrency::Fixed(16).parse_concurrency()
        );
        assert_eq!(settings.timeout, Duration::from_secs(1));
        assert_eq!(settings.rate_limit_duration, Duration::from_secs(2));
        assert_eq!(settings.rate_limit_num, 3);
        assert_eq!(settings.retry_attempts, 4);
        assert_eq!(settings.retry_max_duration, Duration::from_secs(5));
        assert_eq!(settings.retry_initial_backoff, Duration::from_secs(6));
    }

    /// With concurrency fixed at 1, the first request fails and is retried. The two 10-event
    /// batches must still arrive in input order.
    #[tokio::test]
    async fn partition_sink_retry_concurrency() {
        let cfg: TowerRequestConfig<GlobalTowerRequestConfigDefaults> = TowerRequestConfig {
            concurrency: Concurrency::Fixed(1),
            ..TowerRequestConfig::default()
        };
        let settings = cfg.into_settings();

        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = {
            let sent_requests = Arc::clone(&sent_requests);
            // `true` until the first call swaps it to `false`, so only the first request fails.
            let delay = Arc::new(AtomicBool::new(true));
            tower::service_fn(move |req: PartitionInnerBuffer<Vec<usize>, Vec<usize>>| {
                let (req, _) = req.into_parts();
                if delay.swap(false, AcqRel) {
                    // Fail the first request so the retry path is exercised.
                    future::err::<(), _>(std::io::Error::other("")).boxed()
                } else {
                    // Record successful requests only; failed attempts are not recorded.
                    sent_requests.lock().unwrap().push(req);
                    future::ok::<_, std::io::Error>(()).boxed()
                }
            })
        };

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let mut sink = settings.partition_sink(
            RetryAlways,
            svc,
            PartitionBuffer::new(VecBuffer::new(batch_settings.size)),
            TIMEOUT,
        );
        sink.ordered();

        // 20 inputs sharing one partition key, so they split into two batches of 10 events.
        let input = (0..20).map(|i| PartitionInnerBuffer::new(i, vec![0]));
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        let output = sent_requests.lock().unwrap();
        assert_eq!(
            &*output,
            &vec![(0..10).collect::<Vec<_>>(), (10..20).collect::<Vec<_>>(),]
        );
    }

    /// Retry logic that retries every error and treats every response as successful.
    #[derive(Clone, Debug, Copy)]
    struct RetryAlways;

    impl RetryLogic for RetryAlways {
        type Error = std::io::Error;
        type Request = PartitionInnerBuffer<Vec<usize>, Vec<usize>>;
        type Response = ();

        fn is_retriable_error(&self, _: &Self::Error) -> bool {
            true
        }

        fn should_retry_response(&self, _response: &Self::Response) -> RetryAction<Self::Request> {
            RetryAction::Successful
        }
    }
}