vector_core/config/
mod.rs

1use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};
2
3use bitmask_enum::bitmask;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6
7mod global_options;
8mod log_schema;
9pub(crate) mod metrics_expiration;
10pub mod output_id;
11pub mod proxy;
12mod telemetry;
13
14pub use global_options::{GlobalOptions, WildcardMatching};
15pub use log_schema::{LogSchema, init_log_schema, log_schema};
16use lookup::{PathPrefix, lookup_v2::ValuePath, path};
17pub use output_id::OutputId;
18use serde::{Deserialize, Serialize};
19pub use telemetry::{Tags, Telemetry, init_telemetry, telemetry};
20pub use vector_common::config::ComponentKey;
21use vector_config::configurable_component;
22use vrl::value::Value;
23
24use crate::{event::LogEvent, schema};
25
/// Default `max_events` value for memory buffers.
///
/// This is the value returned by
/// `vector_buffers::config::memory_buffer_default_max_events()`, exposed here as a constant.
pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
    vector_buffers::config::memory_buffer_default_max_events();
28
// Bitmask describing one or more event types. Individual flags can be combined into a
// single value; elsewhere in this file `DataType::all_bits()` is used for "every type" and
// `contains` for membership checks.
//
// This enum should be kept alphabetically sorted as the bitmask value is used when
// sorting sources by data type in the GraphQL API.
//
// `flags_iter` enables `DataType::flags()`, which the `Display` impl relies on to list the
// names of the flags that are set.
#[bitmask(u8)]
#[bitmask_config(flags_iter)]
pub enum DataType {
    Log,
    Metric,
    Trace,
}
38
39impl fmt::Display for DataType {
40    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41        f.debug_list()
42            .entries(
43                Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
44            )
45            .finish()
46    }
47}
48
/// Pairs a [`DataType`] with a log schema requirement.
///
/// Instances are built through the constructors on this type (`new`, `log`, `metric`,
/// `trace`, `all`), all of which start with an empty schema requirement.
#[derive(Debug, Clone, PartialEq)]
pub struct Input {
    // The data type(s) of this input; returned by `data_type()`.
    ty: DataType,
    // The schema requirement; returned by `schema_requirement()` and replaced via
    // `with_schema_requirement()`.
    log_schema_requirement: schema::Requirement,
}
54
55impl Input {
56    pub fn data_type(&self) -> DataType {
57        self.ty
58    }
59
60    pub fn schema_requirement(&self) -> &schema::Requirement {
61        &self.log_schema_requirement
62    }
63
64    pub fn new(ty: DataType) -> Self {
65        Self {
66            ty,
67            log_schema_requirement: schema::Requirement::empty(),
68        }
69    }
70
71    pub fn log() -> Self {
72        Self {
73            ty: DataType::Log,
74            log_schema_requirement: schema::Requirement::empty(),
75        }
76    }
77
78    pub fn metric() -> Self {
79        Self {
80            ty: DataType::Metric,
81            log_schema_requirement: schema::Requirement::empty(),
82        }
83    }
84
85    pub fn trace() -> Self {
86        Self {
87            ty: DataType::Trace,
88            log_schema_requirement: schema::Requirement::empty(),
89        }
90    }
91
92    pub fn all() -> Self {
93        Self {
94            ty: DataType::all_bits(),
95            log_schema_requirement: schema::Requirement::empty(),
96        }
97    }
98
99    /// Set the schema requirement for this output.
100    #[must_use]
101    pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
102        self.log_schema_requirement = schema_requirement;
103        self
104    }
105}
106
/// Description of a single output of a source: an optional port name, the data type(s) it
/// carries, and (for logs) an optional schema definition.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceOutput {
    /// Port name of this output. The constructors leave it as `None`; `with_port` sets it.
    pub port: Option<String>,
    /// Data type(s) of this output.
    pub ty: DataType,

    // NOTE: schema definitions are only implemented/supported for log-type events. There is no
    // inherent blocker to support other types as well, but it'll require additional work to add
    // the relevant schemas, and store them separately in this type.
    /// Schema definition of this output, if any. Prefer the `schema_definition()` accessor,
    /// which takes the `schema_enabled` setting into account.
    pub schema_definition: Option<Arc<schema::Definition>>,
}
117
118impl SourceOutput {
119    /// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
120    /// If the data type does not contain logs, the schema definition will be ignored.
121    /// Designed for use in log sources.
122    #[must_use]
123    pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
124        let schema_definition = ty
125            .contains(DataType::Log)
126            .then(|| Arc::new(schema_definition));
127
128        Self {
129            port: None,
130            ty,
131            schema_definition,
132        }
133    }
134
135    /// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
136    /// Designed for use in metrics sources.
137    ///
138    /// Sets the datatype to be [`DataType::Metric`].
139    #[must_use]
140    pub fn new_metrics() -> Self {
141        Self {
142            port: None,
143            ty: DataType::Metric,
144            schema_definition: None,
145        }
146    }
147
148    /// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
149    /// Designed for use in trace sources.
150    ///
151    /// Sets the datatype to be [`DataType::Trace`].
152    #[must_use]
153    pub fn new_traces() -> Self {
154        Self {
155            port: None,
156            ty: DataType::Trace,
157            schema_definition: None,
158        }
159    }
160
161    /// Return the schema [`schema::Definition`] from this output.
162    ///
163    /// Takes a `schema_enabled` flag to determine if the full definition including the fields
164    /// and associated types should be returned, or if a simple definition should be returned.
165    /// A simple definition is just the default for the namespace. For the Vector namespace the
166    /// meanings are included.
167    /// Schema enabled is set in the users configuration.
168    #[must_use]
169    pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
170        use std::ops::Deref;
171
172        self.schema_definition.as_ref().map(|definition| {
173            if schema_enabled {
174                definition.deref().clone()
175            } else {
176                let mut new_definition =
177                    schema::Definition::default_for_namespace(definition.log_namespaces());
178                new_definition.add_meanings(definition.meanings());
179                new_definition
180            }
181        })
182    }
183}
184
185impl SourceOutput {
186    /// Set the port name for this `SourceOutput`.
187    #[must_use]
188    pub fn with_port(mut self, name: impl Into<String>) -> Self {
189        self.port = Some(name.into());
190        self
191    }
192}
193
194fn fmt_helper(
195    f: &mut fmt::Formatter<'_>,
196    maybe_port: Option<&String>,
197    data_type: DataType,
198) -> fmt::Result {
199    match maybe_port {
200        Some(port) => write!(f, "port: \"{port}\",",),
201        None => write!(f, "port: None,"),
202    }?;
203    write!(f, " types: {data_type}")
204}
205
206impl fmt::Display for SourceOutput {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        fmt_helper(f, self.port.as_ref(), self.ty)
209    }
210}
211
/// Description of a single output of a transform: an optional port name, the data type(s)
/// it carries, and the log schema definitions keyed by [`OutputId`].
#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
    /// Port name of this output. `new` leaves it as `None`; `with_port` sets it.
    pub port: Option<String>,
    /// Data type(s) of this output.
    pub ty: DataType,

    /// For *transforms*, if the data type is [`DataType::Log`] and schema is enabled, at
    /// least one definition should be output. If the transform has multiple connected
    /// sources, it is possible to have multiple output definitions - one for each input.
    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}
