// vector_core/config/mod.rs

1use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};
2
3use bitmask_enum::bitmask;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6
7mod global_options;
8mod log_schema;
9pub(crate) mod metrics_expiration;
10pub mod output_id;
11pub mod proxy;
12mod telemetry;
13
14pub use global_options::{GlobalOptions, WildcardMatching};
15pub use log_schema::{LogSchema, init_log_schema, log_schema};
16use lookup::{PathPrefix, lookup_v2::ValuePath, path};
17pub use output_id::OutputId;
18use serde::{Deserialize, Serialize};
19pub use telemetry::{Tags, Telemetry, init_telemetry, telemetry};
20pub use vector_common::config::ComponentKey;
21use vector_config::configurable_component;
22use vrl::value::Value;
23
24use crate::{event::LogEvent, schema};
25
/// Default maximum number of events for a memory buffer, taken from
/// `vector_buffers::config::memory_buffer_default_max_events()`.
pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
    vector_buffers::config::memory_buffer_default_max_events();
28
// This enum should be kept alphabetically sorted as the bitmask value is used when
// sorting sources by data type in the GraphQL API.
//
// `#[bitmask(u8)]` turns this enum into a `u8`-backed set of flags, so one `DataType` value
// can hold any combination of the variants below (e.g. `DataType::all_bits()`).
// `flags_iter` enables `DataType::flags()`, which the `Display` impl iterates over.
#[bitmask(u8)]
#[bitmask_config(flags_iter)]
pub enum DataType {
    // Log events.
    Log,
    // Metric events.
    Metric,
    // Trace events.
    Trace,
}
38
39impl fmt::Display for DataType {
40    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41        f.debug_list()
42            .entries(
43                Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
44            )
45            .finish()
46    }
47}
48
49#[derive(Debug, Clone, PartialEq)]
50pub struct Input {
51    ty: DataType,
52    log_schema_requirement: schema::Requirement,
53}
54
55impl Input {
56    pub fn data_type(&self) -> DataType {
57        self.ty
58    }
59
60    pub fn schema_requirement(&self) -> &schema::Requirement {
61        &self.log_schema_requirement
62    }
63
64    pub fn new(ty: DataType) -> Self {
65        Self {
66            ty,
67            log_schema_requirement: schema::Requirement::empty(),
68        }
69    }
70
71    pub fn log() -> Self {
72        Self {
73            ty: DataType::Log,
74            log_schema_requirement: schema::Requirement::empty(),
75        }
76    }
77
78    pub fn metric() -> Self {
79        Self {
80            ty: DataType::Metric,
81            log_schema_requirement: schema::Requirement::empty(),
82        }
83    }
84
85    pub fn trace() -> Self {
86        Self {
87            ty: DataType::Trace,
88            log_schema_requirement: schema::Requirement::empty(),
89        }
90    }
91
92    pub fn all() -> Self {
93        Self {
94            ty: DataType::all_bits(),
95            log_schema_requirement: schema::Requirement::empty(),
96        }
97    }
98
99    /// Set the schema requirement for this output.
100    #[must_use]
101    pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
102        self.log_schema_requirement = schema_requirement;
103        self
104    }
105}
106
/// An output of a source: an optional named port, the data type(s) emitted on it, and (for
/// outputs carrying logs) the schema definition of the emitted events.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceOutput {
    /// Name of the output port; `None` when no port name has been set.
    pub port: Option<String>,
    /// The data type(s) emitted on this output.
    pub ty: DataType,

    // NOTE: schema definitions are only implemented/supported for log-type events. There is no
    // inherent blocker to support other types as well, but it'll require additional work to add
    // the relevant schemas, and store them separately in this type.
    /// Log schema definition for this output, shared behind an [`Arc`].
    pub schema_definition: Option<Arc<schema::Definition>>,
}
117
118impl SourceOutput {
119    /// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
120    /// If the data type does not contain logs, the schema definition will be ignored.
121    /// Designed for use in log sources.
122    #[must_use]
123    pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
124        let schema_definition = ty
125            .contains(DataType::Log)
126            .then(|| Arc::new(schema_definition));
127
128        Self {
129            port: None,
130            ty,
131            schema_definition,
132        }
133    }
134
135    /// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
136    /// Designed for use in metrics sources.
137    ///
138    /// Sets the datatype to be [`DataType::Metric`].
139    #[must_use]
140    pub fn new_metrics() -> Self {
141        Self {
142            port: None,
143            ty: DataType::Metric,
144            schema_definition: None,
145        }
146    }
147
148    /// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
149    /// Designed for use in trace sources.
150    ///
151    /// Sets the datatype to be [`DataType::Trace`].
152    #[must_use]
153    pub fn new_traces() -> Self {
154        Self {
155            port: None,
156            ty: DataType::Trace,
157            schema_definition: None,
158        }
159    }
160
161    /// Return the schema [`schema::Definition`] from this output.
162    ///
163    /// Takes a `schema_enabled` flag to determine if the full definition including the fields
164    /// and associated types should be returned, or if a simple definition should be returned.
165    /// A simple definition is just the default for the namespace. For the Vector namespace the
166    /// meanings are included.
167    /// Schema enabled is set in the users configuration.
168    #[must_use]
169    pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
170        use std::ops::Deref;
171
172        self.schema_definition.as_ref().map(|definition| {
173            if schema_enabled {
174                definition.deref().clone()
175            } else {
176                let mut new_definition =
177                    schema::Definition::default_for_namespace(definition.log_namespaces());
178                new_definition.add_meanings(definition.meanings());
179                new_definition
180            }
181        })
182    }
183}
184
185impl SourceOutput {
186    /// Set the port name for this `SourceOutput`.
187    #[must_use]
188    pub fn with_port(mut self, name: impl Into<String>) -> Self {
189        self.port = Some(name.into());
190        self
191    }
192}
193
194fn fmt_helper(
195    f: &mut fmt::Formatter<'_>,
196    maybe_port: Option<&String>,
197    data_type: DataType,
198) -> fmt::Result {
199    match maybe_port {
200        Some(port) => write!(f, "port: \"{port}\",",),
201        None => write!(f, "port: None,"),
202    }?;
203    write!(f, " types: {data_type}")
204}
205
206impl fmt::Display for SourceOutput {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        fmt_helper(f, self.port.as_ref(), self.ty)
209    }
210}
211
/// An output of a transform: an optional named port, the data type(s) emitted on it, and the
/// log schema definitions of the events it emits.
#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
    /// Name of the output port; `None` when no port name has been set.
    pub port: Option<String>,
    /// The data type(s) emitted on this output.
    pub ty: DataType,

    /// For *transforms* if `ty` contains [`DataType::Log`] and schema is
    /// enabled, at least one definition should be output. If the transform
    /// has multiple connected sources, it is possible to have multiple output
    /// definitions - one for each input, keyed by [`OutputId`].
    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}
