vector_buffers/
lib.rs

//! The Vector Core buffer
//!
//! This library implements channel-like functionality in two variants: one that is solely
//! in-memory and one that is on-disk. Both variants are bounded.
5
6#![deny(warnings)]
7#![deny(clippy::all)]
8#![deny(clippy::pedantic)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::type_complexity)] // long-types happen, especially in async code
11#![allow(clippy::must_use_candidate)]
12#![allow(async_fn_in_trait)]
13
14#[macro_use]
15extern crate tracing;
16
17mod buffer_usage_data;
18
19pub mod config;
20pub use config::{BufferConfig, BufferType, MemoryBufferSize};
21use encoding::Encodable;
22use vector_config::configurable_component;
23
24pub(crate) use vector_common::Result;
25
26pub mod encoding;
27
28mod internal_events;
29
30#[cfg(test)]
31pub mod test;
32pub mod topology;
33
34mod cast_utils;
35pub(crate) mod variants;
36
37use std::fmt::Debug;
38
39#[cfg(test)]
40use quickcheck::{Arbitrary, Gen};
41use vector_common::{byte_size_of::ByteSizeOf, finalization::AddBatchNotifier};
42
/// Event handling behavior when a buffer is full.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum WhenFull {
    /// Wait for free space in the buffer.
    ///
    /// This applies backpressure up the topology, signalling that sources should slow down
    /// the acceptance/consumption of events. This means that while no data is lost, data will pile
    /// up at the edge.
    #[default]
    Block,

    /// Drops the event instead of waiting for free space in the buffer.
    ///
    /// The event will be intentionally dropped. This mode is typically used when performance is the
    /// highest priority, and it is preferable to temporarily lose events rather than cause a
    /// slowdown in the acceptance/consumption of events.
    DropNewest,

    /// Overflows to the next stage in the buffer topology.
    ///
    /// If the current buffer stage is full, attempt to send this event to the next buffer stage.
    /// That stage may also be configured to overflow, and so on, but ultimately the last stage in
    /// a buffer topology must use one of the other handling behaviors. This means that the next
    /// stage may potentially be able to buffer the event, but it may also block or drop the event.
    ///
    /// This mode can only be used when two or more buffer stages are configured.
    #[configurable(metadata(docs::hidden))]
    Overflow,
}
74
#[cfg(test)]
impl Arbitrary for WhenFull {
    /// Produces either `Block` or `DropNewest`, decided by a single generated boolean.
    fn arbitrary(g: &mut Gen) -> Self {
        // TODO: `Overflow` is deliberately never generated. Nothing supports handling it yet,
        // and anything that encounters "overflow" falls back to "block", so emitting it here
        // would serve no purpose for now.
        let pick_block = bool::arbitrary(g);
        if pick_block {
            Self::Block
        } else {
            Self::DropNewest
        }
    }
}
88
89/// An item that can be buffered in memory.
90///
91/// This supertrait serves as the base trait for any item that can be pushed into a memory buffer.
92/// It is a relaxed version of `Bufferable` that allows for items that are not `Encodable` (e.g., `Instant`),
93/// which is an unnecessary constraint for memory buffers.
94pub trait InMemoryBufferable:
95    AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
96{
97}
98
99// Blanket implementation for anything that is already in-memory bufferable.
100impl<T> InMemoryBufferable for T where
101    T: AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
102{
103}
104
105/// An item that can be buffered.
106///
107/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
108pub trait Bufferable: InMemoryBufferable + Encodable {}
109
110// Blanket implementation for anything that is already bufferable.
111impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
112
/// Reports the event count of an item.
///
/// This is one of the supertraits of `InMemoryBufferable`, so every buffered item provides it.
pub trait EventCount {
    /// Returns the event count for this item.
    fn event_count(&self) -> usize;
}
116
117impl<T> EventCount for Vec<T>
118where
119    T: EventCount,
120{
121    fn event_count(&self) -> usize {
122        self.iter().map(EventCount::event_count).sum()
123    }
124}
125
126impl<T> EventCount for &T
127where
128    T: EventCount,
129{
130    fn event_count(&self) -> usize {
131        (*self).event_count()
132    }
133}
134
135#[track_caller]
136pub(crate) fn spawn_named<T>(
137    task: impl std::future::Future<Output = T> + Send + 'static,
138    _name: &str,
139) -> tokio::task::JoinHandle<T>
140where
141    T: Send + 'static,
142{
143    #[cfg(tokio_unstable)]
144    return tokio::task::Builder::new()
145        .name(_name)
146        .spawn(task)
147        .expect("tokio task should spawn");
148
149    #[cfg(not(tokio_unstable))]
150    tokio::spawn(task)
151}