223
224impl fmt::Display for TransformOutput {
225    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226        fmt_helper(f, self.port.as_ref(), self.ty)
227    }
228}
229
230impl TransformOutput {
231    /// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
232    /// Designed for use in transforms.
233    #[must_use]
234    pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
235        Self {
236            port: None,
237            ty,
238            log_schema_definitions: schema_definitions,
239        }
240    }
241
242    /// Set the port name for this `Output`.
243    #[must_use]
244    pub fn with_port(mut self, name: impl Into<String>) -> Self {
245        self.port = Some(name.into());
246        self
247    }
248
249    /// Return the schema [`schema::Definition`] from this output.
250    ///
251    /// Takes a `schema_enabled` flag to determine if the full definition including the fields
252    /// and associated types should be returned, or if a simple definition should be returned.
253    /// A simple definition is just the default for the namespace. For the Vector namespace the
254    /// meanings are included.
255    /// Schema enabled is set in the users configuration.
256    #[must_use]
257    pub fn schema_definitions(
258        &self,
259        schema_enabled: bool,
260    ) -> HashMap<OutputId, schema::Definition> {
261        if schema_enabled {
262            self.log_schema_definitions.clone()
263        } else {
264            self.log_schema_definitions
265                .iter()
266                .map(|(output, definition)| {
267                    let mut new_definition =
268                        schema::Definition::default_for_namespace(definition.log_namespaces());
269                    new_definition.add_meanings(definition.meanings());
270                    (output.clone(), new_definition)
271                })
272                .collect()
273        }
274    }
275}
276
277/// Simple utility function that can be used by transforms that make no changes to
278/// the schema definitions of events.
279/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
280pub fn clone_input_definitions(
281    input_definitions: &[(OutputId, schema::Definition)],
282) -> HashMap<OutputId, schema::Definition> {
283    input_definitions
284        .iter()
285        .map(|(output, definition)| (output.clone(), definition.clone()))
286        .collect()
287}
288
/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
/// setting, as it is deprecated, and we still need to maintain a way to expose it in the
/// documentation before it's removed while also making sure people know it shouldn't be used.
// NOTE: the `title`/`description` strings and the field doc comment below are consumed by
// the `configurable` attributes, so edits to them are not purely cosmetic.
#[configurable_component]
#[configurable(deprecated)]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.

Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.