223
224impl fmt::Display for TransformOutput {
225    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226        fmt_helper(f, self.port.as_ref(), self.ty)
227    }
228}
229
230impl TransformOutput {
231    /// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
232    /// Designed for use in transforms.
233    #[must_use]
234    pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
235        Self {
236            port: None,
237            ty,
238            log_schema_definitions: schema_definitions,
239        }
240    }
241
242    /// Set the port name for this `Output`.
243    #[must_use]
244    pub fn with_port(mut self, name: impl Into<String>) -> Self {
245        self.port = Some(name.into());
246        self
247    }
248
249    /// Return the schema [`schema::Definition`] from this output.
250    ///
251    /// Takes a `schema_enabled` flag to determine if the full definition including the fields
252    /// and associated types should be returned, or if a simple definition should be returned.
253    /// A simple definition is just the default for the namespace. For the Vector namespace the
254    /// meanings are included.
255    /// Schema enabled is set in the users configuration.
256    #[must_use]
257    pub fn schema_definitions(
258        &self,
259        schema_enabled: bool,
260    ) -> HashMap<OutputId, schema::Definition> {
261        if schema_enabled {
262            self.log_schema_definitions.clone()
263        } else {
264            self.log_schema_definitions
265                .iter()
266                .map(|(output, definition)| {
267                    let mut new_definition =
268                        schema::Definition::default_for_namespace(definition.log_namespaces());
269                    new_definition.add_meanings(definition.meanings());
270                    (output.clone(), new_definition)
271                })
272                .collect()
273        }
274    }
275}
276
277/// Simple utility function that can be used by transforms that make no changes to
278/// the schema definitions of events.
279/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
280pub fn clone_input_definitions(
281    input_definitions: &[(OutputId, schema::Definition)],
282) -> HashMap<OutputId, schema::Definition> {
283    input_definitions
284        .iter()
285        .map(|(output, definition)| (output.clone(), definition.clone()))
286        .collect()
287}
288
/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
/// setting, as it is deprecated, and we still need to maintain a way to expose it in the
/// documentation before it's removed while also making sure people know it shouldn't be used.
// NOTE(review): the `///` docs and `configurable(...)` strings on this type presumably feed the
// generated configuration documentation via `configurable_component`; edit them with that in mind.
#[configurable_component]
#[configurable(deprecated)]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.

Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.

