1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
use std::{error::Error, num::NonZeroUsize};
use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use tracing::Span;
use super::channel::{ReceiverAdapter, SenderAdapter};
use crate::{
buffer_usage_data::{BufferUsage, BufferUsageHandle},
topology::channel::{BufferReceiver, BufferSender},
variants::MemoryBuffer,
Bufferable, WhenFull,
};
/// A value that can serve as one stage of a buffer topology.
#[async_trait]
pub trait IntoBuffer<T>: Send
where
    T: Bufferable,
{
    /// Reports whether this stage instruments itself (`true`) or expects to be instrumented
    /// from the outside (`false`, the default).
    ///
    /// Some stages, such as the in-memory channel, have no opportunity to observe values
    /// mid-channel without adding an unnecessary hop. For those, [`BufferSender`] and
    /// [`BufferReceiver`] can be configured to instrument every event that flows through them.
    ///
    /// With externally provided instrumentation, the size of each event entering and leaving
    /// the buffer is calculated via [`vector_common::byte_size_of::ByteSizeOf`].
    fn provides_instrumentation(&self) -> bool {
        false
    }

    /// Consumes this value and produces the sender/receiver pair used to wire it into a buffer
    /// topology.
    ///
    /// `usage_handle` is the buffer usage handle associated with this stage.
    ///
    /// # Errors
    ///
    /// Returns a boxed error if the stage could not be constructed.
    async fn into_buffer_parts(
        self: Box<Self>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>>;
}
/// Errors that can be returned while building a buffer topology.
#[derive(Debug, Snafu)]
pub enum TopologyError {
/// The builder was asked to build without any stages having been added.
#[snafu(display("buffer topology cannot be empty"))]
EmptyTopology,
/// A stage using the "block" or "drop newest" mode was placed in front of another stage, so
/// the later stage could never be reached.
#[snafu(display(
"stage {} configured with block/drop newest behavior in front of subsequent stage",
stage_idx
))]
NextStageNotUsed { stage_idx: usize },
/// The innermost (last) stage was set to "overflow" mode, but there is no further stage to
/// overflow into.
#[snafu(display("last stage in buffer topology cannot be set to overflow mode"))]
OverflowWhenLast,
/// Converting an individual stage into its sender/receiver pair failed; `source` carries the
/// underlying error and `stage_idx` identifies the stage.
#[snafu(display("failed to build individual stage {}: {}", stage_idx, source))]
FailedToBuildStage {
stage_idx: usize,
source: Box<dyn Error + Send + Sync>,
},
/// More than one component with segmented acknowledgements was used in the same buffer.
///
/// NOTE(review): this variant is not constructed anywhere in this file; confirm where it is
/// raised.
#[snafu(display(
"multiple components with segmented acknowledgements cannot be used in the same buffer"
))]
StackedAcks,
}
/// A stage that has been registered with the builder but not yet turned into a
/// sender/receiver pair.
struct TopologyStage<T>
where
    T: Bufferable,
{
    /// The boxed stage exactly as it was handed to the builder.
    untransformed: Box<dyn IntoBuffer<T>>,
    /// The "when full" behavior requested for this stage.
    when_full: WhenFull,
}
/// Builder that assembles a buffer topology out of one or more stages.
pub struct TopologyBuilder<T>
where
    T: Bufferable,
{
    /// Registered stages, in the order they were added (outermost first, innermost last).
    stages: Vec<TopologyStage<T>>,
}
impl<T> TopologyBuilder<T>
where
    T: Bufferable,
{
    /// Appends a stage to the buffer topology.
    ///
    /// Stages are ordered outermost-first: each stage added after this one becomes the stage
    /// that this one overflows into, and the last stage added is the innermost one.
    ///
    /// `when_full` selects what happens when the stage is full. Three modes are available --
    /// block, drop newest, and overflow -- which are documented in more detail by
    /// [`BufferSender`].
    ///
    /// Not every mode is valid in every position:
    /// - the innermost stage (the last stage given to the builder) cannot use "overflow" mode,
    ///   as there is no other stage to overflow to
    /// - a stage that is followed by another stage cannot use "block" or "drop newest" mode, and
    ///   must use "overflow" mode
    ///
    /// Nothing is validated here; any violation of these rules is reported as an error by
    /// [`TopologyBuilder::build`].
    pub fn stage<S>(&mut self, stage: S, when_full: WhenFull) -> &mut Self
    where
        S: IntoBuffer<T> + 'static,
    {
        let untransformed: Box<dyn IntoBuffer<T>> = Box::new(stage);
        self.stages.push(TopologyStage {
            untransformed,
            when_full,
        });
        self
    }

    /// Consumes the builder and returns the sender/receiver pair for the assembled topology.
    ///
    /// `buffer_id` is the identifier under which buffer usage reporting is installed, and `span`
    /// is used both for buffer usage tracking and for send-duration instrumentation of every
    /// stage.
    ///
    /// # Errors
    ///
    /// - [`TopologyError::EmptyTopology`] if no stages were added.
    /// - [`TopologyError::OverflowWhenLast`] if the innermost stage is in "overflow" mode.
    /// - [`TopologyError::NextStageNotUsed`] if a stage in "block" or "drop newest" mode is
    ///   followed by another stage.
    /// - [`TopologyError::FailedToBuildStage`] if a stage fails to produce its sender/receiver
    ///   pair.
    pub async fn build(
        self,
        buffer_id: String,
        span: Span,
    ) -> Result<(BufferSender<T>, BufferReceiver<T>), TopologyError> {
        let mut buffer_usage = BufferUsage::from_span(span.clone());

        // Holds the topology assembled so far. Stages are visited last-to-first so that the
        // topology grows from the innermost stage outwards.
        let mut assembled = None;

        for (stage_idx, stage) in self.stages.into_iter().enumerate().rev() {
            let TopologyStage {
                untransformed,
                when_full,
            } = stage;

            // Check that the requested mode makes sense given what has been assembled so far.
            let has_inner_stage = assembled.is_some();
            match when_full {
                // The innermost stage has nothing to overflow _to_.
                WhenFull::Overflow if !has_inner_stage => {
                    return Err(TopologyError::OverflowWhenLast);
                }
                // With an inner stage present, blocking or dropping the newest events makes no
                // sense: overflowing is the only valid transition to another stage.
                WhenFull::Block | WhenFull::DropNewest if has_inner_stage => {
                    return Err(TopologyError::NextStageNotUsed { stage_idx });
                }
                WhenFull::Overflow | WhenFull::Block | WhenFull::DropNewest => {}
            }

            // Each stage gets its own usage handle, which is passed to the stage while it is
            // being created. It is slightly awkward that the same handle may also end up with
            // the `BufferSender`/`BufferReceiver` wrappers below, but that is the price of
            // letting every stage behave opaquely once wrapped.
            let usage_handle = buffer_usage.add_stage(stage_idx);

            // This has to be queried before the stage is consumed by `into_buffer_parts`.
            let self_instrumented = untransformed.provides_instrumentation();

            let (raw_sender, raw_receiver) = untransformed
                .into_buffer_parts(usage_handle.clone())
                .await
                .context(FailedToBuildStageSnafu { stage_idx })?;

            let (mut sender, mut receiver) =
                if let Some((inner_sender, inner_receiver)) = assembled.take() {
                    // Wrap what has been built so far as this stage's overflow target.
                    (
                        BufferSender::with_overflow(raw_sender, inner_sender),
                        BufferReceiver::with_overflow(raw_receiver, inner_receiver),
                    )
                } else {
                    // Innermost stage: nothing to overflow into, so honor its configured mode.
                    (
                        BufferSender::new(raw_sender, when_full),
                        BufferReceiver::new(raw_receiver),
                    )
                };

            sender.with_send_duration_instrumentation(stage_idx, &span);
            if !self_instrumented {
                sender.with_usage_instrumentation(usage_handle.clone());
                receiver.with_usage_instrumentation(usage_handle);
            }

            assembled = Some((sender, receiver));
        }

        match assembled {
            Some(topology) => {
                // Buffer usage reporting is installed only once the whole topology has been
                // created successfully.
                buffer_usage.install(buffer_id.as_str());
                Ok(topology)
            }
            None => Err(TopologyError::EmptyTopology),
        }
    }
}
impl<T: Bufferable> TopologyBuilder<T> {
    /// Creates a memory-only buffer topology consisting of a single in-memory stage.
    ///
    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
    /// values, but cannot be configured to use overflow mode. If overflow mode is selected, it
    /// will be changed to blocking mode.
    ///
    /// This is a convenience method for `vector`, as it is used for inter-transform channels. It
    /// spares callers the boilerplate of creating the builder, creating the stage, installing
    /// buffer usage metrics that aren't required, and so on.
    ///
    /// The stage is created with a no-op usage handle and no usage instrumentation is attached;
    /// only send-duration instrumentation is configured, as stage `0` against `receiver_span`.
    ///
    /// # Panics
    ///
    /// Panics if the memory buffer fails to produce its sender/receiver pair, which is not
    /// expected to happen.
    pub async fn standalone_memory(
        max_events: NonZeroUsize,
        when_full: WhenFull,
        receiver_span: &Span,
    ) -> (BufferSender<T>, BufferReceiver<T>) {
        let memory_buffer = Box::new(MemoryBuffer::new(max_events));
        // The handle is not needed afterwards, so it is handed over directly instead of cloned.
        let (sender, receiver) = memory_buffer
            .into_buffer_parts(BufferUsageHandle::noop())
            .await
            .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

        // There is no second stage to overflow into, so overflow falls back to blocking.
        let mode = match when_full {
            WhenFull::Overflow => WhenFull::Block,
            m => m,
        };

        let mut sender = BufferSender::new(sender, mode);
        sender.with_send_duration_instrumentation(0, receiver_span);
        let receiver = BufferReceiver::new(receiver);

        (sender, receiver)
    }

