1use std::{
2 fmt,
3 num::{NonZeroU64, NonZeroUsize},
4 path::{Path, PathBuf},
5 slice,
6};
7
8use serde::{de, Deserialize, Deserializer, Serialize};
9use snafu::{ResultExt, Snafu};
10use tracing::Span;
11use vector_common::{config::ComponentKey, finalization::Finalizable};
12use vector_config::configurable_component;
13
14use crate::{
15 topology::{
16 builder::{TopologyBuilder, TopologyError},
17 channel::{BufferReceiver, BufferSender},
18 },
19 variants::{DiskV2Buffer, MemoryBuffer},
20 Bufferable, WhenFull,
21};
22
/// Errors that can occur while building a buffer from its configuration.
#[derive(Debug, Snafu)]
pub enum BufferBuildError {
    /// A buffer stage that needs a data directory was configured, but no data directory was
    /// provided to the builder.
    #[snafu(display("the configured buffer type requires `data_dir` be specified"))]
    RequiresDataDir,
    /// The topology builder failed to assemble the configured stages; `source` carries the
    /// underlying error.
    #[snafu(display("error occurred when building buffer: {}", source))]
    FailedToBuildTopology { source: TopologyError },
    /// A `max_events` value of zero was supplied.
    // NOTE(review): this variant is not constructed anywhere in this file — confirm it is still
    // used by callers elsewhere.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
}
32
33#[derive(Deserialize, Serialize)]
34enum BufferTypeKind {
35 #[serde(rename = "memory")]
36 Memory,
37 #[serde(rename = "disk")]
38 DiskV2,
39}
40
/// Every key accepted in a buffer stage configuration map; reported back to the user when an
/// unknown key is encountered during deserialization.
const ALL_FIELDS: [&str; 4] = ["type", "max_events", "max_size", "when_full"];
42
/// Serde visitor that builds a [`BufferType`] from a map of configuration keys.
struct BufferTypeVisitor;
44
45impl BufferTypeVisitor {
46 fn visit_map_impl<'de, A>(mut map: A) -> Result<BufferType, A::Error>
47 where
48 A: de::MapAccess<'de>,
49 {
50 let mut kind: Option<BufferTypeKind> = None;
51 let mut max_events: Option<NonZeroUsize> = None;
52 let mut max_size: Option<NonZeroU64> = None;
53 let mut when_full: Option<WhenFull> = None;
54 while let Some(key) = map.next_key::<String>()? {
55 match key.as_str() {
56 "type" => {
57 if kind.is_some() {
58 return Err(de::Error::duplicate_field("type"));
59 }
60 kind = Some(map.next_value()?);
61 }
62 "max_events" => {
63 if max_events.is_some() {
64 return Err(de::Error::duplicate_field("max_events"));
65 }
66 max_events = Some(map.next_value()?);
67 }
68 "max_size" => {
69 if max_size.is_some() {
70 return Err(de::Error::duplicate_field("max_size"));
71 }
72 max_size = Some(map.next_value()?);
73 }
74 "when_full" => {
75 if when_full.is_some() {
76 return Err(de::Error::duplicate_field("when_full"));
77 }
78 when_full = Some(map.next_value()?);
79 }
80 other => {
81 return Err(de::Error::unknown_field(other, &ALL_FIELDS));
82 }
83 }
84 }
85 let kind = kind.unwrap_or(BufferTypeKind::Memory);
86 let when_full = when_full.unwrap_or_default();
87 match kind {
88 BufferTypeKind::Memory => {
89 let size = match (max_events, max_size) {
90 (Some(_), Some(_)) => {
91 return Err(de::Error::unknown_field(
92 "max_events",
93 &["type", "max_size", "when_full"],
94 ));
95 }
96 (_, Some(max_size)) => {
97 if let Ok(bounded_max_bytes) = usize::try_from(max_size.get()) {
98 MemoryBufferSize::MaxSize(NonZeroUsize::new(bounded_max_bytes).unwrap())
99 } else {
100 return Err(de::Error::invalid_value(
101 de::Unexpected::Unsigned(max_size.into()),
102 &format!(
103 "Value for max_bytes must be a positive integer <= {}",
104 usize::MAX
105 )
106 .as_str(),
107 ));
108 }
109 }
110 _ => MemoryBufferSize::MaxEvents(
111 max_events.unwrap_or_else(memory_buffer_default_max_events),
112 ),
113 };
114 Ok(BufferType::Memory { size, when_full })
115 }
116 BufferTypeKind::DiskV2 => {
117 if max_events.is_some() {
118 return Err(de::Error::unknown_field(
119 "max_events",
120 &["type", "max_size", "when_full"],
121 ));
122 }
123 Ok(BufferType::DiskV2 {
124 max_size: max_size.ok_or_else(|| de::Error::missing_field("max_size"))?,
125 when_full,
126 })
127 }
128 }
129 }
130}
131
132impl<'de> de::Visitor<'de> for BufferTypeVisitor {
133 type Value = BufferType;
134
135 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
136 formatter.write_str("enum BufferType")
137 }
138
139 fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
140 where
141 A: de::MapAccess<'de>,
142 {
143 BufferTypeVisitor::visit_map_impl(map)
144 }
145}
146
147impl<'de> Deserialize<'de> for BufferType {
148 fn deserialize<D>(deserializer: D) -> Result<BufferType, D::Error>
149 where
150 D: Deserializer<'de>,
151 {
152 deserializer.deserialize_map(BufferTypeVisitor)
153 }
154}
155
/// Returns the default capacity, in events, of an in-memory buffer: 500.
///
/// This is a `const fn`, so the value can be used in constant contexts.
pub const fn memory_buffer_default_max_events() -> NonZeroUsize {
    // A safe construction is enough here: the literal is non-zero, so the `None` arm can never
    // be taken and no `unsafe` is required.
    match NonZeroUsize::new(500) {
        Some(max_events) => max_events,
        None => panic!("default `max_events` must be non-zero"),
    }
}
159
/// Disk usage of a single disk-backed buffer: which component owns it, where its data lives and
/// the maximum size it was configured with.
#[derive(Debug)]
pub struct DiskUsage {
    // Key of the component the buffer belongs to.
    id: ComponentKey,
    // Directory holding the buffer's data.
    data_dir: PathBuf,
    // Configured maximum size of the buffer.
    max_size: NonZeroU64,
}
167
168impl DiskUsage {
169 pub fn new(id: ComponentKey, data_dir: PathBuf, max_size: NonZeroU64) -> Self {
171 Self {
172 id,
173 data_dir,
174 max_size,
175 }
176 }
177
178 pub fn id(&self) -> &ComponentKey {
180 &self.id
181 }
182
183 pub fn max_size(&self) -> u64 {
185 self.max_size.get()
186 }
187
188 pub fn data_dir(&self) -> &Path {
190 self.data_dir.as_path()
191 }
192}
193
/// The capacity limit of an in-memory buffer, expressed either in events or in bytes.
#[configurable_component(no_deser)]
#[serde(rename_all = "snake_case")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MemoryBufferSize {
    /// The maximum number of events allowed in the buffer.
    MaxEvents(#[serde(default = "memory_buffer_default_max_events")] NonZeroUsize),

    /// The maximum number of bytes the buffer is allowed to hold.
    MaxSize(#[configurable(metadata(docs::type_unit = "bytes"))] NonZeroUsize),
}
210
/// A single buffer stage and its settings.
#[configurable_component(no_deser)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
#[configurable(metadata(docs::enum_tag_description = "The type of buffer to use."))]
pub enum BufferType {
    /// A buffer stage that holds events in memory.
    #[configurable(title = "Events are buffered in memory.")]
    Memory {
        /// The capacity limit of the buffer, given either as a number of events or as a number
        /// of bytes.
        #[serde(flatten)]
        size: MemoryBufferSize,

        // Behavior when the buffer is full; defaults to `WhenFull::default()` when omitted.
        #[configurable(derived)]
        #[serde(default)]
        when_full: WhenFull,
    },

    /// A buffer stage that holds events on disk.
    #[configurable(title = "Events are buffered on disk.")]
    #[serde(rename = "disk")]
    DiskV2 {
        /// The maximum size of the buffer on disk.
        ///
        /// The configuration schema declares a minimum of 268435488 bytes.
        #[configurable(
            validation(range(min = 268435488)),
            metadata(docs::type_unit = "bytes")
        )]
        max_size: NonZeroU64,

        // Behavior when the buffer is full; defaults to `WhenFull::default()` when omitted.
        #[configurable(derived)]
        #[serde(default)]
        when_full: WhenFull,
    },
}
255
256impl BufferType {
257 pub fn disk_usage(
264 &self,
265 global_data_dir: Option<PathBuf>,
266 id: &ComponentKey,
267 ) -> Option<DiskUsage> {
268 match global_data_dir {
281 None => None,
282 Some(global_data_dir) => match self {
283 Self::Memory { .. } => None,
284 Self::DiskV2 { max_size, .. } => {
285 let data_dir = crate::variants::disk_v2::get_disk_v2_data_dir_path(
286 &global_data_dir,
287 id.id(),
288 );
289
290 Some(DiskUsage::new(id.clone(), data_dir, *max_size))
291 }
292 },
293 }
294 }
295
296 pub fn add_to_builder<T>(
303 &self,
304 builder: &mut TopologyBuilder<T>,
305 data_dir: Option<PathBuf>,
306 id: String,
307 ) -> Result<(), BufferBuildError>
308 where
309 T: Bufferable + Clone + Finalizable,
310 {
311 match *self {
312 BufferType::Memory { size, when_full } => {
313 builder.stage(MemoryBuffer::new(size), when_full);
314 }
315 BufferType::DiskV2 {
316 when_full,
317 max_size,
318 } => {
319 let data_dir = data_dir.ok_or(BufferBuildError::RequiresDataDir)?;
320 builder.stage(DiskV2Buffer::new(id, data_dir, max_size), when_full);
321 }
322 }
323
324 Ok(())
325 }
326}
327
/// Buffer configuration.
///
/// A buffer is made of one or more stages. The configuration is either a single stage, or a list
/// of stages that are added to the buffer topology in the order they are listed.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
#[configurable(
    title = "Configures the buffering behavior for this sink.",
    description = r#"More information about the individual buffer types, and buffer behavior, can be found in the
