vector_buffers/
config.rs

1use std::{
2    fmt,
3    num::{NonZeroU64, NonZeroUsize},
4    path::{Path, PathBuf},
5    slice,
6};
7
8use serde::{de, Deserialize, Deserializer, Serialize};
9use snafu::{ResultExt, Snafu};
10use tracing::Span;
11use vector_common::{config::ComponentKey, finalization::Finalizable};
12use vector_config::configurable_component;
13
14use crate::{
15    topology::{
16        builder::{TopologyBuilder, TopologyError},
17        channel::{BufferReceiver, BufferSender},
18    },
19    variants::{DiskV2Buffer, MemoryBuffer},
20    Bufferable, WhenFull,
21};
22
/// Errors that can occur while building a buffer from a [`BufferConfig`].
#[derive(Debug, Snafu)]
pub enum BufferBuildError {
    /// A disk-backed stage was configured, but no data directory was provided.
    #[snafu(display("the configured buffer type requires `data_dir` be specified"))]
    RequiresDataDir,
    /// Assembling the configured stages into a buffer topology failed.
    #[snafu(display("error occurred when building buffer: {}", source))]
    FailedToBuildTopology { source: TopologyError },
    /// Reports a `max_events` value of zero.
    // NOTE(review): not constructed anywhere in this file (`max_events` is deserialized as a
    // `NonZeroUsize`, which already rejects zero); presumably used elsewhere or vestigial — confirm.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
}
32
/// The value of the `type` key of a buffer stage, read by `BufferTypeVisitor` before the other
/// keys are interpreted.
///
/// The wire names here must stay in sync with the `type` tag values of [`BufferType`]
/// (`memory` and `disk`).
#[derive(Deserialize, Serialize)]
enum BufferTypeKind {
    /// `type: memory`
    #[serde(rename = "memory")]
    Memory,
    /// `type: disk`
    #[serde(rename = "disk")]
    DiskV2,
}
40
41const ALL_FIELDS: [&str; 4] = ["type", "max_events", "max_size", "when_full"];
42
43struct BufferTypeVisitor;
44
45impl BufferTypeVisitor {
46    fn visit_map_impl<'de, A>(mut map: A) -> Result<BufferType, A::Error>
47    where
48        A: de::MapAccess<'de>,
49    {
50        let mut kind: Option<BufferTypeKind> = None;
51        let mut max_events: Option<NonZeroUsize> = None;
52        let mut max_size: Option<NonZeroU64> = None;
53        let mut when_full: Option<WhenFull> = None;
54        while let Some(key) = map.next_key::<String>()? {
55            match key.as_str() {
56                "type" => {
57                    if kind.is_some() {
58                        return Err(de::Error::duplicate_field("type"));
59                    }
60                    kind = Some(map.next_value()?);
61                }
62                "max_events" => {
63                    if max_events.is_some() {
64                        return Err(de::Error::duplicate_field("max_events"));
65                    }
66                    max_events = Some(map.next_value()?);
67                }
68                "max_size" => {
69                    if max_size.is_some() {
70                        return Err(de::Error::duplicate_field("max_size"));
71                    }
72                    max_size = Some(map.next_value()?);
73                }
74                "when_full" => {
75                    if when_full.is_some() {
76                        return Err(de::Error::duplicate_field("when_full"));
77                    }
78                    when_full = Some(map.next_value()?);
79                }
80                other => {
81                    return Err(de::Error::unknown_field(other, &ALL_FIELDS));
82                }
83            }
84        }
85        let kind = kind.unwrap_or(BufferTypeKind::Memory);
86        let when_full = when_full.unwrap_or_default();
87        match kind {
88            BufferTypeKind::Memory => {
89                let size = match (max_events, max_size) {
90                    (Some(_), Some(_)) => {
91                        return Err(de::Error::unknown_field(
92                            "max_events",
93                            &["type", "max_size", "when_full"],
94                        ));
95                    }
96                    (_, Some(max_size)) => {
97                        if let Ok(bounded_max_bytes) = usize::try_from(max_size.get()) {
98                            MemoryBufferSize::MaxSize(NonZeroUsize::new(bounded_max_bytes).unwrap())
99                        } else {
100                            return Err(de::Error::invalid_value(
101                                de::Unexpected::Unsigned(max_size.into()),
102                                &format!(
103                                    "Value for max_bytes must be a positive integer <= {}",
104                                    usize::MAX
105                                )
106                                .as_str(),
107                            ));
108                        }
109                    }
110                    _ => MemoryBufferSize::MaxEvents(
111                        max_events.unwrap_or_else(memory_buffer_default_max_events),
112                    ),
113                };
114                Ok(BufferType::Memory { size, when_full })
115            }
116            BufferTypeKind::DiskV2 => {
117                if max_events.is_some() {
118                    return Err(de::Error::unknown_field(
119                        "max_events",
120                        &["type", "max_size", "when_full"],
121                    ));
122                }
123                Ok(BufferType::DiskV2 {
124                    max_size: max_size.ok_or_else(|| de::Error::missing_field("max_size"))?,
125                    when_full,
126                })
127            }
128        }
129    }
130}
131
132impl<'de> de::Visitor<'de> for BufferTypeVisitor {
133    type Value = BufferType;
134
135    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
136        formatter.write_str("enum BufferType")
137    }
138
139    fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
140    where
141        A: de::MapAccess<'de>,
142    {
143        BufferTypeVisitor::visit_map_impl(map)
144    }
145}
146
147impl<'de> Deserialize<'de> for BufferType {
148    fn deserialize<D>(deserializer: D) -> Result<BufferType, D::Error>
149    where
150        D: Deserializer<'de>,
151    {
152        deserializer.deserialize_map(BufferTypeVisitor)
153    }
154}
155
/// Returns the default maximum number of events (500) held by a memory buffer.
///
/// Used when a memory buffer is configured with neither `max_events` nor `max_size`, and by
/// the default buffer configuration.
pub const fn memory_buffer_default_max_events() -> NonZeroUsize {
    // Evaluated at compile time, so a zero literal here would fail the build rather than
    // requiring an `unsafe` `new_unchecked` call.
    const DEFAULT_MAX_EVENTS: NonZeroUsize = match NonZeroUsize::new(500) {
        Some(value) => value,
        None => panic!("the default `max_events` must be non-zero"),
    };
    DEFAULT_MAX_EVENTS
}
159
160/// Disk usage configuration for disk-backed buffers.
161#[derive(Debug)]
162pub struct DiskUsage {
163    id: ComponentKey,
164    data_dir: PathBuf,
165    max_size: NonZeroU64,
166}
167
168impl DiskUsage {
169    /// Creates a new `DiskUsage` with the given usage configuration.
170    pub fn new(id: ComponentKey, data_dir: PathBuf, max_size: NonZeroU64) -> Self {
171        Self {
172            id,
173            data_dir,
174            max_size,
175        }
176    }
177
178    /// Gets the component key for the component this buffer is attached to.
179    pub fn id(&self) -> &ComponentKey {
180        &self.id
181    }
182
183    /// Gets the maximum size, in bytes, that this buffer can consume on disk.
184    pub fn max_size(&self) -> u64 {
185        self.max_size.get()
186    }
187
188    /// Gets the data directory path that this buffer will store its files on disk.
189    pub fn data_dir(&self) -> &Path {
190        self.data_dir.as_path()
191    }
192}
193
/// Enumeration to define exactly what terms the bounds of the buffer is expressed in: length, or
/// `byte_size`.
// `no_deser`: this type gets no derived `Deserialize`. In this file it is constructed by
// `BufferTypeVisitor::visit_map_impl`, which picks the variant based on whether `max_events` or
// `max_size` was given. (`///` text on this type feeds the generated config docs, so review notes
// are kept in `//` comments.)
#[configurable_component(no_deser)]
#[serde(rename_all = "snake_case")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MemoryBufferSize {
    /// The maximum number of events allowed in the buffer.
    // NOTE(review): with `no_deser`, this serde `default` presumably only informs the schema; the
    // runtime fallback is applied in `visit_map_impl` via `memory_buffer_default_max_events`.
    MaxEvents(#[serde(default = "memory_buffer_default_max_events")] NonZeroUsize),

    // Doc string is duplicated here as a workaround due to a name collision with the `max_size`
    // field with the DiskV2 variant of `BufferType`.
    /// The maximum allowed amount of allocated memory the buffer can hold.
    ///
    /// If `type = "disk"` then must be at least ~256 megabytes (268435488 bytes).
    MaxSize(#[configurable(metadata(docs::type_unit = "bytes"))] NonZeroUsize),
}
210
/// A specific type of buffer stage.
// `no_deser`: `Deserialize` is hand-written (`impl Deserialize for BufferType`, via
// `BufferTypeVisitor`) so that:
// - `type` can default to `memory`, and
// - the memory size options (`max_events` / `max_size`) can be validated together.
// The serde attributes below still describe the wire format for the schema and `Serialize`.
#[configurable_component(no_deser)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
#[configurable(metadata(docs::enum_tag_description = "The type of buffer to use."))]
pub enum BufferType {
    /// A buffer stage backed by an in-memory channel provided by `tokio`.
    ///
    /// This is more performant, but less durable. Data will be lost if Vector is restarted
    /// forcefully or crashes.
    #[configurable(title = "Events are buffered in memory.")]
    Memory {
        /// The terms around how to express buffering limits, can be in size or bytes_size.
        #[serde(flatten)]
        size: MemoryBufferSize,

        #[configurable(derived)]
        #[serde(default)]
        when_full: WhenFull,
    },

    /// A buffer stage backed by disk.
    ///
    /// This is less performant, but more durable. Data that has been synchronized to disk will not
    /// be lost if Vector is restarted forcefully or crashes.
    ///
    /// Data is synchronized to disk every 500ms.
    #[configurable(title = "Events are buffered on disk.")]
    #[serde(rename = "disk")]
    DiskV2 {
        /// The maximum size of the buffer on disk.
        ///
        /// Must be at least ~256 megabytes (268435488 bytes).
        #[configurable(
            validation(range(min = 268435488)),
            metadata(docs::type_unit = "bytes")
        )]
        // NOTE(review): the range validation above is schema metadata only. The hand-written
        // deserializer accepts any non-zero value (this file's tests parse `max_size: 1024` for a
        // disk buffer). Confirm whether the minimum is enforced elsewhere.
        max_size: NonZeroU64,

        #[configurable(derived)]
        #[serde(default)]
        when_full: WhenFull,
    },
}
255
256impl BufferType {
257    /// Gets the metadata around disk usage by the buffer, if supported.
258    ///
259    /// For buffer types that write to disk, `Some(value)` is returned with their usage metadata,
260    /// such as maximum size and data directory path.
261    ///
262    /// Otherwise, `None` is returned.
263    pub fn disk_usage(
264        &self,
265        global_data_dir: Option<PathBuf>,
266        id: &ComponentKey,
267    ) -> Option<DiskUsage> {
268        // All disk-backed buffers require the global data directory to be specified, and
269        // non-disk-backed buffers do not require it to be set... so if it's not set here, we ignore
270        // it because either:
271        // - it's a non-disk-backed buffer, in which case we can just ignore, or
272        // - this method is being called at a point before we actually check that a global data
273        //   directory is specified because we have a disk buffer present
274        //
275        // Since we're not able to emit/surface errors about a lack of a global data directory from
276        // where this method is called, we simply return `None` to let it reach the code that _does_
277        // emit/surface those errors... and once those errors are fixed, this code can return valid
278        // disk usage information, which will then be validated and emit any errors for _that_
279        // aspect.
280        match global_data_dir {
281            None => None,
282            Some(global_data_dir) => match self {
283                Self::Memory { .. } => None,
284                Self::DiskV2 { max_size, .. } => {
285                    let data_dir = crate::variants::disk_v2::get_disk_v2_data_dir_path(
286                        &global_data_dir,
287                        id.id(),
288                    );
289
290                    Some(DiskUsage::new(id.clone(), data_dir, *max_size))
291                }
292            },
293        }
294    }
295
296    /// Adds this buffer type as a stage to an existing [`TopologyBuilder`].
297    ///
298    /// # Errors
299    ///
300    /// If a required parameter is missing, or if there is an error building the topology itself, an
301    /// error variant will be returned describing the error
302    pub fn add_to_builder<T>(
303        &self,
304        builder: &mut TopologyBuilder<T>,
305        data_dir: Option<PathBuf>,
306        id: String,
307    ) -> Result<(), BufferBuildError>
308    where
309        T: Bufferable + Clone + Finalizable,
310    {
311        match *self {
312            BufferType::Memory { size, when_full } => {
313                builder.stage(MemoryBuffer::new(size), when_full);
314            }
315            BufferType::DiskV2 {
316                when_full,
317                max_size,
318            } => {
319                let data_dir = data_dir.ok_or(BufferBuildError::RequiresDataDir)?;
320                builder.stage(DiskV2Buffer::new(id, data_dir, max_size), when_full);
321            }
322        }
323
324        Ok(())
325    }
326}
327
/// Buffer configuration.
///
/// Buffers are composed of stages that form a buffer _topology_, with input items being
/// subject to configurable behavior when each stage reaches configured limits.  Buffers are
/// configured for sinks, where backpressure from the sink can be handled by the buffer.  This
/// allows absorbing temporary load, or potentially adding write-ahead-log behavior to a sink to
/// increase the durability of a given Vector pipeline.
///
/// While we use the term "buffer topology" here, a buffer topology is referred to by the more
/// common "buffer" or "buffers" shorthand.  This is related to buffers originally being a single
/// component, where you could only choose which buffer type to use.  As we expand buffer
/// functionality to allow chaining buffers together, you'll see "buffer topology" used in internal
/// documentation to correctly reflect the internal structure.
///
// TODO: We need to limit chained buffers to only allowing a single copy of each buffer type to be
// defined, otherwise, for example, two instances of the same disk buffer type in a single chained
// buffer topology would try to both open the same buffer files on disk, which wouldn't work or
// would go horribly wrong.
//
// Variant order matters: with `#[serde(untagged)]`, serde tries `Single` (one map) before
// `Chained` (a list of maps). If both fail, serde reports only the generic "data did not match any
// variant of untagged enum BufferConfig" error, not the per-stage cause.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
#[configurable(
    title = "Configures the buffering behavior for this sink.",
    description = r#"More information about the individual buffer types, and buffer behavior, can be found in the