    /// Creates a memory-only buffer topology with the given buffer usage handle.
    ///
    /// This is specifically required for the tests that occur under `buffers`, as we assert things
    /// like channel capacity left, which cannot be done on in-memory v1 buffers as they use the
    /// more abstract `Sink`-based adapters.
    ///
    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
    /// values, but cannot be configured to use overflow mode. If overflow mode is selected, it
    /// will be changed to blocking mode.
    ///
    /// Unlike [`TopologyBuilder::standalone_memory`], the sender and receiver are both given
    /// usage instrumentation backed by `usage_handle`, and no send-duration instrumentation is
    /// configured.
    ///
    /// # Panics
    ///
    /// Panics if the memory buffer fails to produce its sender/receiver pair, which is not
    /// expected to happen.
    #[cfg(test)]
    pub async fn standalone_memory_test(
        max_events: NonZeroUsize,
        when_full: WhenFull,
        usage_handle: BufferUsageHandle,
    ) -> (BufferSender<T>, BufferReceiver<T>) {
        let memory_buffer = Box::new(MemoryBuffer::new(max_events));
        let (sender, receiver) = memory_buffer
            .into_buffer_parts(usage_handle.clone())
            .await
            .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

        // There is no second stage to overflow into, so overflow falls back to blocking.
        let mode = match when_full {
            WhenFull::Overflow => WhenFull::Block,
            m => m,
        };

        let mut sender = BufferSender::new(sender, mode);
        let mut receiver = BufferReceiver::new(receiver);

        sender.with_usage_instrumentation(usage_handle.clone());
        receiver.with_usage_instrumentation(usage_handle);

        (sender, receiver)
    }
}
impl<T> Default for TopologyBuilder<T>
where
    T: Bufferable,
{
    /// Creates a builder that has no stages yet.
    fn default() -> Self {
        let stages = Vec::new();
        Self { stages }
    }
}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tracing::Span;

    use super::TopologyBuilder;
    use crate::{
        topology::builder::TopologyError,
        topology::test_util::{assert_current_send_capacity, Sample},
        variants::MemoryBuffer,
        WhenFull,
    };

    /// Returns a builder holding one memory stage (created with a size of 1) per entry in
    /// `modes`, added in the given order.
    fn builder_with_stages(modes: Vec<WhenFull>) -> TopologyBuilder<Sample> {
        let mut builder = TopologyBuilder::default();
        for when_full in modes {
            builder.stage(MemoryBuffer::new(NonZeroUsize::new(1).unwrap()), when_full);
        }
        builder
    }

    // A lone stage in "block" mode builds, and exposes only a base capacity.
    #[tokio::test]
    async fn single_stage_topology_block() {
        let outcome = builder_with_stages(vec![WhenFull::Block])
            .build("test".to_string(), Span::none())
            .await;
        assert!(outcome.is_ok());

        let (mut sender, _) = outcome.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    // A lone stage in "drop newest" mode builds, and exposes only a base capacity.
    #[tokio::test]
    async fn single_stage_topology_drop_newest() {
        let outcome = builder_with_stages(vec![WhenFull::DropNewest])
            .build("test".to_string(), Span::none())
            .await;
        assert!(outcome.is_ok());

        let (mut sender, _) = outcome.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    // A lone stage in "overflow" mode is rejected: there is nothing to overflow into.
    #[tokio::test]
    async fn single_stage_topology_overflow() {
        let outcome = builder_with_stages(vec![WhenFull::Overflow])
            .build("test".to_string(), Span::none())
            .await;

        if !matches!(outcome, Err(TopologyError::OverflowWhenLast)) {
            panic!("unexpected build result: {outcome:?}");
        }
    }

    // A "block" stage in front of another stage is rejected, naming the offending stage.
    #[tokio::test]
    async fn two_stage_topology_block() {
        let outcome = builder_with_stages(vec![WhenFull::Block, WhenFull::Block])
            .build("test".to_string(), Span::none())
            .await;

        if let Err(TopologyError::NextStageNotUsed { stage_idx }) = outcome {
            assert_eq!(stage_idx, 0);
        } else {
            panic!("unexpected build result: {outcome:?}");
        }
    }

    // A "drop newest" stage in front of another stage is rejected, naming the offending stage.
    #[tokio::test]
    async fn two_stage_topology_drop_newest() {
        let outcome = builder_with_stages(vec![WhenFull::DropNewest, WhenFull::Block])
            .build("test".to_string(), Span::none())
            .await;

        if let Err(TopologyError::NextStageNotUsed { stage_idx }) = outcome {
            assert_eq!(stage_idx, 0);
        } else {
            panic!("unexpected build result: {outcome:?}");
        }
    }

    // An "overflow" stage followed by a "block" stage builds, exposing base and overflow
    // capacity.
    #[tokio::test]
    async fn two_stage_topology_overflow() {
        let outcome = builder_with_stages(vec![WhenFull::Overflow, WhenFull::Block])
            .build("test".to_string(), Span::none())
            .await;
        assert!(outcome.is_ok());

        let (mut sender, _) = outcome.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), Some(1));
    }
}