vector/sinks/util/
batch.rs

1use std::{marker::PhantomData, num::NonZeroUsize, time::Duration};
2
3use derivative::Derivative;
4use serde_with::serde_as;
5use snafu::Snafu;
6use vector_lib::configurable::configurable_component;
7use vector_lib::json_size::JsonSize;
8use vector_lib::stream::BatcherSettings;
9
10use super::EncodedEvent;
11use crate::{event::EventFinalizers, internal_events::LargeEventDroppedError};
12
// TODO: Provide a sensible sink default of 10 MB with a 1 second timeout. Don't allow chaining
//   builder methods on that default.
15
/// Errors produced while merging, validating, or limiting a [`BatchConfig`].
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum BatchError {
    #[snafu(display("This sink does not allow setting `max_bytes`"))]
    BytesNotAllowed,
    #[snafu(display("`max_bytes` must be greater than zero"))]
    InvalidMaxBytes,
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
    #[snafu(display("`timeout_secs` must be greater than zero"))]
    InvalidTimeout,
    #[snafu(display("provided `max_bytes` exceeds the maximum limit of {}", limit))]
    MaxBytesExceeded { limit: usize },
    #[snafu(display("provided `max_events` exceeds the maximum limit of {}", limit))]
    MaxEventsExceeded { limit: usize },
}
31
/// Compile-time default batch settings for a particular sink.
///
/// `BatchConfig<D>` falls back to these constants for any field the user
/// leaves unset (see `BatchConfig::validate`).
pub trait SinkBatchSettings {
    // `None` means "no default limit" for that dimension.
    const MAX_EVENTS: Option<usize>;
    const MAX_BYTES: Option<usize>;
    const TIMEOUT_SECS: f64;
}
37
/// Reasonable default batch settings for sinks with timeliness concerns, limited by event count.
#[derive(Clone, Copy, Debug, Default)]
pub struct RealtimeEventBasedDefaultBatchSettings;

