1use std::{marker::PhantomData, num::NonZeroUsize, time::Duration};
2
3use derivative::Derivative;
4use serde_with::serde_as;
5use snafu::Snafu;
6use vector_lib::configurable::configurable_component;
7use vector_lib::json_size::JsonSize;
8use vector_lib::stream::BatcherSettings;
9
10use super::EncodedEvent;
11use crate::{event::EventFinalizers, internal_events::LargeEventDroppedError};
12
/// Errors produced while merging and validating a [`BatchConfig`].
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum BatchError {
    /// The sink batches only by event count, so `max_bytes` cannot be set.
    #[snafu(display("This sink does not allow setting `max_bytes`"))]
    BytesNotAllowed,
    /// `max_bytes` was configured as zero.
    #[snafu(display("`max_bytes` must be greater than zero"))]
    InvalidMaxBytes,
    /// `max_events` was configured as zero.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
    /// `timeout_secs` was configured as zero or negative.
    #[snafu(display("`timeout_secs` must be greater than zero"))]
    InvalidTimeout,
    /// `max_bytes` exceeded the sink's hard upper limit.
    #[snafu(display("provided `max_bytes` exceeds the maximum limit of {}", limit))]
    MaxBytesExceeded { limit: usize },
    /// `max_events` exceeded the sink's hard upper limit.
    #[snafu(display("provided `max_events` exceeds the maximum limit of {}", limit))]
    MaxEventsExceeded { limit: usize },
}
31
/// Per-sink default batch limits, supplied as associated constants so they
/// can fill in any setting the user leaves unconfigured.
pub trait SinkBatchSettings {
    /// Default maximum number of events per batch; `None` if the sink does not
    /// limit batches by event count.
    const MAX_EVENTS: Option<usize>;
    /// Default maximum number of bytes per batch; `None` if the sink does not
    /// limit batches by byte size.
    const MAX_BYTES: Option<usize>;
    /// Default batch flush timeout, in (possibly fractional) seconds.
    const TIMEOUT_SECS: f64;
}
37
/// Default batch settings for low-latency sinks that batch by event count:
/// up to 1000 events, flushed at least once per second.
#[derive(Clone, Copy, Debug, Default)]
pub struct RealtimeEventBasedDefaultBatchSettings;

impl SinkBatchSettings for RealtimeEventBasedDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = Some(1000);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
47
/// Default batch settings for low-latency sinks that batch by byte size:
/// up to 10 MB, flushed at least once per second.
#[derive(Clone, Copy, Debug, Default)]
pub struct RealtimeSizeBasedDefaultBatchSettings;

impl SinkBatchSettings for RealtimeSizeBasedDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(10_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
57
/// Default batch settings for throughput-oriented (bulk) sinks that batch by
/// byte size: up to 10 MB, flushed at least every five minutes.
#[derive(Clone, Copy, Debug, Default)]
pub struct BulkSizeBasedDefaultBatchSettings;

impl SinkBatchSettings for BulkSizeBasedDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(10_000_000);
    const TIMEOUT_SECS: f64 = 300.0;
}
68
/// Batch settings with no default size limits, for sinks whose limits are
/// determined elsewhere (only the one-second flush timeout is provided).
#[derive(Clone, Copy, Debug, Default)]
pub struct NoDefaultsBatchSettings;

impl SinkBatchSettings for NoDefaultsBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
82
/// Type-state marker: the configuration has been merged with the sink's
/// defaults and validated.
#[derive(Clone, Copy, Debug, Default)]
pub struct Merged;

/// Type-state marker: raw user-supplied configuration, not yet merged with
/// the sink's defaults.
#[derive(Clone, Copy, Debug, Default)]
pub struct Unmerged;
88
/// Event batching behavior.
///
/// `D` supplies this sink's compile-time defaults; `S` is the merge-state
/// marker ([`Unmerged`] as deserialized, [`Merged`] after `validate`).
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct BatchConfig<D: SinkBatchSettings + Clone, S = Unmerged>
where
    S: Clone,
{
    /// The maximum size of a batch, in bytes, before it is flushed.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,

    /// The maximum size of a batch, in events, before it is flushed.
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,

    /// The maximum age of a batch, in seconds, before it is flushed.
    #[serde(default = "default_timeout::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Timeout"))]
    pub timeout_secs: Option<f64>,

    // Zero-sized markers carrying the defaults type `D` and merge state `S`;
    // never (de)serialized.
    #[serde(skip)]
    _d: PhantomData<D>,
    #[serde(skip)]
    _s: PhantomData<S>,
}
125
// The three functions below expose the compile-time defaults from `D` in the
// shape `serde(default = "...")` requires: a zero-argument function per field.