See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
    // `None` means "not set": `merge_default` then falls back to the other config's value,
    // and `enabled()` reports `false`.
    /// Whether or not end-to-end acknowledgements are enabled for this source.
    enabled: Option<bool>,
}
312
313impl SourceAcknowledgementsConfig {
314    pub const DEFAULT: Self = Self { enabled: None };
315
316    #[must_use]
317    pub fn merge_default(&self, other: &Self) -> Self {
318        let enabled = self.enabled.or(other.enabled);
319        Self { enabled }
320    }
321
322    pub fn enabled(&self) -> bool {
323        self.enabled.unwrap_or(false)
324    }
325}
326
327impl From<Option<bool>> for SourceAcknowledgementsConfig {
328    fn from(enabled: Option<bool>) -> Self {
329        Self { enabled }
330    }
331}
332
333impl From<bool> for SourceAcknowledgementsConfig {
334    fn from(enabled: bool) -> Self {
335        Some(enabled).into()
336    }
337}
338
339impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
340    fn from(config: SourceAcknowledgementsConfig) -> Self {
341        Self {
342            enabled: config.enabled,
343        }
344    }
345}
346
/// End-to-end acknowledgements configuration.
// NOTE: the `title`/`description` strings and the field doc comment below are consumed by
// the `configurable` attributes, so edits to them are not purely cosmetic.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
#[configurable(
    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcknowledgementsConfig {
    // `None` means "not set": `merge_default` then falls back to the other config's value,
    // and `enabled()` reports `false`.
    /// Whether or not end-to-end acknowledgements are enabled.
    ///
    /// When enabled for a sink, any source that supports end-to-end
    /// acknowledgements that is connected to that sink waits for events
    /// to be acknowledged by **all connected sinks** before acknowledging them at the source.
    ///
    /// Enabling or disabling acknowledgements at the sink level takes precedence over any global
    /// [`acknowledgements`][global_acks] configuration.
    ///
    /// [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
    enabled: Option<bool>,
}
369
370impl AcknowledgementsConfig {
371    pub const DEFAULT: Self = Self { enabled: None };
372
373    #[must_use]
374    pub fn merge_default(&self, other: &Self) -> Self {
375        let enabled = self.enabled.or(other.enabled);
376        Self { enabled }
377    }
378
379    pub fn enabled(&self) -> bool {
380        self.enabled.unwrap_or(false)
381    }
382}
383
384impl From<Option<bool>> for AcknowledgementsConfig {
385    fn from(enabled: Option<bool>) -> Self {
386        Self { enabled }
387    }
388}
389
390impl From<bool> for AcknowledgementsConfig {
391    fn from(enabled: bool) -> Self {
392        Some(enabled).into()
393    }
394}
395
/// Selects where source-provided and Vector-provided metadata is stored on a log event.
///
/// Converts from `bool` (`true` is `Vector`, `false` is `Legacy`) and defaults to `Legacy`.
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq)]
pub enum LogNamespace {
    /// Vector native namespacing
    ///
    /// Deserialized data is placed in the root of the event.
    /// Extra data is placed in "event metadata"
    Vector,

