vector_buffers/topology/
builder.rs

1use std::{error::Error, num::NonZeroUsize};
2
3use async_trait::async_trait;
4use snafu::{ResultExt, Snafu};
5use tracing::Span;
6
7use super::channel::{ChannelMetricMetadata, ReceiverAdapter, SenderAdapter};
8use crate::{
9    Bufferable, WhenFull,
10    buffer_usage_data::{BufferUsage, BufferUsageHandle},
11    config::MemoryBufferSize,
12    topology::channel::{BufferReceiver, BufferSender, limited},
13};
14
15/// Value that can be used as a stage in a buffer topology.
16#[async_trait]
17pub trait IntoBuffer<T: Bufferable>: Send {
18    /// Gets whether or not this buffer stage provides its own instrumentation, or if it should be
19    /// instrumented from the outside.
20    ///
21    /// As some buffer stages, like the in-memory channel, never have a chance to catch the values
22    /// in the middle of the channel without introducing an unnecessary hop, [`BufferSender`] and
23    /// [`BufferReceiver`] can be configured to instrument all events flowing through directly.
24    ///
25    /// When instrumentation is provided in this way, [`vector_common::byte_size_of::ByteSizeOf`]
26    ///  is used to calculate the size of the event going both into and out of the buffer.
27    fn provides_instrumentation(&self) -> bool {
28        false
29    }
30
31    /// Converts this value into a sender and receiver pair suitable for use in a buffer topology.
32    async fn into_buffer_parts(
33        self: Box<Self>,
34        usage_handle: BufferUsageHandle,
35    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>>;
36}
37
38#[derive(Debug, Snafu)]
39pub enum TopologyError {
40    #[snafu(display("buffer topology cannot be empty"))]
41    EmptyTopology,
42    #[snafu(display(
43        "stage {} configured with block/drop newest behavior in front of subsequent stage",
44        stage_idx
45    ))]
46    NextStageNotUsed { stage_idx: usize },
47    #[snafu(display("last stage in buffer topology cannot be set to overflow mode"))]
48    OverflowWhenLast,
49    #[snafu(display("failed to build individual stage {}: {}", stage_idx, source))]
50    FailedToBuildStage {
51        stage_idx: usize,
52        source: Box<dyn Error + Send + Sync>,
53    },
54    #[snafu(display(
55        "multiple components with segmented acknowledgements cannot be used in the same buffer"
56    ))]
57    StackedAcks,
58}
59
60struct TopologyStage<T: Bufferable> {
61    untransformed: Box<dyn IntoBuffer<T>>,
62    when_full: WhenFull,
63}
64
65/// Builder for constructing buffer topologies.
66pub struct TopologyBuilder<T: Bufferable> {
67    stages: Vec<TopologyStage<T>>,
68}
69
70impl<T: Bufferable> TopologyBuilder<T> {
71    /// Adds a new stage to the buffer topology.
72    ///
73    /// The "when full" behavior can be optionally configured here.  If no behavior is specified,
74    /// and an overflow buffer is _not_ added to the topology after this, then the "when full"
75    /// behavior will use a default value of "block".  If a "when full" behavior is specified, and
76    /// an overflow buffer is added to the topology after this, then the specified "when full"
77    /// behavior will be ignored and will be set to "overflow" mode.
78    ///
79    /// Callers can configure what to do when a buffer is full by setting `when_full`.  Three modes
80    /// are available -- block, drop newest, and overflow -- which are documented in more detail by
81    /// [`BufferSender`].
82    ///
83    /// Two notes about what modes are not valid in certain scenarios:
84    /// - the innermost stage (the last stage given to the builder) cannot be set to "overflow" mode,
85    ///   as there is no other stage to overflow to
86    /// - a stage cannot use the "block" or "drop newest" mode when there is a subsequent stage, and
87    ///   must user the "overflow" mode
88    ///
89    /// Any occurrence of either of these scenarios will result in an error during build.
90    pub fn stage<S>(&mut self, stage: S, when_full: WhenFull) -> &mut Self
91    where
92        S: IntoBuffer<T> + 'static,
93    {
94        self.stages.push(TopologyStage {
95            untransformed: Box::new(stage),
96            when_full,
97        });
98        self
99    }
100
101    /// Consumes this builder, returning the sender and receiver that can be used by components.
102    ///
103    /// # Errors
104    ///
105    /// If there was a configuration error with one of the stages, an error variant will be returned
106    /// explaining the issue.
107    pub async fn build(
108        self,
109        buffer_id: String,
110        span: Span,
111    ) -> Result<(BufferSender<T>, BufferReceiver<T>), TopologyError> {
112        // We pop stages off in reverse order to build from the inside out.
113        let mut buffer_usage = BufferUsage::from_span(span.clone());
114        let mut current_stage = None;
115
116        for (stage_idx, stage) in self.stages.into_iter().enumerate().rev() {
117            // Make sure the stage is valid for our current builder state.
118            match stage.when_full {
119                // The innermost stage can't be set to overflow, there's nothing else to overflow _to_.
120                WhenFull::Overflow => {
121                    if current_stage.is_none() {
122                        return Err(TopologyError::OverflowWhenLast);
123                    }
124                }
125                // If there's already an inner stage, then blocking or dropping the newest events
126                // doesn't no sense.  Overflowing is the only valid transition to another stage.
127                WhenFull::Block | WhenFull::DropNewest => {
128                    if current_stage.is_some() {
129                        return Err(TopologyError::NextStageNotUsed { stage_idx });
130                    }
131                }
132            }
133
134            // Create the buffer usage handle for this stage and initialize it as we create the
135            // sender/receiver.  This is slightly awkward since we just end up actually giving
136            // the handle to the `BufferSender`/`BufferReceiver` wrappers, but that's the price we
137            // have to pay for letting each stage function in an opaque way when wrapped.
138            let usage_handle = buffer_usage.add_stage(stage_idx);
139            let provides_instrumentation = stage.untransformed.provides_instrumentation();
140            let (sender, receiver) = stage
141                .untransformed
142                .into_buffer_parts(usage_handle.clone())
143                .await
144                .context(FailedToBuildStageSnafu { stage_idx })?;
145
146            let (mut sender, mut receiver) = match current_stage.take() {
147                None => (
148                    BufferSender::new(sender, stage.when_full),
149                    BufferReceiver::new(receiver),
150                ),
151                Some((current_sender, current_receiver)) => (
152                    BufferSender::with_overflow(sender, current_sender),
153                    BufferReceiver::with_overflow(receiver, current_receiver),
154                ),
155            };
156
157            sender.with_send_duration_instrumentation(stage_idx, &span);
158            if !provides_instrumentation {
159                sender.with_usage_instrumentation(usage_handle.clone());
160                receiver.with_usage_instrumentation(usage_handle);
161            }
162
163            current_stage = Some((sender, receiver));
164        }
165
166        let (sender, receiver) = current_stage.ok_or(TopologyError::EmptyTopology)?;
167
168        // Install the buffer usage handler since we successfully created the buffer topology.  This
169        // spawns it in the background and periodically emits aggregated metrics about each of the
170        // buffer stages.
171        buffer_usage.install(buffer_id.as_str());
172
173        Ok((sender, receiver))
174    }
175}
176
177impl<T: Bufferable> TopologyBuilder<T> {
178    /// Creates a memory-only buffer topology.
179    ///
180    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
181    /// values, but cannot be configured to use overflow mode.  If overflow mode is selected, it
182    /// will be changed to blocking mode.
183    ///
184    /// This is a convenience method for `vector` as it is used for inter-transform channels, and we
185    /// can simplifying needing to require callers to do all the boilerplate to create the builder,
186    /// create the stage, installing buffer usage metrics that aren't required, and so on.
187    ///
188    #[allow(clippy::print_stderr)]
189    pub fn standalone_memory(
190        max_events: NonZeroUsize,
191        when_full: WhenFull,
192        receiver_span: &Span,
193        metadata: Option<ChannelMetricMetadata>,
194    ) -> (BufferSender<T>, BufferReceiver<T>) {
195        let usage_handle = BufferUsageHandle::noop();
196        usage_handle.set_buffer_limits(None, Some(max_events.get()));
197
198        let limit = MemoryBufferSize::MaxEvents(max_events);
199        let (sender, receiver) = limited(limit, metadata);
200
201        let mode = match when_full {
202            WhenFull::Overflow => WhenFull::Block,
203            m => m,
204        };
205        let mut sender = BufferSender::new(sender.into(), mode);
206        sender.with_send_duration_instrumentation(0, receiver_span);
207        let receiver = BufferReceiver::new(receiver.into());
208
209        (sender, receiver)
210    }
211
212    /// Creates a memory-only buffer topology with the given buffer usage handle.
213    ///
214    /// This is specifically required for the tests that occur under `buffers`, as we assert things
215    /// like channel capacity left, which cannot be done on in-memory v1 buffers as they use the
216    /// more abstract `Sink`-based adapters.
217    ///
218    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
219    /// values, but cannot be configured to use overflow mode.  If overflow mode is selected, it
220    /// will be changed to blocking mode.
221    ///
222    /// This is a convenience method for `vector` as it is used for inter-transform channels, and we
223    /// can simplifying needing to require callers to do all the boilerplate to create the builder,
224    /// create the stage, installing buffer usage metrics that aren't required, and so on.
225    #[cfg(test)]
226    pub fn standalone_memory_test(
227        max_events: NonZeroUsize,
228        when_full: WhenFull,
229        usage_handle: BufferUsageHandle,
230        metadata: Option<ChannelMetricMetadata>,
231    ) -> (BufferSender<T>, BufferReceiver<T>) {
232        usage_handle.set_buffer_limits(None, Some(max_events.get()));
233
234        let limit = MemoryBufferSize::MaxEvents(max_events);
235        let (sender, receiver) = limited(limit, metadata);
236
237        let mode = match when_full {
238            WhenFull::Overflow => WhenFull::Block,
239            m => m,
240        };
241        let mut sender = BufferSender::new(sender.into(), mode);
242        let mut receiver = BufferReceiver::new(receiver.into());
243
244        sender.with_usage_instrumentation(usage_handle.clone());
245        receiver.with_usage_instrumentation(usage_handle);
246
247        (sender, receiver)
248    }
249}
250
251impl<T: Bufferable> Default for TopologyBuilder<T> {
252    fn default() -> Self {
253        Self { stages: Vec::new() }
254    }
255}
256
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tracing::Span;

    use super::TopologyBuilder;
    use crate::{
        WhenFull,
        topology::{
            builder::TopologyError,
            test_util::{Sample, assert_current_send_capacity},
        },
        variants::MemoryBuffer,
    };

    /// Adds an in-memory stage with room for exactly one event to `builder`.
    fn add_single_event_stage(builder: &mut TopologyBuilder<Sample>, when_full: WhenFull) {
        builder.stage(
            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
            when_full,
        );
    }

    #[tokio::test]
    async fn empty_topology() {
        let builder = TopologyBuilder::<Sample>::default();

        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::EmptyTopology) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn single_stage_topology_block() {
        let mut builder = TopologyBuilder::<Sample>::default();
        add_single_event_stage(&mut builder, WhenFull::Block);

        // `expect` surfaces the actual build error on failure, unlike `assert!(is_ok())`.
        let (mut sender, _) = builder
            .build(String::from("test"), Span::none())
            .await
            .expect("single blocking stage should build");
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    #[tokio::test]
    async fn single_stage_topology_drop_newest() {
        let mut builder = TopologyBuilder::<Sample>::default();
        add_single_event_stage(&mut builder, WhenFull::DropNewest);

        let (mut sender, _) = builder
            .build(String::from("test"), Span::none())
            .await
            .expect("single drop-newest stage should build");
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    #[tokio::test]
    async fn single_stage_topology_overflow() {
        let mut builder = TopologyBuilder::<Sample>::default();
        add_single_event_stage(&mut builder, WhenFull::Overflow);

        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::OverflowWhenLast) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_block() {
        let mut builder = TopologyBuilder::<Sample>::default();
        add_single_event_stage(&mut builder, WhenFull::Block);
        add_single_event_stage(&mut builder, WhenFull::Block);

        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_drop_newest() {
        let mut builder = TopologyBuilder::<Sample>::default();
        add_single_event_stage(&mut builder, WhenFull::DropNewest);
        add_single_event_stage(&mut builder, WhenFull::Block);

        let result = builder.build(String::from("test"), Span::none()).await;
        match result {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    #[tokio::test]
    async fn two_stage_topology_overflow() {
        let mut builder = TopologyBuilder::<Sample>::default();
        add_single_event_stage(&mut builder, WhenFull::Overflow);
        add_single_event_stage(&mut builder, WhenFull::Block);

        let (mut sender, _) = builder
            .build(String::from("test"), Span::none())
            .await
            .expect("overflow stage in front of a blocking stage should build");
        assert_current_send_capacity(&mut sender, Some(1), Some(1));
    }
}
369}