vector_buffers/topology/
builder.rs

1use std::{error::Error, num::NonZeroUsize};
2
3use async_trait::async_trait;
4use snafu::{ResultExt, Snafu};
5use tracing::Span;
6
7use super::channel::{ReceiverAdapter, SenderAdapter};
8use crate::{
9    buffer_usage_data::{BufferUsage, BufferUsageHandle},
10    topology::channel::{BufferReceiver, BufferSender},
11    variants::MemoryBuffer,
12    Bufferable, WhenFull,
13};
14
15/// Value that can be used as a stage in a buffer topology.
16#[async_trait]
17pub trait IntoBuffer<T: Bufferable>: Send {
18    /// Gets whether or not this buffer stage provides its own instrumentation, or if it should be
19    /// instrumented from the outside.
20    ///
21    /// As some buffer stages, like the in-memory channel, never have a chance to catch the values
22    /// in the middle of the channel without introducing an unnecessary hop, [`BufferSender`] and
23    /// [`BufferReceiver`] can be configured to instrument all events flowing through directly.
24    ///
25    /// When instrumentation is provided in this way, [`vector_common::byte_size_of::ByteSizeOf`]
26    ///  is used to calculate the size of the event going both into and out of the buffer.
27    fn provides_instrumentation(&self) -> bool {
28        false
29    }
30
31    /// Converts this value into a sender and receiver pair suitable for use in a buffer topology.
32    async fn into_buffer_parts(
33        self: Box<Self>,
34        usage_handle: BufferUsageHandle,
35    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>>;
36}
37
38#[derive(Debug, Snafu)]
39pub enum TopologyError {
40    #[snafu(display("buffer topology cannot be empty"))]
41    EmptyTopology,
42    #[snafu(display(
43        "stage {} configured with block/drop newest behavior in front of subsequent stage",
44        stage_idx
45    ))]
46    NextStageNotUsed { stage_idx: usize },
47    #[snafu(display("last stage in buffer topology cannot be set to overflow mode"))]
48    OverflowWhenLast,
49    #[snafu(display("failed to build individual stage {}: {}", stage_idx, source))]
50    FailedToBuildStage {
51        stage_idx: usize,
52        source: Box<dyn Error + Send + Sync>,
53    },
54    #[snafu(display(
55        "multiple components with segmented acknowledgements cannot be used in the same buffer"
56    ))]
57    StackedAcks,
58}
59
60struct TopologyStage<T: Bufferable> {
61    untransformed: Box<dyn IntoBuffer<T>>,
62    when_full: WhenFull,
63}
64
65/// Builder for constructing buffer topologies.
66pub struct TopologyBuilder<T: Bufferable> {
67    stages: Vec<TopologyStage<T>>,
68}
69
70impl<T: Bufferable> TopologyBuilder<T> {
71    /// Adds a new stage to the buffer topology.
72    ///
73    /// The "when full" behavior can be optionally configured here.  If no behavior is specified,
74    /// and an overflow buffer is _not_ added to the topology after this, then the "when full"
75    /// behavior will use a default value of "block".  If a "when full" behavior is specified, and
76    /// an overflow buffer is added to the topology after this, then the specified "when full"
77    /// behavior will be ignored and will be set to "overflow" mode.
78    ///
79    /// Callers can configure what to do when a buffer is full by setting `when_full`.  Three modes
80    /// are available -- block, drop newest, and overflow -- which are documented in more detail by
81    /// [`BufferSender`].
82    ///
83    /// Two notes about what modes are not valid in certain scenarios:
84    /// - the innermost stage (the last stage given to the builder) cannot be set to "overflow" mode,
85    ///   as there is no other stage to overflow to
86    /// - a stage cannot use the "block" or "drop newest" mode when there is a subsequent stage, and
87    ///   must user the "overflow" mode
88    ///
89    /// Any occurrence of either of these scenarios will result in an error during build.
90    pub fn stage<S>(&mut self, stage: S, when_full: WhenFull) -> &mut Self
91    where
92        S: IntoBuffer<T> + 'static,
93    {
94        self.stages.push(TopologyStage {
95            untransformed: Box::new(stage),
96            when_full,
97        });
98        self
99    }
100
101    /// Consumes this builder, returning the sender and receiver that can be used by components.
102    ///
103    /// # Errors
104    ///
105    /// If there was a configuration error with one of the stages, an error variant will be returned
106    /// explaining the issue.
107    pub async fn build(
108        self,
109        buffer_id: String,
110        span: Span,
111    ) -> Result<(BufferSender<T>, BufferReceiver<T>), TopologyError> {
112        // We pop stages off in reverse order to build from the inside out.
113        let mut buffer_usage = BufferUsage::from_span(span.clone());
114        let mut current_stage = None;
115
116        for (stage_idx, stage) in self.stages.into_iter().enumerate().rev() {
117            // Make sure the stage is valid for our current builder state.
118            match stage.when_full {
119                // The innermost stage can't be set to overflow, there's nothing else to overflow _to_.
120                WhenFull::Overflow => {
121                    if current_stage.is_none() {
122                        return Err(TopologyError::OverflowWhenLast);
123                    }
124                }
125                // If there's already an inner stage, then blocking or dropping the newest events
126                // doesn't no sense.  Overflowing is the only valid transition to another stage.
127                WhenFull::Block | WhenFull::DropNewest => {
128                    if current_stage.is_some() {
129                        return Err(TopologyError::NextStageNotUsed { stage_idx });
130                    }
131                }
132            }
133
134            // Create the buffer usage handle for this stage and initialize it as we create the
135            // sender/receiver.  This is slightly awkward since we just end up actually giving
136            // the handle to the `BufferSender`/`BufferReceiver` wrappers, but that's the price we
137            // have to pay for letting each stage function in an opaque way when wrapped.
138            let usage_handle = buffer_usage.add_stage(stage_idx);
139            let provides_instrumentation = stage.untransformed.provides_instrumentation();
140            let (sender, receiver) = stage
141                .untransformed
142                .into_buffer_parts(usage_handle.clone())
143                .await
144                .context(FailedToBuildStageSnafu { stage_idx })?;
145
146            let (mut sender, mut receiver) = match current_stage.take() {
147                None => (
148                    BufferSender::new(sender, stage.when_full),
149                    BufferReceiver::new(receiver),
150                ),
151                Some((current_sender, current_receiver)) => (
152                    BufferSender::with_overflow(sender, current_sender),
153                    BufferReceiver::with_overflow(receiver, current_receiver),
154                ),
155            };
156
157            sender.with_send_duration_instrumentation(stage_idx, &span);
158            if !provides_instrumentation {
159                sender.with_usage_instrumentation(usage_handle.clone());
160                receiver.with_usage_instrumentation(usage_handle);
161            }
162
163            current_stage = Some((sender, receiver));
164        }
165
166        let (sender, receiver) = current_stage.ok_or(TopologyError::EmptyTopology)?;
167
168        // Install the buffer usage handler since we successfully created the buffer topology.  This
169        // spawns it in the background and periodically emits aggregated metrics about each of the
170        // buffer stages.
171        buffer_usage.install(buffer_id.as_str());
172
173        Ok((sender, receiver))
174    }
175}
176
177impl<T: Bufferable> TopologyBuilder<T> {
178    /// Creates a memory-only buffer topology.
179    ///
180    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
181    /// values, but cannot be configured to use overflow mode.  If overflow mode is selected, it
182    /// will be changed to blocking mode.
183    ///
184    /// This is a convenience method for `vector` as it is used for inter-transform channels, and we
185    /// can simplifying needing to require callers to do all the boilerplate to create the builder,
186    /// create the stage, installing buffer usage metrics that aren't required, and so on.
187    ///
188    #[allow(clippy::print_stderr)]
189    pub async fn standalone_memory(
190        max_events: NonZeroUsize,
191        when_full: WhenFull,
192        receiver_span: &Span,
193    ) -> (BufferSender<T>, BufferReceiver<T>) {
194        let usage_handle = BufferUsageHandle::noop();
195
196        let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
197        let (sender, receiver) = memory_buffer
198            .into_buffer_parts(usage_handle.clone())
199            .await
200            .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
201
202        let mode = match when_full {
203            WhenFull::Overflow => WhenFull::Block,
204            m => m,
205        };
206        let mut sender = BufferSender::new(sender, mode);
207        sender.with_send_duration_instrumentation(0, receiver_span);
208        let receiver = BufferReceiver::new(receiver);
209
210        (sender, receiver)
211    }
212
213    /// Creates a memory-only buffer topology with the given buffer usage handle.
214    ///
215    /// This is specifically required for the tests that occur under `buffers`, as we assert things
216    /// like channel capacity left, which cannot be done on in-memory v1 buffers as they use the
217    /// more abstract `Sink`-based adapters.
218    ///
219    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
220    /// values, but cannot be configured to use overflow mode.  If overflow mode is selected, it
221    /// will be changed to blocking mode.
222    ///
223    /// This is a convenience method for `vector` as it is used for inter-transform channels, and we
224    /// can simplifying needing to require callers to do all the boilerplate to create the builder,
225    /// create the stage, installing buffer usage metrics that aren't required, and so on.
226    #[cfg(test)]
227    pub async fn standalone_memory_test(
228        max_events: NonZeroUsize,
229        when_full: WhenFull,
230        usage_handle: BufferUsageHandle,
231    ) -> (BufferSender<T>, BufferReceiver<T>) {
232        let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
233        let (sender, receiver) = memory_buffer
234            .into_buffer_parts(usage_handle.clone())
235            .await
236            .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
237
238        let mode = match when_full {
239            WhenFull::Overflow => WhenFull::Block,
240            m => m,
241        };
242        let mut sender = BufferSender::new(sender, mode);
243        let mut receiver = BufferReceiver::new(receiver);
244
245        sender.with_usage_instrumentation(usage_handle.clone());
246        receiver.with_usage_instrumentation(usage_handle);
247
248        (sender, receiver)
249    }
250}
251
252impl<T: Bufferable> Default for TopologyBuilder<T> {
253    fn default() -> Self {
254        Self { stages: Vec::new() }
255    }
256}
257
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tracing::Span;

    use super::TopologyBuilder;
    use crate::{
        topology::{
            builder::TopologyError,
            test_util::{assert_current_send_capacity, Sample},
        },
        variants::MemoryBuffer,
        WhenFull,
    };

    #[tokio::test]
    async fn empty_topology() {
        let builder = TopologyBuilder::<Sample>::default();
        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::EmptyTopology) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn single_stage_topology_block() {
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Block,
        );
        let result = builder.build(String::from("test"), Span::none()).await;

        let (mut sender, _) = result.expect("single-stage block topology should build");
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    #[tokio::test]
    async fn single_stage_topology_drop_newest() {
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::DropNewest,
        );
        let result = builder.build(String::from("test"), Span::none()).await;

        let (mut sender, _) = result.expect("single-stage drop-newest topology should build");
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    #[tokio::test]
    async fn single_stage_topology_overflow() {
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Overflow,
        );
        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::OverflowWhenLast) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_block() {
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Block,
        );
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Block,
        );
        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_drop_newest() {
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::DropNewest,
        );
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Block,
        );
        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_overflow_when_last() {
        // Even with an outer stage present, the innermost stage has nothing to overflow into.
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Overflow,
        );
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Overflow,
        );
        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::OverflowWhenLast) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_overflow() {
        let mut builder = TopologyBuilder::<Sample>::default();
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Overflow,
        );
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            WhenFull::Block,
        );

        let result = builder.build(String::from("test"), Span::none()).await;

        let (mut sender, _) = result.expect("overflow-into-block topology should build");
        assert_current_send_capacity(&mut sender, Some(1), Some(1));
    }
}