vector_buffers/
lib.rs

1//! The Vector Core buffer
2//!
//! This library implements channel-like functionality in two variants: one that is solely
//! in-memory and one that is on-disk. Both variants are bounded.
5
6#![deny(warnings)]
7#![deny(clippy::all)]
8#![deny(clippy::pedantic)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::type_complexity)] // long-types happen, especially in async code
11#![allow(clippy::must_use_candidate)]
12#![allow(async_fn_in_trait)]
13
14#[macro_use]
15extern crate tracing;
16
17mod buffer_usage_data;
18
19pub mod config;
20pub use config::{BufferConfig, BufferType, MemoryBufferSize};
21use encoding::Encodable;
22pub(crate) use vector_common::Result;
23use vector_config::configurable_component;
24
25pub mod encoding;
26
27mod internal_events;
28
29#[cfg(test)]
30pub mod test;
31pub mod topology;
32
33pub(crate) mod variants;
34
35use std::fmt::Debug;
36
37#[cfg(test)]
38use quickcheck::{Arbitrary, Gen};
39use vector_common::{byte_size_of::ByteSizeOf, finalization::AddBatchNotifier};
40
41/// Event handling behavior when a buffer is full.
42#[configurable_component]
43#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
44#[serde(rename_all = "snake_case")]
45pub enum WhenFull {
46    /// Wait for free space in the buffer.
47    ///
48    /// This applies backpressure up the topology, signalling that sources should slow down
49    /// the acceptance/consumption of events. This means that while no data is lost, data will pile
50    /// up at the edge.
51    #[default]
52    Block,
53
54    /// Drops the event instead of waiting for free space in buffer.
55    ///
56    /// The event will be intentionally dropped. This mode is typically used when performance is the
57    /// highest priority, and it is preferable to temporarily lose events rather than cause a
58    /// slowdown in the acceptance/consumption of events.
59    DropNewest,
60
61    /// Overflows to the next stage in the buffer topology.
62    ///
63    /// If the current buffer stage is full, attempt to send this event to the next buffer stage.
64    /// That stage may also be configured overflow, and so on, but ultimately the last stage in a
65    /// buffer topology must use one of the other handling behaviors. This means that next stage may
66    /// potentially be able to buffer the event, but it may also block or drop the event.
67    ///
68    /// This mode can only be used when two or more buffer stages are configured.
69    #[configurable(metadata(docs::hidden))]
70    Overflow,
71}
72
#[cfg(test)]
impl Arbitrary for WhenFull {
    /// Chooses between `Block` and `DropNewest` using one random boolean drawn from `g`.
    fn arbitrary(g: &mut Gen) -> Self {
        // TODO: `Overflow` is intentionally never produced. Nothing supports handling it yet, and
        // anything that encounters "overflow" falls back to "block", so generating it here would
        // add no coverage... yet.
        let pick_block = bool::arbitrary(g);
        if pick_block {
            Self::Block
        } else {
            Self::DropNewest
        }
    }
}
86
87/// An item that can be buffered in memory.
88///
89/// This supertrait serves as the base trait for any item that can be pushed into a memory buffer.
90/// It is a relaxed version of `Bufferable` that allows for items that are not `Encodable` (e.g., `Instant`),
91/// which is an unnecessary constraint for memory buffers.
92pub trait InMemoryBufferable:
93    AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
94{
95}
96
97// Blanket implementation for anything that is already in-memory bufferable.
98impl<T> InMemoryBufferable for T where
99    T: AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
100{
101}
102
103/// An item that can be buffered.
104///
105/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
106pub trait Bufferable: InMemoryBufferable + Encodable {}
107
108// Blanket implementation for anything that is already bufferable.
109impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
110
111/// Hook for observing items as they are sent into a `BufferSender`.
112pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
113    /// Called immediately before the item is emitted to the underlying buffer.
114    /// The underlying type is stored in an `Arc`, so we cannot have `&mut self`.
115    fn on_send(&self, item: &mut T);
116}
117
/// Reports how many events a buffered item represents.
///
/// Every in-memory bufferable item must implement this (it is a supertrait of
/// `InMemoryBufferable`). Aggregating implementations combine their contents: a `Vec<T>`
/// reports the sum of its elements' counts, and a reference `&T` reports the count of the value
/// it points to.
pub trait EventCount {
    /// Returns the number of events represented by `self`.
    fn event_count(&self) -> usize;
}
121
122impl<T> EventCount for Vec<T>
123where
124    T: EventCount,
125{
126    fn event_count(&self) -> usize {
127        self.iter().map(EventCount::event_count).sum()
128    }
129}
130
131impl<T> EventCount for &T
132where
133    T: EventCount,
134{
135    fn event_count(&self) -> usize {
136        (*self).event_count()
137    }
138}
139
140#[track_caller]
141pub(crate) fn spawn_named<T>(
142    task: impl std::future::Future<Output = T> + Send + 'static,
143    _name: &str,
144) -> tokio::task::JoinHandle<T>
145where
146    T: Send + 'static,
147{
148    #[cfg(tokio_unstable)]
149    return tokio::task::Builder::new()
150        .name(_name)
151        .spawn(task)
152        .expect("tokio task should spawn");
153
154    #[cfg(not(tokio_unstable))]
155    tokio::spawn(task)
156}