/// Default `max_bytes`, taken from the sink's [`SinkBatchSettings`].
const fn default_max_bytes<D: SinkBatchSettings>() -> Option<usize> {
    D::MAX_BYTES
}

/// Default `max_events`, taken from the sink's [`SinkBatchSettings`].
const fn default_max_events<D: SinkBatchSettings>() -> Option<usize> {
    D::MAX_EVENTS
}

/// Default `timeout_secs`, taken from the sink's [`SinkBatchSettings`].
const fn default_timeout<D: SinkBatchSettings>() -> Option<f64> {
    Some(D::TIMEOUT_SECS)
}
137
138impl<D: SinkBatchSettings + Clone> BatchConfig<D, Unmerged> {
139 pub fn validate(self) -> Result<BatchConfig<D, Merged>, BatchError> {
140 let config = BatchConfig {
141 max_bytes: self.max_bytes.or(D::MAX_BYTES),
142 max_events: self.max_events.or(D::MAX_EVENTS),
143 timeout_secs: self.timeout_secs.or(Some(D::TIMEOUT_SECS)),
144 _d: PhantomData,
145 _s: PhantomData,
146 };
147
148 match (config.max_bytes, config.max_events, config.timeout_secs) {
149 (Some(0), _, _) => Err(BatchError::InvalidMaxBytes),
155 (_, Some(0), _) => Err(BatchError::InvalidMaxEvents),
156 (_, _, Some(timeout)) if timeout <= 0.0 => Err(BatchError::InvalidTimeout),
157
158 _ => Ok(config),
159 }
160 }
161
162 pub fn into_batch_settings<T: Batch>(self) -> Result<BatchSettings<T>, BatchError> {
163 let config = self.validate()?;
164 config.into_batch_settings()
165 }
166
167 pub fn into_batcher_settings(self) -> Result<BatcherSettings, BatchError> {
174 let config = self.validate()?;
175 config.into_batcher_settings()
176 }
177}
178
179impl<D: SinkBatchSettings + Clone> BatchConfig<D, Merged> {
180 pub const fn validate(self) -> Result<BatchConfig<D, Merged>, BatchError> {
181 Ok(self)
182 }
183
184 pub const fn disallow_max_bytes(self) -> Result<Self, BatchError> {
185 match self.max_bytes {
188 Some(_) => Err(BatchError::BytesNotAllowed),
189 None => Ok(self),
190 }
191 }
192
193 pub const fn limit_max_bytes(self, limit: usize) -> Result<Self, BatchError> {
194 match self.max_bytes {
195 Some(n) if n > limit => Err(BatchError::MaxBytesExceeded { limit }),
196 _ => Ok(self),
197 }
198 }
199
200 pub const fn limit_max_events(self, limit: usize) -> Result<Self, BatchError> {
201 match self.max_events {
202 Some(n) if n > limit => Err(BatchError::MaxEventsExceeded { limit }),
203 _ => Ok(self),
204 }
205 }
206
207 pub fn into_batch_settings<T: Batch>(self) -> Result<BatchSettings<T>, BatchError> {
208 let adjusted = T::get_settings_defaults(self)?;
209
210 let timeout_secs = adjusted.timeout_secs.ok_or(BatchError::InvalidTimeout)?;
213
214 Ok(BatchSettings {
215 size: BatchSize {
216 bytes: adjusted.max_bytes.unwrap_or(usize::MAX),
217 events: adjusted.max_events.unwrap_or(usize::MAX),
218 _type_marker: PhantomData,
219 },
220 timeout: Duration::from_secs_f64(timeout_secs),
221 })
222 }
223
224 pub fn into_batcher_settings(self) -> Result<BatcherSettings, BatchError> {
231 let max_bytes = self
232 .max_bytes
233 .and_then(NonZeroUsize::new)
234 .or_else(|| NonZeroUsize::new(usize::MAX))
235 .expect("`max_bytes` should already be validated");
236
237 let max_events = self
238 .max_events
239 .and_then(NonZeroUsize::new)
240 .or_else(|| NonZeroUsize::new(usize::MAX))
241 .expect("`max_bytes` should already be validated");
242
243 let timeout_secs = self.timeout_secs.ok_or(BatchError::InvalidTimeout)?;
246
247 Ok(BatcherSettings::new(
248 Duration::from_secs_f64(timeout_secs),
249 max_bytes,
250 max_events,
251 ))
252 }
253}
254
255impl<D1, D2> From<BatchConfig<D1, Merged>> for BatchConfig<D2, Unmerged>
258where
259 D1: SinkBatchSettings + Clone,
260 D2: SinkBatchSettings + Clone,
261{
262 fn from(config: BatchConfig<D1, Merged>) -> Self {
263 BatchConfig {
264 max_bytes: config.max_bytes,
265 max_events: config.max_events,
266 timeout_secs: config.timeout_secs,
267 _d: PhantomData,
268 _s: PhantomData,
269 }
270 }
271}
272
/// Size limits for a batch associated with batch type `B`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Copy(bound = ""))]
pub struct BatchSize<B> {
    /// Maximum number of bytes per batch; `usize::MAX` means effectively unlimited.
    pub bytes: usize,
    /// Maximum number of events per batch; `usize::MAX` means effectively unlimited.
    pub events: usize,
    // Ties this size to the batch type `B` without storing a value of it.
    _type_marker: PhantomData<B>,
}
284
285impl<B> BatchSize<B> {
286 pub const fn const_default() -> Self {
287 BatchSize {
288 bytes: usize::MAX,
289 events: usize::MAX,
290 _type_marker: PhantomData,
291 }
292 }
293}
294
295impl<B> Default for BatchSize<B> {
296 fn default() -> Self {
297 BatchSize::const_default()
298 }
299}
300
/// Combined size limits and flush timeout for a batch of type `B`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Copy(bound = ""))]
pub struct BatchSettings<B> {
    /// Size limits for the batch.
    pub size: BatchSize<B>,
    /// Maximum age of a batch before it is flushed regardless of size.
    pub timeout: Duration,
}
308
309impl<B> Default for BatchSettings<B> {
310 fn default() -> Self {
311 BatchSettings {
312 size: BatchSize {
313 bytes: 10_000_000,
314 events: usize::MAX,
315 _type_marker: PhantomData,
316 },
317 timeout: Duration::from_secs(1),
318 }
319 }
320}
321
/// Emits a [`LargeEventDroppedError`] internal event for an event whose size
/// `length` exceeds `max_length`, and reports the push as handled:
/// `Ok(false)` means the event was consumed (dropped) and the batch is not full.
pub(super) fn err_event_too_large<T>(length: usize, max_length: usize) -> PushResult<T> {
    emit!(LargeEventDroppedError { length, max_length });
    PushResult::Ok(false)
}
326
/// Outcome of pushing an item into a [`Batch`].
#[must_use]
#[derive(Debug, Eq, PartialEq)]
pub enum PushResult<T> {
    /// The item was accepted; the boolean reports whether the batch is now full.
    Ok(bool),
    /// The batch could not accept the item; it is returned to the caller.
    Overflow(T),
}
339
/// A container that accumulates pushed items until it is finished into a
/// single output value.
pub trait Batch: Sized {
    /// The type of item accepted by `push`.
    type Input;
    /// The type produced by `finish`.
    type Output;

