1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
use std::{error::Error, num::NonZeroUsize};

use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use tracing::Span;

use super::channel::{ReceiverAdapter, SenderAdapter};
use crate::{
    buffer_usage_data::{BufferUsage, BufferUsageHandle},
    topology::channel::{BufferReceiver, BufferSender},
    variants::MemoryBuffer,
    Bufferable, WhenFull,
};

/// Value that can be used as a stage in a buffer topology.
///
/// A stage is consumed (as a boxed value) to produce the sender/receiver adapter pair that
/// the topology builder then wraps in [`BufferSender`] and [`BufferReceiver`].
#[async_trait]
pub trait IntoBuffer<T: Bufferable>: Send {
    /// Gets whether or not this buffer stage provides its own instrumentation, or if it should be
    /// instrumented from the outside.
    ///
    /// As some buffer stages, like the in-memory channel, never have a chance to catch the values
    /// in the middle of the channel without introducing an unnecessary hop, [`BufferSender`] and
    /// [`BufferReceiver`] can be configured to instrument all events flowing through directly.
    ///
    /// When instrumentation is provided in this way, [`vector_common::byte_size_of::ByteSizeOf`]
    /// is used to calculate the size of the event going both into and out of the buffer.
    ///
    /// The default implementation returns `false`, meaning the stage relies on being
    /// instrumented from the outside.
    fn provides_instrumentation(&self) -> bool {
        false
    }

    /// Converts this value into a sender and receiver pair suitable for use in a buffer topology.
    ///
    /// `usage_handle` is the buffer usage handle associated with this stage; how (or whether) it
    /// is used is up to the implementation.
    ///
    /// # Errors
    ///
    /// Returns a boxed error if the implementation fails to create the sender/receiver pair.
    async fn into_buffer_parts(
        self: Box<Self>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>>;
}

/// Errors that can be reported while assembling a buffer topology.
#[derive(Debug, Snafu)]
pub enum TopologyError {
    /// The topology was built without any stages.
    #[snafu(display("buffer topology cannot be empty"))]
    EmptyTopology,
    /// A stage that is followed by another stage was configured to block or drop the newest
    /// events instead of overflowing, so the subsequent stage would never be used.
    #[snafu(display(
        "stage {} configured with block/drop newest behavior in front of subsequent stage",
        stage_idx
    ))]
    NextStageNotUsed { stage_idx: usize },
    /// The last (innermost) stage was configured to overflow, but there is no stage after it to
    /// overflow to.
    #[snafu(display("last stage in buffer topology cannot be set to overflow mode"))]
    OverflowWhenLast,
    /// An individual stage failed while creating its sender/receiver pair; `source` carries the
    /// underlying error and `stage_idx` identifies the stage.
    #[snafu(display("failed to build individual stage {}: {}", stage_idx, source))]
    FailedToBuildStage {
        stage_idx: usize,
        source: Box<dyn Error + Send + Sync>,
    },
    /// More than one component with segmented acknowledgements was placed in the same buffer.
    ///
    /// NOTE(review): nothing in the visible part of this file constructs this variant --
    /// confirm where (or whether) it is still produced.
    #[snafu(display(
        "multiple components with segmented acknowledgements cannot be used in the same buffer"
    ))]
    StackedAcks,
}

/// A single stage recorded by the topology builder, not yet converted into a sender/receiver
/// pair.
struct TopologyStage<T: Bufferable> {
    /// The stage value itself, boxed so that stages of different concrete types can be stored
    /// together.  It is turned into a sender/receiver pair when the topology is built.
    untransformed: Box<dyn IntoBuffer<T>>,
    /// The "when full" behavior requested for this stage.
    when_full: WhenFull,
}

/// Builder for constructing buffer topologies.
///
/// Stages are collected in the order they are added and are only converted into an actual
/// sender/receiver pair when the builder is consumed.
pub struct TopologyBuilder<T: Bufferable> {
    /// Stages in insertion order; the last entry is the innermost stage of the topology.
    stages: Vec<TopologyStage<T>>,
}

impl<T: Bufferable> TopologyBuilder<T> {
    /// Adds a new stage to the buffer topology.
    ///
    /// Stages are ordered from outermost to innermost: the last stage given to the builder is
    /// the innermost one.
    ///
    /// Callers configure what the stage does when it is full by setting `when_full`.  Three
    /// modes are available -- block, drop newest, and overflow -- which are documented in more
    /// detail by [`BufferSender`].
    ///
    /// Two notes about what modes are not valid in certain scenarios:
    /// - the innermost stage (the last stage given to the builder) cannot be set to "overflow"
    ///   mode, as there is no other stage to overflow to
    /// - a stage cannot use the "block" or "drop newest" mode when there is a subsequent stage,
    ///   and must use the "overflow" mode
    ///
    /// Nothing is validated here; any occurrence of either of these scenarios will result in an
    /// error during build.
    pub fn stage<S>(&mut self, stage: S, when_full: WhenFull) -> &mut Self
    where
        S: IntoBuffer<T> + 'static,
    {
        let untransformed: Box<dyn IntoBuffer<T>> = Box::new(stage);
        self.stages.push(TopologyStage {
            untransformed,
            when_full,
        });
        self
    }