impl SinkBatchSettings for RealtimeEventBasedDefaultBatchSettings {
    // Flush quickly (1s) and cap by event count rather than byte size.
    const MAX_EVENTS: Option<usize> = Some(1000);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
47
/// Reasonable default batch settings for sinks with timeliness concerns, limited by byte size.
#[derive(Clone, Copy, Debug, Default)]
pub struct RealtimeSizeBasedDefaultBatchSettings;

impl SinkBatchSettings for RealtimeSizeBasedDefaultBatchSettings {
    // Flush quickly (1s) and cap by byte size (10 MB) rather than event count.
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(10_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
57
/// Reasonable default batch settings for sinks focused on shipping fewer-but-larger batches,
/// limited by byte size.
#[derive(Clone, Copy, Debug, Default)]
pub struct BulkSizeBasedDefaultBatchSettings;

impl SinkBatchSettings for BulkSizeBasedDefaultBatchSettings {
    // Same 10 MB cap as the realtime variant, but with a much longer (5 min)
    // timeout so batches have time to fill up.
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(10_000_000);
    const TIMEOUT_SECS: f64 = 300.0;
}
68
/// "Default" batch settings when a sink handles batch settings entirely on its own.
///
/// This has very few usages, but can be notably seen in the Kafka sink, where the values are used
/// to configure `librdkafka` itself rather than being passed as `BatchSettings`/`BatcherSettings`
/// to components in the sink itself.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoDefaultsBatchSettings;

impl SinkBatchSettings for NoDefaultsBatchSettings {
    // No limits at all; timeout is still required to be non-zero by `validate`.
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
82
/// Type-state marker: defaults from `D` have been merged in via `validate`.
#[derive(Clone, Copy, Debug, Default)]
pub struct Merged;

/// Type-state marker: raw deserialized configuration; defaults not yet applied.
#[derive(Clone, Copy, Debug, Default)]
pub struct Unmerged;
88
/// Event batching behavior.
// NOTE: the default values are extracted from the consts in `D`. This generates correct defaults
// in automatic cue docs generation. Implementations of `SinkBatchSettings` should not specify
// defaults, since that is satisfied here.
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct BatchConfig<D: SinkBatchSettings + Clone, S = Unmerged>
where
    S: Clone,
{
    /// The maximum size of a batch that is processed by a sink.
    ///
    /// This is based on the uncompressed size of the batched events, before they are
    /// serialized or compressed.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,

    /// The maximum size of a batch before it is flushed.
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,

    /// The maximum age of a batch before it is flushed.
    #[serde(default = "default_timeout::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Timeout"))]
    pub timeout_secs: Option<f64>,

    // Carries the sink's default settings type `D` without storing a value.
    #[serde(skip)]
    _d: PhantomData<D>,
    // Carries the merge type-state `S` (`Merged`/`Unmerged`) without storing a value.
    #[serde(skip)]
    _s: PhantomData<S>,
}
125
// These free functions surface the associated consts of `D` as serde default
// providers, since `#[serde(default = "...")]` requires a function path rather
// than a constant.
const fn default_max_bytes<D: SinkBatchSettings>() -> Option<usize> {
    D::MAX_BYTES
}

const fn default_max_events<D: SinkBatchSettings>() -> Option<usize> {
    D::MAX_EVENTS
}

const fn default_timeout<D: SinkBatchSettings>() -> Option<f64> {
    Some(D::TIMEOUT_SECS)
}
137
138impl<D: SinkBatchSettings + Clone> BatchConfig<D, Unmerged> {
139    pub fn validate(self) -> Result<BatchConfig<D, Merged>, BatchError> {
140        let config = BatchConfig {
141            max_bytes: self.max_bytes.or(D::MAX_BYTES),
142            max_events: self.max_events.or(D::MAX_EVENTS),
143            timeout_secs: self.timeout_secs.or(Some(D::TIMEOUT_SECS)),
144            _d: PhantomData,
145            _s: PhantomData,
146        };
147
148        match (config.max_bytes, config.max_events, config.timeout_secs) {
149            // TODO: what logic do we want to check that we have the minimum number of settings?
150            // for example, we always assert that timeout_secs from D is greater than zero, but
151            // technically we could end up with max bytes or max events being none, since we just
152            // chain options... but asserting that they're set isn't really doable either, because
153            // you dont always set both of those fields, etc..
154            (Some(0), _, _) => Err(BatchError::InvalidMaxBytes),
155            (_, Some(0), _) => Err(BatchError::InvalidMaxEvents),
156            (_, _, Some(timeout)) if timeout <= 0.0 => Err(BatchError::InvalidTimeout),
157
158            _ => Ok(config),
159        }
160    }
161
162    pub fn into_batch_settings<T: Batch>(self) -> Result<BatchSettings<T>, BatchError> {
163        let config = self.validate()?;
164        config.into_batch_settings()
165    }
166
167    /// Converts these settings into [`BatcherSettings`].
168    ///
169    /// `BatcherSettings` is effectively the `vector_core` spiritual successor of
170    /// [`BatchSettings<B>`].  Once all sinks are rewritten in the new stream-based style and we can
171    /// eschew customized batch buffer types, we can de-genericify `BatchSettings` and move it into
172    /// `vector_core`, and use that instead of `BatcherSettings`.
173    pub fn into_batcher_settings(self) -> Result<BatcherSettings, BatchError> {
174        let config = self.validate()?;
175        config.into_batcher_settings()
176    }
177}
178
179impl<D: SinkBatchSettings + Clone> BatchConfig<D, Merged> {
180    pub const fn validate(self) -> Result<BatchConfig<D, Merged>, BatchError> {
181        Ok(self)
182    }
183
184    pub const fn disallow_max_bytes(self) -> Result<Self, BatchError> {
185        // Sinks that used `max_size` for an event count cannot count
186        // bytes, so err if `max_bytes` is set.
187        match self.max_bytes {
188            Some(_) => Err(BatchError::BytesNotAllowed),
189            None => Ok(self),
190        }
191    }
192
193    pub const fn limit_max_bytes(self, limit: usize) -> Result<Self, BatchError> {
194        match self.max_bytes {
195            Some(n) if n > limit => Err(BatchError::MaxBytesExceeded { limit }),
196            _ => Ok(self),
197        }
198    }
199
200    pub const fn limit_max_events(self, limit: usize) -> Result<Self, BatchError> {
201        match self.max_events {
202            Some(n) if n > limit => Err(BatchError::MaxEventsExceeded { limit }),
203            _ => Ok(self),
204        }
205    }
206
207    pub fn into_batch_settings<T: Batch>(self) -> Result<BatchSettings<T>, BatchError> {
208        let adjusted = T::get_settings_defaults(self)?;
209
210        // This is unfortunate since we technically have already made sure this isn't possible in
211        // `validate`, but alas.
212        let timeout_secs = adjusted.timeout_secs.ok_or(BatchError::InvalidTimeout)?;
213
214        Ok(BatchSettings {
215            size: BatchSize {
216                bytes: adjusted.max_bytes.unwrap_or(usize::MAX),
217                events: adjusted.max_events.unwrap_or(usize::MAX),
218                _type_marker: PhantomData,
219            },
220            timeout: Duration::from_secs_f64(timeout_secs),
221        })
222    }
223
224    /// Converts these settings into [`BatcherSettings`].
225    ///
226    /// `BatcherSettings` is effectively the `vector_core` spiritual successor of
227    /// [`BatchSettings<B>`].  Once all sinks are rewritten in the new stream-based style and we can
228    /// eschew customized batch buffer types, we can de-genericify `BatchSettings` and move it into
229    /// `vector_core`, and use that instead of `BatcherSettings`.
230    pub fn into_batcher_settings(self) -> Result<BatcherSettings, BatchError> {
231        let max_bytes = self
232            .max_bytes
233            .and_then(NonZeroUsize::new)
234            .or_else(|| NonZeroUsize::new(usize::MAX))
235            .expect("`max_bytes` should already be validated");
236
237        let max_events = self
238            .max_events
239            .and_then(NonZeroUsize::new)
240            .or_else(|| NonZeroUsize::new(usize::MAX))
241            .expect("`max_bytes` should already be validated");
242
243        // This is unfortunate since we technically have already made sure this isn't possible in
244        // `validate`, but alas.
245        let timeout_secs = self.timeout_secs.ok_or(BatchError::InvalidTimeout)?;
246
247        Ok(BatcherSettings::new(
248            Duration::from_secs_f64(timeout_secs),
249            max_bytes,
250            max_events,
251        ))
252    }
253}
254
// Going from a merged to unmerged configuration is fine, because we know it already had to have
// been validated/limited. This also allows re-targeting the config at a different set of sink
// defaults (`D2`), since the already-resolved field values are carried over as-is.
impl<D1, D2> From<BatchConfig<D1, Merged>> for BatchConfig<D2, Unmerged>
where
    D1: SinkBatchSettings + Clone,
    D2: SinkBatchSettings + Clone,
{
    fn from(config: BatchConfig<D1, Merged>) -> Self {
        BatchConfig {
            max_bytes: config.max_bytes,
            max_events: config.max_events,
            timeout_secs: config.timeout_secs,
            _d: PhantomData,
            _s: PhantomData,
        }
    }
}
272
/// Size limits (bytes and event count) for a batch of type `B`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Copy(bound = ""))]
pub struct BatchSize<B> {
    pub bytes: usize,
    pub events: usize,
    // This type marker is used to drive type inference, which allows us
    // to call the right Batch::get_settings_defaults without explicitly
    // naming the type in BatchSettings::parse_config.
    _type_marker: PhantomData<B>,
}
284
285impl<B> BatchSize<B> {
286    pub const fn const_default() -> Self {
287        BatchSize {
288            bytes: usize::MAX,
289            events: usize::MAX,
290            _type_marker: PhantomData,
291        }
292    }
293}
294
295impl<B> Default for BatchSize<B> {
296    fn default() -> Self {
297        BatchSize::const_default()
298    }
299}
300
/// Fully-resolved batching parameters (size limits plus flush timeout) for a
/// batch of type `B`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
#[derivative(Copy(bound = ""))]
pub struct BatchSettings<B> {
    pub size: BatchSize<B>,
    pub timeout: Duration,
}
308
309impl<B> Default for BatchSettings<B> {
310    fn default() -> Self {
311        BatchSettings {
312            size: BatchSize {
313                bytes: 10_000_000,
314                events: usize::MAX,
315                _type_marker: PhantomData,
316            },
317            timeout: Duration::from_secs(1),
318        }
319    }
320}
321
/// Emits a `LargeEventDroppedError` and reports a successful (but not full)
/// push, effectively dropping the oversized event without failing the batch.
pub(super) fn err_event_too_large<T>(length: usize, max_length: usize) -> PushResult<T> {
    emit!(LargeEventDroppedError { length, max_length });
    PushResult::Ok(false)
}
326
/// This enum provides the result of a push operation, indicating if the
/// event was added and the fullness state of the buffer.
#[must_use]
#[derive(Debug, Eq, PartialEq)]
pub enum PushResult<T> {
    /// Event was added, with an indicator if the buffer is now full.
    /// `Ok(true)` means the buffer should be flushed before the next push.
    Ok(bool),
    /// Event could not be added because it would overflow the
    /// buffer. Since push takes ownership of the event, it must be
    /// returned here.
    Overflow(T),
}
339
/// A buffer of items that accumulates up to configured limits and is then
/// finished into a single output payload.
pub trait Batch: Sized {
    type Input;
    type Output;

    /// Turn the batch configuration into an actualized set of settings,
    /// and deal with the proper behavior of `max_size` and if
    /// `max_bytes` may be set. This is in the trait to ensure all batch
    /// buffers implement it.
    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
        config: BatchConfig<D, Merged>,
    ) -> Result<BatchConfig<D, Merged>, BatchError> {
        Ok(config)
    }

    /// Attempts to add `item`, returning `PushResult::Overflow` (with the item) if it does not fit.
    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input>;
    /// Returns `true` if the batch currently holds no items.
    fn is_empty(&self) -> bool;
    /// Creates a new, empty batch of the same kind as this one.
    fn fresh(&self) -> Self;
    /// Consumes the batch, producing its final output.
    fn finish(self) -> Self::Output;
    /// The number of items currently held.
    fn num_items(&self) -> usize;
}
360
/// The finished output of a [`FinalizersBatch`]: the inner batch's output plus
/// the bookkeeping accumulated while it was filled.
#[derive(Debug)]
pub struct EncodedBatch<I> {
    pub items: I,
    pub finalizers: EventFinalizers,
    // May exceed the inner batch's item count when events were aggregated
    // (e.g. metrics) — see the note on `FinalizersBatch::count`.
    pub count: usize,
    pub byte_size: usize,
    pub json_byte_size: JsonSize,
}
369
/// This is a batch construct that stores a set of event finalizers alongside the batch itself.
#[derive(Clone, Debug)]
pub struct FinalizersBatch<B> {
    inner: B,
    finalizers: EventFinalizers,
    // The count of items inserted into this batch is distinct from the
    // number of items recorded by the inner batch, as that inner count
    // could be smaller due to aggregated items (ie metrics).
    count: usize,
    byte_size: usize,
    json_byte_size: JsonSize,
}
382
383impl<B: Batch> From<B> for FinalizersBatch<B> {
384    fn from(inner: B) -> Self {
385        Self {
386            inner,
387            finalizers: Default::default(),
388            count: 0,
389            byte_size: 0,
390            json_byte_size: JsonSize::zero(),
391        }
392    }
393}
394
395impl<B: Batch> Batch for FinalizersBatch<B> {
396    type Input = EncodedEvent<B::Input>;
397    type Output = EncodedBatch<B::Output>;
398
399    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
400        config: BatchConfig<D, Merged>,
401    ) -> Result<BatchConfig<D, Merged>, BatchError> {
402        B::get_settings_defaults(config)
403    }
404
405    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
406        let EncodedEvent {
407            item,
408            finalizers,
409            byte_size,
410            json_byte_size,
411        } = item;
412        match self.inner.push(item) {
413            PushResult::Ok(full) => {
414                self.finalizers.merge(finalizers);
415                self.count += 1;
416                self.byte_size += byte_size;
417                self.json_byte_size += json_byte_size;
418                PushResult::Ok(full)
419            }
420            PushResult::Overflow(item) => PushResult::Overflow(EncodedEvent {
421                item,
422                finalizers,
423                byte_size,
424                json_byte_size,
425            }),
426        }
427    }
428
429    fn is_empty(&self) -> bool {
430        self.inner.is_empty()
431    }
432
433    fn fresh(&self) -> Self {
434        Self {
435            inner: self.inner.fresh(),
436            finalizers: Default::default(),
437            count: 0,
438            byte_size: 0,
439            json_byte_size: JsonSize::zero(),
440        }
441    }
442
443    fn finish(self) -> Self::Output {
444        EncodedBatch {
445            items: self.inner.finish(),
446            finalizers: self.finalizers,
447            count: self.count,
448            byte_size: self.byte_size,
449            json_byte_size: self.json_byte_size,
450        }
451    }
452
453    fn num_items(&self) -> usize {
454        self.inner.num_items()
455    }
456}
457
/// A batch wrapper that remembers when the inner batch became full, so that
/// subsequent pushes are rejected without touching the inner batch.
#[derive(Clone, Debug)]
pub struct StatefulBatch<B> {
    inner: B,
    was_full: bool,
}
463
464impl<B: Batch> From<B> for StatefulBatch<B> {
465    fn from(inner: B) -> Self {
466        Self {
467            inner,
468            was_full: false,
469        }
470    }
471}
472
impl<B> StatefulBatch<B> {
    /// Whether a prior push saw the inner batch overflow or report itself full.
    pub const fn was_full(&self) -> bool {
        self.was_full
    }

    /// Unwraps the inner batch, discarding the fullness state.
    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
    pub fn into_inner(self) -> B {
        self.inner
    }
}
483
484impl<B: Batch> Batch for StatefulBatch<B> {
485    type Input = B::Input;
486    type Output = B::Output;
487
488    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
489        config: BatchConfig<D, Merged>,
490    ) -> Result<BatchConfig<D, Merged>, BatchError> {
491        B::get_settings_defaults(config)
492    }
493
494    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
495        if self.was_full {
496            PushResult::Overflow(item)
497        } else {
498            let result = self.inner.push(item);
499            self.was_full =
500                matches!(result, PushResult::Overflow(_)) || matches!(result, PushResult::Ok(true));
501            result
502        }
503    }
504
505    fn is_empty(&self) -> bool {
506        !self.was_full && self.inner.is_empty()
507    }
508
509    fn fresh(&self) -> Self {
510        Self {
511            inner: self.inner.fresh(),
512            was_full: false,
513        }
514    }
515
516    fn finish(self) -> Self::Output {
517        self.inner.finish()
518    }
519
520    fn num_items(&self) -> usize {
521        self.inner.num_items()
522    }
523}