[Buffering Model][buffering_model] section.

[buffering_model]: /docs/architecture/buffering-model/"#
)]
pub enum BufferConfig {
    /// A single stage buffer topology.
    Single(BufferType),

    /// A chained buffer topology.
    Chained(Vec<BufferType>),
}
363
364impl Default for BufferConfig {
365    fn default() -> Self {
366        Self::Single(BufferType::Memory {
367            size: MemoryBufferSize::MaxEvents(memory_buffer_default_max_events()),
368            when_full: WhenFull::default(),
369        })
370    }
371}
372
373impl BufferConfig {
374    /// Gets all of the configured stages for this buffer.
375    pub fn stages(&self) -> &[BufferType] {
376        match self {
377            Self::Single(stage) => slice::from_ref(stage),
378            Self::Chained(stages) => stages.as_slice(),
379        }
380    }
381
382    /// Builds the buffer components represented by this configuration.
383    ///
384    /// The caller gets back a `Sink` and `Stream` implementation that represent a way to push items
385    /// into the buffer, as well as pop items out of the buffer, respectively.
386    ///
387    /// # Errors
388    ///
389    /// If the buffer is configured with anything other than a single stage, an error variant will
390    /// be thrown.
391    ///
392    /// If a disk buffer stage is configured and the data directory provided is `None`, an error
393    /// variant will be thrown.
394    #[allow(clippy::needless_pass_by_value)]
395    pub async fn build<T>(
396        &self,
397        data_dir: Option<PathBuf>,
398        buffer_id: String,
399        span: Span,
400    ) -> Result<(BufferSender<T>, BufferReceiver<T>), BufferBuildError>
401    where
402        T: Bufferable + Clone + Finalizable,
403    {
404        let mut builder = TopologyBuilder::default();
405
406        for stage in self.stages() {
407            stage.add_to_builder(&mut builder, data_dir.clone(), buffer_id.clone())?;
408        }
409
410        builder
411            .build(buffer_id, span)
412            .await
413            .context(FailedToBuildTopologySnafu)
414    }
415}
416
#[cfg(test)]
mod test {
    use std::num::{NonZeroU64, NonZeroUsize};