[Buffering Model][buffering_model] section.

[buffering_model]: /docs/architecture/buffering-model/"#
)]
pub enum BufferConfig {
    /// A buffer made of a single stage.
    Single(BufferType),

    /// A buffer made of several chained stages.
    Chained(Vec<BufferType>),
}
363
364impl Default for BufferConfig {
365 fn default() -> Self {
366 Self::Single(BufferType::Memory {
367 size: MemoryBufferSize::MaxEvents(memory_buffer_default_max_events()),
368 when_full: WhenFull::default(),
369 })
370 }
371}
372
373impl BufferConfig {
374 pub fn stages(&self) -> &[BufferType] {
376 match self {
377 Self::Single(stage) => slice::from_ref(stage),
378 Self::Chained(stages) => stages.as_slice(),
379 }
380 }
381
382 #[allow(clippy::needless_pass_by_value)]
395 pub async fn build<T>(
396 &self,
397 data_dir: Option<PathBuf>,
398 buffer_id: String,
399 span: Span,
400 ) -> Result<(BufferSender<T>, BufferReceiver<T>), BufferBuildError>
401 where
402 T: Bufferable + Clone + Finalizable,
403 {
404 let mut builder = TopologyBuilder::default();
405
406 for stage in self.stages() {
407 stage.add_to_builder(&mut builder, data_dir.clone(), buffer_id.clone())?;
408 }
409
410 builder
411 .build(buffer_id, span)
412 .await
413 .context(FailedToBuildTopologySnafu)
414 }
415}
416
// Tests for deserializing `BufferConfig` from YAML.
#[cfg(test)]
mod test {
    use std::num::{NonZeroU64, NonZeroUsize};