See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
    // `None` means "not set": `enabled()` treats it as `false`, and `merge_default` falls back
    // to the other config's value in that case.
    /// Whether or not end-to-end acknowledgements are enabled for this source.
    enabled: Option<bool>,
}
309    /// Whether or not end-to-end acknowledgements are enabled for this source.
310    enabled: Option<bool>,
311}
312
313impl SourceAcknowledgementsConfig {
314    pub const DEFAULT: Self = Self { enabled: None };
315
316    #[must_use]
317    pub fn merge_default(&self, other: &Self) -> Self {
318        let enabled = self.enabled.or(other.enabled);
319        Self { enabled }
320    }
321
322    pub fn enabled(&self) -> bool {
323        self.enabled.unwrap_or(false)
324    }
325}
326
327impl From<Option<bool>> for SourceAcknowledgementsConfig {
328    fn from(enabled: Option<bool>) -> Self {
329        Self { enabled }
330    }
331}
332
333impl From<bool> for SourceAcknowledgementsConfig {
334    fn from(enabled: bool) -> Self {
335        Some(enabled).into()
336    }
337}
338
339impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
340    fn from(config: SourceAcknowledgementsConfig) -> Self {
341        Self {
342            enabled: config.enabled,
343        }
344    }
345}
346
347/// End-to-end acknowledgements configuration.
348#[configurable_component]
349#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
350#[configurable(
351    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.
352
353[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
354)]
355#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
356pub struct AcknowledgementsConfig {
357    /// Controls whether or not end-to-end acknowledgements are enabled.
358    ///
359    /// When enabled for a sink, any source that supports end-to-end
360    /// acknowledgements that is connected to that sink waits for events
361    /// to be acknowledged by **all connected sinks** before acknowledging them at the source.
362    ///
363    /// Enabling or disabling acknowledgements at the sink level takes precedence over any global
364    /// [`acknowledgements`][global_acks] configuration.
365    ///
366    /// [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
367    enabled: Option<bool>,
368}
369
370impl AcknowledgementsConfig {
371    pub const DEFAULT: Self = Self { enabled: None };
372
373    #[must_use]
374    pub fn merge_default(&self, other: &Self) -> Self {
375        let enabled = self.enabled.or(other.enabled);
376        Self { enabled }
377    }
378
379    pub fn enabled(&self) -> bool {
380        self.enabled.unwrap_or(false)
381    }
382}
383
384impl From<Option<bool>> for AcknowledgementsConfig {
385    fn from(enabled: Option<bool>) -> Self {
386        Self { enabled }
387    }
388}
389
390impl From<bool> for AcknowledgementsConfig {
391    fn from(enabled: bool) -> Self {
392        Some(enabled).into()
393    }
394}
395
396#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Default)]
397pub enum LogNamespace {
398    /// Vector native namespacing
399    ///
400    /// Deserialized data is placed in the root of the event.
401    /// Extra data is placed in "event metadata"
402    Vector,
403
404    /// This is the legacy namespacing.
405    ///
406    /// All data is set in the root of the event. Since this can lead
407    /// to collisions, deserialized data has priority over metadata
408    #[default]
409    Legacy,
410}
411
412/// The user-facing config for log namespace is a bool (enabling or disabling the "Log Namespacing" feature).
413/// Internally, this is converted to a enum.
414impl From<bool> for LogNamespace {
415    fn from(x: bool) -> Self {
416        if x {
417            LogNamespace::Vector
418        } else {
419            LogNamespace::Legacy
420        }
421    }
422}
423
424/// A shortcut to specify no `LegacyKey` should be used (since otherwise a turbofish would be required)
425pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;
426
427pub enum LegacyKey<T> {
428    /// Always insert the data, even if the field previously existed
429    Overwrite(T),
430    /// Only insert the data if the field is currently empty
431    InsertIfEmpty(T),
432}
433
434impl LogNamespace {
435    /// Vector: This is added to "event metadata", nested under the source name.
436    ///
437    /// Legacy: This is stored on the event root, only if a field with that name doesn't already exist.
438    pub fn insert_source_metadata<'a>(
439        &self,
440        source_name: &'a str,
441        log: &mut LogEvent,
442        legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
443        metadata_key: impl ValuePath<'a>,
444        value: impl Into<Value>,
445    ) {
446        match self {
447            LogNamespace::Vector => {
448                log.metadata_mut()
449                    .value_mut()
450                    .insert(path!(source_name).concat(metadata_key), value);
451            }
452            LogNamespace::Legacy => match legacy_key {
453                None => { /* don't insert */ }
454                Some(LegacyKey::Overwrite(key)) => {
455                    log.insert((PathPrefix::Event, key), value);
456                }
457                Some(LegacyKey::InsertIfEmpty(key)) => {
458                    log.try_insert((PathPrefix::Event, key), value);
459                }
460            },
461        }
462    }
463
464    /// Vector: This is retrieved from the "event metadata", nested under the source name.
465    ///
466    /// Legacy: This is retrieved from the event.
467    pub fn get_source_metadata<'a, 'b>(
468        &self,
469        source_name: &'a str,
470        log: &'b LogEvent,
471        legacy_key: impl ValuePath<'a>,
472        metadata_key: impl ValuePath<'a>,
473    ) -> Option<&'b Value> {
474        match self {
475            LogNamespace::Vector => log
476                .metadata()
477                .value()
478                .get(path!(source_name).concat(metadata_key)),
479            LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
480        }
481    }
482
483    /// Vector: The `ingest_timestamp`, and `source_type` fields are added to "event metadata", nested
484    /// under the name "vector". This data will be marked as read-only in VRL.
485    ///
486    /// Legacy: The values of `source_type_key`, and `timestamp_key` are stored as keys on the event root,
487    /// only if a field with that name doesn't already exist.
488    pub fn insert_standard_vector_source_metadata(
489        &self,
490        log: &mut LogEvent,
491        source_name: &'static str,
492        now: DateTime<Utc>,
493    ) {
494        self.insert_vector_metadata(
495            log,
496            log_schema().source_type_key(),
497            path!("source_type"),
498            Bytes::from_static(source_name.as_bytes()),
499        );
500        self.insert_vector_metadata(
501            log,
502            log_schema().timestamp_key(),
503            path!("ingest_timestamp"),
504            now,
505        );
506    }
507
508    /// Vector: This is added to the "event metadata", nested under the name "vector". This data
509    /// will be marked as read-only in VRL.
510    ///
511    /// Legacy: This is stored on the event root, only if a field with that name doesn't already exist.
512    pub fn insert_vector_metadata<'a>(
513        &self,
514        log: &mut LogEvent,
515        legacy_key: Option<impl ValuePath<'a>>,
516        metadata_key: impl ValuePath<'a>,
517        value: impl Into<Value>,
518    ) {
519        match self {
520            LogNamespace::Vector => {
521                log.metadata_mut()
522                    .value_mut()
523                    .insert(path!("vector").concat(metadata_key), value);
524            }
525            LogNamespace::Legacy => {
526                if let Some(legacy_key) = legacy_key {
527                    log.try_insert((PathPrefix::Event, legacy_key), value);
528                }
529            }
530        }
531    }
532
533    /// Vector: This is retrieved from the "event metadata", nested under the name "vector".
534    ///
535    /// Legacy: This is retrieved from the event.
536    pub fn get_vector_metadata<'a, 'b>(
537        &self,
538        log: &'b LogEvent,
539        legacy_key: impl ValuePath<'a>,
540        metadata_key: impl ValuePath<'a>,
541    ) -> Option<&'b Value> {
542        match self {
543            LogNamespace::Vector => log
544                .metadata()
545                .value()
546                .get(path!("vector").concat(metadata_key)),
547            LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
548        }
549    }
550
551    pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
552        match self {
553            LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
554        }
555    }
556
557    // combine a global (self) and local value to get the actual namespace
558    #[must_use]
559    pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
560        override_value.map_or(*self, Into::into)
561    }
562}
563
564#[cfg(test)]
565mod test {
566    use chrono::Utc;
567    use lookup::{OwnedTargetPath, event_path, owned_value_path};
568    use vector_common::btreemap;
569    use vrl::value::Kind;
570
571    use super::*;
572    use crate::event::LogEvent;
573
574    #[test]
575    fn test_insert_standard_vector_source_metadata() {
576        let mut schema = LogSchema::default();
577        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
578            "a", "b", "c", "d"
579        ))));
580        init_log_schema(schema, false);
581
582        let namespace = LogNamespace::Legacy;
583        let mut event = LogEvent::from("log");
584        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());
585
586        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
587    }
588
589    #[test]
590    fn test_source_definitions_legacy() {
591        let definition = schema::Definition::empty_legacy_namespace()
592            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
593            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
594        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);
595
596        let valid_event = LogEvent::from(Value::from(btreemap! {
597            "zork" => "norknoog",
598            "nork" => 32
599        }))
600        .into();
601
602        let invalid_event = LogEvent::from(Value::from(btreemap! {
603            "nork" => 32
604        }))
605        .into();
606
607        // Get a definition with schema enabled.
608        let new_definition = output.schema_definition(true).unwrap();
609
610        // Meanings should still exist.
611        assert_eq!(
612            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
613            new_definition.meaning_path("zork")
614        );
615
616        // Events should have the schema validated.
617        new_definition.assert_valid_for_event(&valid_event);
618        new_definition.assert_invalid_for_event(&invalid_event);
619
620        // There should be the default legacy definition without schemas enabled.
621        assert_eq!(
622            Some(
623                schema::Definition::default_legacy_namespace()
624                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
625            ),
626            output.schema_definition(false)
627        );
628    }
629
630    #[test]
631    fn test_source_definitons_vector() {
632        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
633            .with_metadata_field(
634                &owned_value_path!("vector", "zork"),
635                Kind::integer(),
636                Some("zork"),
637            )
638            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
639
640        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);
641
642        let mut valid_event = LogEvent::from(Value::from(btreemap! {
643            "nork" => 32
644        }));
645
646        valid_event
647            .metadata_mut()
648            .value_mut()
649            .insert(path!("vector").concat("zork"), 32);
650
651        let valid_event = valid_event.into();
652
653        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
654            "nork" => 32
655        }));
656
657        invalid_event
658            .metadata_mut()
659            .value_mut()
660            .insert(path!("vector").concat("zork"), "noog");
661
662        let invalid_event = invalid_event.into();
663
664        // Get a definition with schema enabled.
665        let new_definition = output.schema_definition(true).unwrap();
666
667        // Meanings should still exist.
668        assert_eq!(
669            Some(&OwnedTargetPath::metadata(owned_value_path!(
670                "vector", "zork"
671            ))),
672            new_definition.meaning_path("zork")
673        );
674
675        // Events should have the schema validated.
676        new_definition.assert_valid_for_event(&valid_event);
677        new_definition.assert_invalid_for_event(&invalid_event);
678
679        // Get a definition without schema enabled.
680        let new_definition = output.schema_definition(false).unwrap();
681
682        // Meanings should still exist.
683        assert_eq!(
684            Some(&OwnedTargetPath::metadata(owned_value_path!(
685                "vector", "zork"
686            ))),
687            new_definition.meaning_path("zork")
688        );
689
690        // Events should not have the schema validated.
691        new_definition.assert_valid_for_event(&valid_event);
692        new_definition.assert_valid_for_event(&invalid_event);
693    }
694
695    #[test]
696    fn test_new_log_source_ignores_definition_with_metric_data_type() {
697        let definition = schema::Definition::any();
698        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
699        assert_eq!(output.schema_definition(true), None);
700    }
701
702    #[test]
703    fn test_new_log_source_uses_definition_with_log_data_type() {
704        let definition = schema::Definition::any();
705        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
706        assert_eq!(output.schema_definition(true), Some(definition));
707    }
708}