vector_buffers/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
//! The Vector Core buffer
//!
//! This library implements a channel like functionality, one variant which is
//! solely in-memory and the other that is on-disk. Both variants are bounded.
#![deny(warnings)]
#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::type_complexity)] // long-types happen, especially in async code
#![allow(clippy::must_use_candidate)]
#![allow(async_fn_in_trait)]
#[macro_use]
extern crate tracing;
mod buffer_usage_data;
pub mod config;
pub use config::{BufferConfig, BufferType};
use encoding::Encodable;
use vector_config::configurable_component;
pub(crate) use vector_common::Result;
pub mod encoding;
mod internal_events;
#[cfg(test)]
pub mod test;
pub mod topology;
pub(crate) mod variants;
use std::fmt::Debug;
#[cfg(test)]
use quickcheck::{Arbitrary, Gen};
use vector_common::{byte_size_of::ByteSizeOf, finalization::AddBatchNotifier};
/// Event handling behavior when a buffer is full.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum WhenFull {
/// Wait for free space in the buffer.
///
/// This applies backpressure up the topology, signalling that sources should slow down
/// the acceptance/consumption of events. This means that while no data is lost, data will pile
/// up at the edge.
#[default]
Block,
/// Drops the event instead of waiting for free space in buffer.
///
/// The event will be intentionally dropped. This mode is typically used when performance is the
/// highest priority, and it is preferable to temporarily lose events rather than cause a
/// slowdown in the acceptance/consumption of events.
DropNewest,
/// Overflows to the next stage in the buffer topology.
///
/// If the current buffer stage is full, attempt to send this event to the next buffer stage.
/// That stage may also be configured overflow, and so on, but ultimately the last stage in a
/// buffer topology must use one of the other handling behaviors. This means that next stage may
/// potentially be able to buffer the event, but it may also block or drop the event.
///
/// This mode can only be used when two or more buffer stages are configured.
#[configurable(metadata(docs::hidden))]
Overflow,
}
#[cfg(test)]
impl Arbitrary for WhenFull {
fn arbitrary(g: &mut Gen) -> Self {
// TODO: We explicitly avoid generating "overflow" as a possible value because nothing yet
// supports handling it, and will be defaulted to using "block" if they encounter
// "overflow". Thus, there's no reason to emit it here... yet.
if bool::arbitrary(g) {
WhenFull::Block
} else {
WhenFull::DropNewest
}
}
}
/// An item that can be buffered in memory.
///
/// This supertrait serves as the base trait for any item that can be pushed into a memory buffer.
/// It is a relaxed version of `Bufferable` that allows for items that are not `Encodable` (e.g., `Instant`),
/// which is an unnecessary constraint for memory buffers.
pub trait InMemoryBufferable:
AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
{
}
// Blanket implementation for anything that is already in-memory bufferable.
impl<T> InMemoryBufferable for T where
T: AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
{
}
/// An item that can be buffered.
///
/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
pub trait Bufferable: InMemoryBufferable + Encodable {}
// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
pub trait EventCount {
fn event_count(&self) -> usize;
}
impl<T> EventCount for Vec<T>
where
T: EventCount,
{
fn event_count(&self) -> usize {
self.iter().map(EventCount::event_count).sum()
}
}
impl<T> EventCount for &T
where
T: EventCount,
{
fn event_count(&self) -> usize {
(*self).event_count()
}
}
#[track_caller]
pub(crate) fn spawn_named<T>(
task: impl std::future::Future<Output = T> + Send + 'static,
_name: &str,
) -> tokio::task::JoinHandle<T>
where
T: Send + 'static,
{
#[cfg(tokio_unstable)]
return tokio::task::Builder::new()
.name(_name)
.spawn(task)
.expect("tokio task should spawn");
#[cfg(not(tokio_unstable))]
tokio::spawn(task)
}