    use crate::{BufferConfig, BufferType, MemoryBufferSize, WhenFull};

    // Parses `source` as a `BufferConfig` and asserts that it holds exactly one stage equal to
    // `expected`.
    fn check_single_stage(source: &str, expected: BufferType) {
        let config: BufferConfig = serde_yaml::from_str(source).unwrap();
        assert_eq!(config.stages().len(), 1);
        let actual = config.stages().first().unwrap();
        assert_eq!(actual, &expected);
    }

    // Parses `source` as a `BufferConfig` and asserts that its stages match `expected_stages`
    // one by one, in order.
    fn check_multiple_stages(source: &str, expected_stages: &[BufferType]) {
        let config: BufferConfig = serde_yaml::from_str(source).unwrap();
        assert_eq!(config.stages().len(), expected_stages.len());
        for (actual, expected) in config.stages().iter().zip(expected_stages) {
            assert_eq!(actual, expected);
        }
    }

    // Error message asserted whenever the input matches neither variant of the untagged
    // `BufferConfig` enum.
    const BUFFER_CONFIG_NO_MATCH_ERR: &str =
        "data did not match any variant of untagged enum BufferConfig";

    // An empty document is rejected.
    #[test]
    fn parse_empty() {
        let source = "";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    // A map holding only an unknown key is rejected.
    #[test]
    fn parse_only_invalid_keys() {
        let source = "foo: 314";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    // Specifying both `max_size` and `max_events` for the (implicit) memory buffer is rejected.
    #[test]
    fn parse_partial_invalid_keys() {
        let source = r"max_size: 100
max_events: 42
";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    // Without a `type` key the stage is a memory buffer.
    #[test]
    fn parse_without_type_tag() {
        check_single_stage(
            r"
          max_events: 100
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                when_full: WhenFull::Block,
            },
        );
    }

    // A lone `max_size` key yields a memory buffer sized in bytes.
    #[test]
    fn parse_memory_with_byte_size_option() {
        check_single_stage(
            r"
        max_size: 4096
        ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxSize(NonZeroUsize::new(4096).unwrap()),
                when_full: WhenFull::Block,
            },
        );
    }

    // A YAML list yields one stage per entry, in order.
    #[test]
    fn parse_multiple_stages() {
        check_multiple_stages(
            r"
          - max_events: 42
          - max_events: 100
            when_full: drop_newest
          ",
            &[
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(42).unwrap()),
                    when_full: WhenFull::Block,
                },
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                    when_full: WhenFull::DropNewest,
                },
            ],
        );
    }

    // Omitted fields take their defaults for both buffer types.
    #[test]
    fn ensure_field_defaults_for_all_types() {
        // Memory buffer with nothing else set: 500 events, blocking when full.
        check_single_stage(
            r"
          type: memory
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::Block,
            },
        );

        // Explicit `max_events` overrides the default size.
        check_single_stage(
            r"
          type: memory
          max_events: 100
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                when_full: WhenFull::Block,
            },
        );

        // Explicit `when_full` with the default size.
        check_single_stage(
            r"
          type: memory
          when_full: drop_newest
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::DropNewest,
            },
        );

        check_single_stage(
            r"
          type: memory
          when_full: overflow
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::Overflow,
            },
        );

        // Disk buffer: `max_size` is given, `when_full` falls back to its default.
        check_single_stage(
            r"
          type: disk
          max_size: 1024
          ",
            BufferType::DiskV2 {
                max_size: NonZeroU64::new(1024).unwrap(),
                when_full: WhenFull::Block,
            },
        );
    }
}