    /// This is the legacy namespacing.
    ///
    /// All data is set in the root of the event. Since this can lead
    /// to collisions, deserialized data has priority over metadata
    Legacy,
}
410
411/// The user-facing config for log namespace is a bool (enabling or disabling the "Log Namespacing" feature).
412/// Internally, this is converted to a enum.
413impl From<bool> for LogNamespace {
414    fn from(x: bool) -> Self {
415        if x {
416            LogNamespace::Vector
417        } else {
418            LogNamespace::Legacy
419        }
420    }
421}
422
423impl Default for LogNamespace {
424    fn default() -> Self {
425        Self::Legacy
426    }
427}
428
/// A shortcut to specify that no `LegacyKey` should be used.
///
/// Passing a bare `None` where an `Option<LegacyKey<impl ValuePath>>` is expected would
/// otherwise require a turbofish to pin down the key type; this constant fixes it to
/// `&'static str`.
pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;
431
/// A key used for the `Legacy` log namespace, together with the insertion policy to apply
/// when `LogNamespace::insert_source_metadata` writes a value under it.
pub enum LegacyKey<T> {
    /// Always insert the data, even if the field previously existed
    Overwrite(T),
    /// Only insert the data if the field is currently empty
    InsertIfEmpty(T),
}
438
439impl LogNamespace {
440    /// Vector: This is added to "event metadata", nested under the source name.
441    ///
442    /// Legacy: This is stored on the event root, only if a field with that name doesn't already exist.
443    pub fn insert_source_metadata<'a>(
444        &self,
445        source_name: &'a str,
446        log: &mut LogEvent,
447        legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
448        metadata_key: impl ValuePath<'a>,
449        value: impl Into<Value>,
450    ) {
451        match self {
452            LogNamespace::Vector => {
453                log.metadata_mut()
454                    .value_mut()
455                    .insert(path!(source_name).concat(metadata_key), value);
456            }
457            LogNamespace::Legacy => match legacy_key {
458                None => { /* don't insert */ }
459                Some(LegacyKey::Overwrite(key)) => {
460                    log.insert((PathPrefix::Event, key), value);
461                }
462                Some(LegacyKey::InsertIfEmpty(key)) => {
463                    log.try_insert((PathPrefix::Event, key), value);
464                }
465            },
466        }
467    }
468
469    /// Vector: This is retrieved from the "event metadata", nested under the source name.
470    ///
471    /// Legacy: This is retrieved from the event.
472    pub fn get_source_metadata<'a, 'b>(
473        &self,
474        source_name: &'a str,
475        log: &'b LogEvent,
476        legacy_key: impl ValuePath<'a>,
477        metadata_key: impl ValuePath<'a>,
478    ) -> Option<&'b Value> {
479        match self {
480            LogNamespace::Vector => log
481                .metadata()
482                .value()
483                .get(path!(source_name).concat(metadata_key)),
484            LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
485        }
486    }
487
488    /// Vector: The `ingest_timestamp`, and `source_type` fields are added to "event metadata", nested
489    /// under the name "vector". This data will be marked as read-only in VRL.
490    ///
491    /// Legacy: The values of `source_type_key`, and `timestamp_key` are stored as keys on the event root,
492    /// only if a field with that name doesn't already exist.
493    pub fn insert_standard_vector_source_metadata(
494        &self,
495        log: &mut LogEvent,
496        source_name: &'static str,
497        now: DateTime<Utc>,
498    ) {
499        self.insert_vector_metadata(
500            log,
501            log_schema().source_type_key(),
502            path!("source_type"),
503            Bytes::from_static(source_name.as_bytes()),
504        );
505        self.insert_vector_metadata(
506            log,
507            log_schema().timestamp_key(),
508            path!("ingest_timestamp"),
509            now,
510        );
511    }
512
513    /// Vector: This is added to the "event metadata", nested under the name "vector". This data
514    /// will be marked as read-only in VRL.
515    ///
516    /// Legacy: This is stored on the event root, only if a field with that name doesn't already exist.
517    pub fn insert_vector_metadata<'a>(
518        &self,
519        log: &mut LogEvent,
520        legacy_key: Option<impl ValuePath<'a>>,
521        metadata_key: impl ValuePath<'a>,
522        value: impl Into<Value>,
523    ) {
524        match self {
525            LogNamespace::Vector => {
526                log.metadata_mut()
527                    .value_mut()
528                    .insert(path!("vector").concat(metadata_key), value);
529            }
530            LogNamespace::Legacy => {
531                if let Some(legacy_key) = legacy_key {
532                    log.try_insert((PathPrefix::Event, legacy_key), value);
533                }
534            }
535        }
536    }
537
538    /// Vector: This is retrieved from the "event metadata", nested under the name "vector".
539    ///
540    /// Legacy: This is retrieved from the event.
541    pub fn get_vector_metadata<'a, 'b>(
542        &self,
543        log: &'b LogEvent,
544        legacy_key: impl ValuePath<'a>,
545        metadata_key: impl ValuePath<'a>,
546    ) -> Option<&'b Value> {
547        match self {
548            LogNamespace::Vector => log
549                .metadata()
550                .value()
551                .get(path!("vector").concat(metadata_key)),
552            LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
553        }
554    }
555
556    pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
557        match self {
558            LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
559        }
560    }
561
562    // combine a global (self) and local value to get the actual namespace
563    #[must_use]
564    pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
565        override_value.map_or(*self, Into::into)
566    }
567}
568
// Unit tests for `LogNamespace` metadata insertion and `SourceOutput` schema definitions.
#[cfg(test)]
mod test {
    use chrono::Utc;
    use lookup::{OwnedTargetPath, event_path, owned_value_path};
    use vector_common::btreemap;
    use vrl::value::Kind;