    /// Consumes this builder, returning the sender and receiver that can be used by components.
    ///
    /// On success, the buffer usage reporting for the topology is installed under `buffer_id`.
    ///
    /// # Errors
    ///
    /// If there was a configuration error with one of the stages, if a stage fails to create
    /// its sender/receiver pair, or if no stages were added at all, an error variant will be
    /// returned explaining the issue.
    pub async fn build(
        self,
        buffer_id: String,
        span: Span,
    ) -> Result<(BufferSender<T>, BufferReceiver<T>), TopologyError> {
        let mut buffer_usage = BufferUsage::from_span(span.clone());

        // The chain assembled so far.  Stages are visited last-to-first so that the topology is
        // built from the inside out, with each stage wrapping the ones that come after it.
        let mut inner: Option<(BufferSender<T>, BufferReceiver<T>)> = None;

        for (stage_idx, stage) in self.stages.into_iter().enumerate().rev() {
            let TopologyStage {
                untransformed,
                when_full,
            } = stage;

            // Make sure the requested mode is valid for the stage's position in the topology.
            let is_innermost = inner.is_none();
            match when_full {
                // The innermost stage can't be set to overflow, there's nothing else to
                // overflow _to_.
                WhenFull::Overflow if is_innermost => {
                    return Err(TopologyError::OverflowWhenLast);
                }
                // If there's already an inner stage, then blocking or dropping the newest
                // events makes no sense.  Overflowing is the only valid transition to another
                // stage.
                WhenFull::Block | WhenFull::DropNewest if !is_innermost => {
                    return Err(TopologyError::NextStageNotUsed { stage_idx });
                }
                WhenFull::Overflow | WhenFull::Block | WhenFull::DropNewest => {}
            }

            // Register the stage for usage tracking before creating its sender/receiver.  The
            // handle is handed both to the stage itself and, when the stage doesn't instrument
            // itself, to the `BufferSender`/`BufferReceiver` wrappers: that's the price of
            // letting each stage function in an opaque way when wrapped.
            let usage_handle = buffer_usage.add_stage(stage_idx);
            let provides_instrumentation = untransformed.provides_instrumentation();
            let (raw_sender, raw_receiver) = untransformed
                .into_buffer_parts(usage_handle.clone())
                .await
                .context(FailedToBuildStageSnafu { stage_idx })?;

            let (mut stage_sender, mut stage_receiver) =
                if let Some((next_sender, next_receiver)) = inner.take() {
                    // There is a stage after this one: overflow into it.
                    (
                        BufferSender::with_overflow(raw_sender, next_sender),
                        BufferReceiver::with_overflow(raw_receiver, next_receiver),
                    )
                } else {
                    (
                        BufferSender::new(raw_sender, when_full),
                        BufferReceiver::new(raw_receiver),
                    )
                };

            stage_sender.with_send_duration_instrumentation(stage_idx, &span);
            if !provides_instrumentation {
                stage_sender.with_usage_instrumentation(usage_handle.clone());
                stage_receiver.with_usage_instrumentation(usage_handle);
            }

            inner = Some((stage_sender, stage_receiver));
        }

        // No stage was ever wrapped, so the builder was empty.
        let topology = inner.ok_or(TopologyError::EmptyTopology)?;

        // Install the buffer usage handler since we successfully created the buffer topology.
        // This spawns it in the background and periodically emits aggregated metrics about each
        // of the buffer stages.
        buffer_usage.install(buffer_id.as_str());

        Ok(topology)
    }
}

impl<T: Bufferable> TopologyBuilder<T> {
    /// Maps the requested "when full" mode onto one that is valid for a standalone, single-stage
    /// buffer.
    ///
    /// A standalone buffer has no subsequent stage to overflow to, so overflow mode is changed
    /// to blocking mode; the other modes are passed through unchanged.
    fn standalone_when_full(when_full: WhenFull) -> WhenFull {
        match when_full {
            WhenFull::Overflow | WhenFull::Block => WhenFull::Block,
            WhenFull::DropNewest => WhenFull::DropNewest,
        }
    }

    /// Creates a memory-only buffer topology.
    ///
    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
    /// values, but cannot be configured to use overflow mode.  If overflow mode is selected, it
    /// will be changed to blocking mode.
    ///
    /// This is a convenience method for `vector` as it is used for inter-transform channels, and
    /// it spares callers from all the boilerplate of creating the builder, creating the stage,
    /// installing buffer usage metrics that aren't required, and so on.
    ///
    /// The memory stage is created with a no-op buffer usage handle and no usage instrumentation
    /// is attached to the returned pair; only send duration instrumentation is configured on the
    /// sender, using `receiver_span`.
    pub async fn standalone_memory(
        max_events: NonZeroUsize,
        when_full: WhenFull,
        receiver_span: &Span,
    ) -> (BufferSender<T>, BufferReceiver<T>) {
        let memory_buffer = Box::new(MemoryBuffer::new(max_events));
        let (sender, receiver) = memory_buffer
            .into_buffer_parts(BufferUsageHandle::noop())
            .await
            .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

        let mut sender = BufferSender::new(sender, Self::standalone_when_full(when_full));
        sender.with_send_duration_instrumentation(0, receiver_span);
        let receiver = BufferReceiver::new(receiver);

        (sender, receiver)
    }