    /// Hook letting a batch type adjust the merged configuration (for example,
    /// to impose its own limits). The default implementation is a pass-through.
    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
        config: BatchConfig<D, Merged>,
    ) -> Result<BatchConfig<D, Merged>, BatchError> {
        Ok(config)
    }

    /// Attempts to add `item` to the batch; see [`PushResult`] for outcomes.
    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input>;
    /// Returns `true` if the batch currently holds no items.
    fn is_empty(&self) -> bool;
    /// Creates a new, empty batch configured like this one.
    fn fresh(&self) -> Self;
    /// Consumes the batch, producing its accumulated output.
    fn finish(self) -> Self::Output;
    /// Returns the number of items currently held.
    fn num_items(&self) -> usize;
}
360
/// A finished batch bundled with the metadata accumulated from its events.
#[derive(Debug)]
pub struct EncodedBatch<I> {
    /// The finished batch payload.
    pub items: I,
    /// Finalizers merged from every event that was accepted into the batch.
    pub finalizers: EventFinalizers,
    /// Number of events in the batch.
    pub count: usize,
    /// Total encoded byte size of the batched events.
    pub byte_size: usize,
    /// Total estimated JSON byte size of the batched events.
    pub json_byte_size: JsonSize,
}
369
/// Wrapper that layers event-finalizer and size accounting on top of an inner
/// batch of plain items, so finishing yields an [`EncodedBatch`].
#[derive(Clone, Debug)]
pub struct FinalizersBatch<B> {
    inner: B,
    // Finalizers merged from every accepted event.
    finalizers: EventFinalizers,
    // Number of accepted events.
    count: usize,
    // Running encoded byte size of accepted events.
    byte_size: usize,
    // Running estimated JSON byte size of accepted events.
    json_byte_size: JsonSize,
}
382
383impl<B: Batch> From<B> for FinalizersBatch<B> {
384 fn from(inner: B) -> Self {
385 Self {
386 inner,
387 finalizers: Default::default(),
388 count: 0,
389 byte_size: 0,
390 json_byte_size: JsonSize::zero(),
391 }
392 }
393}
394
395impl<B: Batch> Batch for FinalizersBatch<B> {
396 type Input = EncodedEvent<B::Input>;
397 type Output = EncodedBatch<B::Output>;
398
399 fn get_settings_defaults<D: SinkBatchSettings + Clone>(
400 config: BatchConfig<D, Merged>,
401 ) -> Result<BatchConfig<D, Merged>, BatchError> {
402 B::get_settings_defaults(config)
403 }
404
405 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
406 let EncodedEvent {
407 item,
408 finalizers,
409 byte_size,
410 json_byte_size,
411 } = item;
412 match self.inner.push(item) {
413 PushResult::Ok(full) => {
414 self.finalizers.merge(finalizers);
415 self.count += 1;
416 self.byte_size += byte_size;
417 self.json_byte_size += json_byte_size;
418 PushResult::Ok(full)
419 }
420 PushResult::Overflow(item) => PushResult::Overflow(EncodedEvent {
421 item,
422 finalizers,
423 byte_size,
424 json_byte_size,
425 }),
426 }
427 }
428
429 fn is_empty(&self) -> bool {
430 self.inner.is_empty()
431 }
432
433 fn fresh(&self) -> Self {
434 Self {
435 inner: self.inner.fresh(),
436 finalizers: Default::default(),
437 count: 0,
438 byte_size: 0,
439 json_byte_size: JsonSize::zero(),
440 }
441 }
442
443 fn finish(self) -> Self::Output {
444 EncodedBatch {
445 items: self.inner.finish(),
446 finalizers: self.finalizers,
447 count: self.count,
448 byte_size: self.byte_size,
449 json_byte_size: self.json_byte_size,
450 }
451 }
452
453 fn num_items(&self) -> usize {
454 self.inner.num_items()
455 }
456}
457
/// Wrapper that remembers once the inner batch has reported being full, so
/// later pushes overflow immediately without touching the inner batch.
#[derive(Clone, Debug)]
pub struct StatefulBatch<B> {
    inner: B,
    // Set once the inner batch returns `Ok(true)` or overflows.
    was_full: bool,
}
463
464impl<B: Batch> From<B> for StatefulBatch<B> {
465 fn from(inner: B) -> Self {
466 Self {
467 inner,
468 was_full: false,
469 }
470 }
471}
472
473impl<B> StatefulBatch<B> {
474 pub const fn was_full(&self) -> bool {
475 self.was_full
476 }
477
478 #[allow(clippy::missing_const_for_fn)] pub fn into_inner(self) -> B {
480 self.inner
481 }
482}
483
484impl<B: Batch> Batch for StatefulBatch<B> {
485 type Input = B::Input;
486 type Output = B::Output;
487
488 fn get_settings_defaults<D: SinkBatchSettings + Clone>(
489 config: BatchConfig<D, Merged>,
490 ) -> Result<BatchConfig<D, Merged>, BatchError> {
491 B::get_settings_defaults(config)
492 }
493
494 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
495 if self.was_full {
496 PushResult::Overflow(item)
497 } else {
498 let result = self.inner.push(item);
499 self.was_full =
500 matches!(result, PushResult::Overflow(_)) || matches!(result, PushResult::Ok(true));
501 result
502 }
503 }
504
505 fn is_empty(&self) -> bool {
506 !self.was_full && self.inner.is_empty()
507 }
508
509 fn fresh(&self) -> Self {
510 Self {
511 inner: self.inner.fresh(),
512 was_full: false,
513 }
514 }
515
516 fn finish(self) -> Self::Output {
517 self.inner.finish()
518 }
519
520 fn num_items(&self) -> usize {
521 self.inner.num_items()
522 }
523}