    use crate::{BufferConfig, BufferType, MemoryBufferSize, WhenFull};

    /// Parses `source` as YAML and asserts it yields exactly one stage equal to `expected`.
    fn check_single_stage(source: &str, expected: BufferType) {
        let config: BufferConfig = serde_yaml::from_str(source).unwrap();
        assert_eq!(config.stages().len(), 1);
        let actual = config.stages().first().unwrap();
        assert_eq!(actual, &expected);
    }

    /// Parses `source` as YAML and asserts its stages equal `expected_stages`, in order.
    fn check_multiple_stages(source: &str, expected_stages: &[BufferType]) {
        let config: BufferConfig = serde_yaml::from_str(source).unwrap();
        assert_eq!(config.stages().len(), expected_stages.len());
        for (actual, expected) in config.stages().iter().zip(expected_stages) {
            assert_eq!(actual, expected);
        }
    }

    /// Parses `source` as YAML and asserts it is rejected. Because `BufferConfig` is untagged,
    /// every per-stage error surfaces as the same generic message.
    fn check_rejected(source: &str) {
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    const BUFFER_CONFIG_NO_MATCH_ERR: &str =
        "data did not match any variant of untagged enum BufferConfig";

    #[test]
    fn parse_empty() {
        let source = "";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    #[test]
    fn parse_only_invalid_keys() {
        let source = "foo: 314";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    #[test]
    fn parse_partial_invalid_keys() {
        let source = r"max_size: 100
max_events: 42
";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    #[test]
    fn parse_unknown_type() {
        check_rejected(
            r"type: foo
max_events: 42
",
        );
    }

    #[test]
    fn parse_disk_requires_max_size() {
        check_rejected("type: disk");
    }

    #[test]
    fn parse_disk_rejects_max_events() {
        check_rejected(
            r"type: disk
max_size: 268435488
max_events: 42
",
        );
    }

    #[test]
    fn parse_without_type_tag() {
        check_single_stage(
            r"
          max_events: 100
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                when_full: WhenFull::Block,
            },
        );
    }

    #[test]
    fn parse_memory_with_byte_size_option() {
        check_single_stage(
            r"
        max_size: 4096
        ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxSize(NonZeroUsize::new(4096).unwrap()),
                when_full: WhenFull::Block,
            },
        );
    }

    #[test]
    fn parse_disk_with_when_full() {
        check_single_stage(
            r"
          type: disk
          max_size: 268435488
          when_full: drop_newest
          ",
            BufferType::DiskV2 {
                max_size: NonZeroU64::new(268_435_488).unwrap(),
                when_full: WhenFull::DropNewest,
            },
        );
    }

    #[test]
    fn parse_multiple_stages() {
        check_multiple_stages(
            r"
          - max_events: 42
          - max_events: 100
            when_full: drop_newest
          ",
            &[
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(42).unwrap()),
                    when_full: WhenFull::Block,
                },
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                    when_full: WhenFull::DropNewest,
                },
            ],
        );
    }

    #[test]
    fn parse_chained_memory_and_disk() {
        check_multiple_stages(
            r"
          - max_events: 42
            when_full: overflow
          - type: disk
            max_size: 268435488
          ",
            &[
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(42).unwrap()),
                    when_full: WhenFull::Overflow,
                },
                BufferType::DiskV2 {
                    max_size: NonZeroU64::new(268_435_488).unwrap(),
                    when_full: WhenFull::Block,
                },
            ],
        );
    }

    #[test]
    fn ensure_field_defaults_for_all_types() {
        check_single_stage(
            r"
          type: memory
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::Block,
            },
        );

        check_single_stage(
            r"
          type: memory
          max_events: 100
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                when_full: WhenFull::Block,
            },
        );

        check_single_stage(
            r"
          type: memory
          when_full: drop_newest
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::DropNewest,
            },
        );

        check_single_stage(
            r"
          type: memory
          when_full: overflow
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::Overflow,
            },
        );

        check_single_stage(
            r"
          type: disk
          max_size: 1024
          ",
            BufferType::DiskV2 {
                max_size: NonZeroU64::new(1024).unwrap(),
                when_full: WhenFull::Block,
            },
        );
    }
}
567}