vector_buffers/lib.rs
//! The Vector Core buffer
//!
//! This library implements channel-like functionality in two variants: one that is
//! solely in-memory and one that is on-disk. Both variants are bounded.

#![deny(warnings)]
#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::type_complexity)] // long types happen, especially in async code
#![allow(clippy::must_use_candidate)]
#![allow(async_fn_in_trait)]

#[macro_use]
extern crate tracing;

mod buffer_usage_data;

pub mod config;
pub use config::{BufferConfig, BufferType, MemoryBufferSize};
use encoding::Encodable;
pub(crate) use vector_common::Result;
use vector_config::configurable_component;

pub mod encoding;

mod internal_events;

#[cfg(test)]
pub mod test;
pub mod topology;

pub(crate) mod variants;

use std::fmt::Debug;

#[cfg(test)]
use quickcheck::{Arbitrary, Gen};
use vector_common::{byte_size_of::ByteSizeOf, finalization::AddBatchNotifier};

/// Event handling behavior when a buffer is full.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum WhenFull {
    /// Wait for free space in the buffer.
    ///
    /// This applies backpressure up the topology, signalling that sources should slow down
    /// the acceptance/consumption of events. This means that while no data is lost, data will pile
    /// up at the edge.
    #[default]
    Block,

    /// Drops the event instead of waiting for free space in the buffer.
    ///
    /// The event will be intentionally dropped. This mode is typically used when performance is the
    /// highest priority, and it is preferable to temporarily lose events rather than cause a
    /// slowdown in the acceptance/consumption of events.
    DropNewest,

    /// Overflows to the next stage in the buffer topology.
    ///
    /// If the current buffer stage is full, attempt to send this event to the next buffer stage.
    /// That stage may also be configured to overflow, and so on, but ultimately the last stage in a
    /// buffer topology must use one of the other handling behaviors. This means that the next stage
    /// may potentially be able to buffer the event, but it may also block or drop the event.
    ///
    /// This mode can only be used when two or more buffer stages are configured.
    #[configurable(metadata(docs::hidden))]
    Overflow,
}

#[cfg(test)]
impl Arbitrary for WhenFull {
    fn arbitrary(g: &mut Gen) -> Self {
        // TODO: We explicitly avoid generating "overflow" as a possible value because nothing yet
        // supports handling it, and anything that encounters "overflow" falls back to using
        // "block". Thus, there's no reason to emit it here... yet.
        if bool::arbitrary(g) {
            WhenFull::Block
        } else {
            WhenFull::DropNewest
        }
    }
}
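
// A minimal sketch (test-only, not part of the existing suite) exercising the documented
// default above: with no explicit setting, the `#[default]` attribute makes `Block` the
// `WhenFull` mode.
#[cfg(test)]
mod when_full_sketch {
    use super::WhenFull;

    #[test]
    fn default_mode_is_block() {
        // The derived `Default` implementation picks the `#[default]` variant, i.e. `Block`.
        assert_eq!(WhenFull::default(), WhenFull::Block);
    }
}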

/// An item that can be buffered in memory.
///
/// This supertrait serves as the base trait for any item that can be pushed into a memory buffer.
/// It is a relaxed version of `Bufferable` that allows for items that are not `Encodable` (e.g., `Instant`),
/// which is an unnecessary constraint for memory buffers.
pub trait InMemoryBufferable:
    AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
{
}

// Blanket implementation for anything that is already in-memory bufferable.
impl<T> InMemoryBufferable for T where
    T: AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
{
}

/// An item that can be buffered.
///
/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
pub trait Bufferable: InMemoryBufferable + Encodable {}

// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}

/// An item that can report how many events it represents.
pub trait EventCount {
    fn event_count(&self) -> usize;
}

impl<T> EventCount for Vec<T>
where
    T: EventCount,
{
    fn event_count(&self) -> usize {
        self.iter().map(EventCount::event_count).sum()
    }
}

impl<T> EventCount for &T
where
    T: EventCount,
{
    fn event_count(&self) -> usize {
        (*self).event_count()
    }
}
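
// A minimal sketch (test-only, using a hypothetical `FakeBatch` type) showing how the
// `EventCount` impls above compose: a `Vec<T>` reports the sum of its elements' counts,
// and `&T` satisfies the trait by deferring to the underlying value.
#[cfg(test)]
mod event_count_sketch {
    use super::EventCount;

    struct FakeBatch {
        events: usize,
    }

    impl EventCount for FakeBatch {
        fn event_count(&self) -> usize {
            self.events
        }
    }

    // A generic helper (illustrative only) that accepts anything implementing `EventCount`.
    fn count<C: EventCount>(item: C) -> usize {
        item.event_count()
    }

    #[test]
    fn vec_and_ref_counts_compose() {
        let batches = vec![FakeBatch { events: 2 }, FakeBatch { events: 3 }];
        // `Vec<T>` sums the counts of its elements.
        assert_eq!(batches.event_count(), 5);
        // `&FakeBatch` implements `EventCount` via the blanket `&T` impl.
        assert_eq!(count(&batches[0]), 2);
    }
}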

/// Spawns a future as a task, attaching `_name` as the task name when the `tokio_unstable` cfg
/// enables tokio's task builder; otherwise the name is ignored and `tokio::spawn` is used.
#[track_caller]
pub(crate) fn spawn_named<T>(
    task: impl std::future::Future<Output = T> + Send + 'static,
    _name: &str,
) -> tokio::task::JoinHandle<T>
where
    T: Send + 'static,
{
    #[cfg(tokio_unstable)]
    return tokio::task::Builder::new()
        .name(_name)
        .spawn(task)
        .expect("tokio task should spawn");

    #[cfg(not(tokio_unstable))]
    tokio::spawn(task)
}
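
// A minimal sketch (test-only) of driving `spawn_named` on a hand-built current-thread
// runtime. It assumes tokio's `rt` feature (already required by `tokio::spawn` above) and
// deliberately avoids tokio's test macros.
#[cfg(test)]
mod spawn_named_sketch {
    use super::spawn_named;

    #[test]
    fn spawns_and_joins() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("runtime should build");
        let result = rt.block_on(async {
            // The name is only attached when the `tokio_unstable` cfg is set; otherwise it is ignored.
            let handle = spawn_named(async { 1 + 1 }, "sketch-task");
            handle.await.expect("task should not panic")
        });
        assert_eq!(result, 2);
    }
}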