    use super::*;
    use crate::event::LogEvent;

    // With the legacy namespace, the source type must be written at the (nested) path
    // configured as the log schema's source type key.
    #[test]
    fn test_insert_standard_vector_source_metadata() {
        let mut schema = LogSchema::default();
        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
            "a", "b", "c", "d"
        ))));
        init_log_schema(schema, false);

        let namespace = LogNamespace::Legacy;
        let mut event = LogEvent::from("log");
        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());

        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
    }

    // Legacy namespace: the full definition validates events when schema is enabled, and
    // the default legacy definition (plus meanings) is returned when it is disabled.
    #[test]
    fn test_source_definitions_legacy() {
        let definition = schema::Definition::empty_legacy_namespace()
            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let valid_event = LogEvent::from(Value::from(btreemap! {
            "zork" => "norknoog",
            "nork" => 32
        }))
        .into();

        // Missing the "zork" field declared in the definition.
        let invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }))
        .into();

        // Get a definition with schema enabled.
        let new_definition = output.schema_definition(true).unwrap();

        // Meanings should still exist.
        assert_eq!(
            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
            new_definition.meaning_path("zork")
        );

        // Events should have the schema validated.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // There should be the default legacy definition without schemas enabled.
        assert_eq!(
            Some(
                schema::Definition::default_legacy_namespace()
                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
            ),
            output.schema_definition(false)
        );
    }

    // Vector namespace: meanings are kept in both modes, but events are only validated
    // against the declared field types when schema is enabled.
    #[test]
    fn test_source_definitons_vector() {
        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_metadata_field(
                &owned_value_path!("vector", "zork"),
                Kind::integer(),
                Some("zork"),
            )
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let mut valid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // Metadata field holds an integer, matching the definition.
        valid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), 32);

        let valid_event = valid_event.into();

        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // Metadata field holds a string where the definition declares an integer.
        invalid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), "noog");

        let invalid_event = invalid_event.into();

        // Get a definition with schema enabled.
        let new_definition = output.schema_definition(true).unwrap();

        // Meanings should still exist.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // Events should have the schema validated.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Get a definition without schema enabled.
        let new_definition = output.schema_definition(false).unwrap();

        // Meanings should still exist.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // Events should not have the schema validated.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_valid_for_event(&invalid_event);
    }

    // A definition passed alongside a non-log data type is dropped.
    #[test]
    fn test_new_log_source_ignores_definition_with_metric_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
        assert_eq!(output.schema_definition(true), None);
    }

    // A definition passed alongside the log data type is kept and returned unchanged.
    #[test]
    fn test_new_log_source_uses_definition_with_log_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
        assert_eq!(output.schema_definition(true), Some(definition));
    }
}