    /// Creates a memory-only buffer topology with the given buffer usage handle.
    ///
    /// This is specifically required for the tests that occur under `buffers`, as we assert things
    /// like channel capacity left, which cannot be done on in-memory v1 buffers as they use the
    /// more abstract `Sink`-based adapters.
    ///
    /// The overflow mode (i.e. `WhenFull`) can be configured to either block or drop the newest
    /// values, but cannot be configured to use overflow mode.  If overflow mode is selected, it
    /// will be changed to blocking mode.
    ///
    /// Unlike [`TopologyBuilder::standalone_memory`], the returned sender and receiver are both
    /// instrumented with `usage_handle`, and no send duration instrumentation is configured.
    #[cfg(test)]
    pub async fn standalone_memory_test(
        max_events: NonZeroUsize,
        when_full: WhenFull,
        usage_handle: BufferUsageHandle,
    ) -> (BufferSender<T>, BufferReceiver<T>) {
        let memory_buffer = Box::new(MemoryBuffer::new(max_events));
        let (sender, receiver) = memory_buffer
            .into_buffer_parts(usage_handle.clone())
            .await
            .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

        let mut sender = BufferSender::new(sender, Self::standalone_when_full(when_full));
        let mut receiver = BufferReceiver::new(receiver);

        sender.with_usage_instrumentation(usage_handle.clone());
        receiver.with_usage_instrumentation(usage_handle);

        (sender, receiver)
    }
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add a `T: Default` bound,
// which an empty list of stages does not need.
impl<T: Bufferable> Default for TopologyBuilder<T> {
    /// Creates a builder with no stages.
    fn default() -> Self {
        let stages = Vec::new();
        Self { stages }
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tracing::Span;

    use super::TopologyBuilder;
    use crate::{
        topology::builder::TopologyError,
        topology::test_util::{assert_current_send_capacity, Sample},
        variants::MemoryBuffer,
        WhenFull,
    };

    /// Capacity shared by every memory stage in these tests: a single event.
    fn one_event() -> NonZeroUsize {
        NonZeroUsize::new(1).unwrap()
    }

    // A lone stage may block when full.
    #[tokio::test]
    async fn single_stage_topology_block() {
        let mut topology = TopologyBuilder::<Sample>::default();
        topology.stage(MemoryBuffer::new(one_event()), WhenFull::Block);

        let built = topology.build(String::from("test"), Span::none()).await;
        assert!(built.is_ok());

        let (mut sender, _) = built.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    // A lone stage may drop the newest events when full.
    #[tokio::test]
    async fn single_stage_topology_drop_newest() {
        let mut topology = TopologyBuilder::<Sample>::default();
        topology.stage(MemoryBuffer::new(one_event()), WhenFull::DropNewest);

        let built = topology.build(String::from("test"), Span::none()).await;
        assert!(built.is_ok());

        let (mut sender, _) = built.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    // A lone stage has nothing to overflow to, so overflow mode must be rejected.
    #[tokio::test]
    async fn single_stage_topology_overflow() {
        let mut topology = TopologyBuilder::<Sample>::default();
        topology.stage(MemoryBuffer::new(one_event()), WhenFull::Overflow);

        match topology.build(String::from("test"), Span::none()).await {
            Err(TopologyError::OverflowWhenLast) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    // A blocking first stage would never hand events to the second stage.
    #[tokio::test]
    async fn two_stage_topology_block() {
        let mut topology = TopologyBuilder::<Sample>::default();
        topology
            .stage(MemoryBuffer::new(one_event()), WhenFull::Block)
            .stage(MemoryBuffer::new(one_event()), WhenFull::Block);

        match topology.build(String::from("test"), Span::none()).await {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    // A drop-newest first stage would never hand events to the second stage.
    #[tokio::test]
    async fn two_stage_topology_drop_newest() {
        let mut topology = TopologyBuilder::<Sample>::default();
        topology
            .stage(MemoryBuffer::new(one_event()), WhenFull::DropNewest)
            .stage(MemoryBuffer::new(one_event()), WhenFull::Block);

        match topology.build(String::from("test"), Span::none()).await {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    // Overflowing from the first stage into a blocking second stage is the valid combination.
    #[tokio::test]
    async fn two_stage_topology_overflow() {
        let mut topology = TopologyBuilder::<Sample>::default();
        topology
            .stage(MemoryBuffer::new(one_event()), WhenFull::Overflow)
            .stage(MemoryBuffer::new(one_event()), WhenFull::Block);

        let built = topology.build(String::from("test"), Span::none()).await;
        assert!(built.is_ok());

        let (mut sender, _) = built.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), Some(1));
    }
}