// vector_core/config/mod.rs

1use std::sync::Arc;
2use std::{collections::HashMap, fmt, num::NonZeroUsize};
3
4use bitmask_enum::bitmask;
5use bytes::Bytes;
6use chrono::{DateTime, Utc};
7
8mod global_options;
9mod log_schema;
10pub(crate) mod metrics_expiration;
11pub mod output_id;
12pub mod proxy;
13mod telemetry;
14
15use crate::event::LogEvent;
16pub use global_options::{GlobalOptions, WildcardMatching};
17pub use log_schema::{init_log_schema, log_schema, LogSchema};
18use lookup::{lookup_v2::ValuePath, path, PathPrefix};
19pub use output_id::OutputId;
20use serde::{Deserialize, Serialize};
21pub use telemetry::{init_telemetry, telemetry, Tags, Telemetry};
22pub use vector_common::config::ComponentKey;
23use vector_config::configurable_component;
24use vrl::value::Value;
25
26use crate::schema;
27
28pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
29    vector_buffers::config::memory_buffer_default_max_events();
30
31// This enum should be kept alphabetically sorted as the bitmask value is used when
32// sorting sources by data type in the GraphQL API.
33#[bitmask(u8)]
34#[bitmask_config(flags_iter)]
35pub enum DataType {
36    Log,
37    Metric,
38    Trace,
39}
40
41impl fmt::Display for DataType {
42    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43        f.debug_list()
44            .entries(
45                Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
46            )
47            .finish()
48    }
49}
50
51#[derive(Debug, Clone, PartialEq)]
52pub struct Input {
53    ty: DataType,
54    log_schema_requirement: schema::Requirement,
55}
56
57impl Input {
58    pub fn data_type(&self) -> DataType {
59        self.ty
60    }
61
62    pub fn schema_requirement(&self) -> &schema::Requirement {
63        &self.log_schema_requirement
64    }
65
66    pub fn new(ty: DataType) -> Self {
67        Self {
68            ty,
69            log_schema_requirement: schema::Requirement::empty(),
70        }
71    }
72
73    pub fn log() -> Self {
74        Self {
75            ty: DataType::Log,
76            log_schema_requirement: schema::Requirement::empty(),
77        }
78    }
79
80    pub fn metric() -> Self {
81        Self {
82            ty: DataType::Metric,
83            log_schema_requirement: schema::Requirement::empty(),
84        }
85    }
86
87    pub fn trace() -> Self {
88        Self {
89            ty: DataType::Trace,
90            log_schema_requirement: schema::Requirement::empty(),
91        }
92    }
93
94    pub fn all() -> Self {
95        Self {
96            ty: DataType::all_bits(),
97            log_schema_requirement: schema::Requirement::empty(),
98        }
99    }
100
101    /// Set the schema requirement for this output.
102    #[must_use]
103    pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
104        self.log_schema_requirement = schema_requirement;
105        self
106    }
107}
108
109#[derive(Debug, Clone, PartialEq)]
110pub struct SourceOutput {
111    pub port: Option<String>,
112    pub ty: DataType,
113
114    // NOTE: schema definitions are only implemented/supported for log-type events. There is no
115    // inherent blocker to support other types as well, but it'll require additional work to add
116    // the relevant schemas, and store them separately in this type.
117    pub schema_definition: Option<Arc<schema::Definition>>,
118}
119
120impl SourceOutput {
121    /// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
122    /// If the data type does not contain logs, the schema definition will be ignored.
123    /// Designed for use in log sources.
124    #[must_use]
125    pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
126        let schema_definition = ty
127            .contains(DataType::Log)
128            .then(|| Arc::new(schema_definition));
129
130        Self {
131            port: None,
132            ty,
133            schema_definition,
134        }
135    }
136
137    /// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
138    /// Designed for use in metrics sources.
139    ///
140    /// Sets the datatype to be [`DataType::Metric`].
141    #[must_use]
142    pub fn new_metrics() -> Self {
143        Self {
144            port: None,
145            ty: DataType::Metric,
146            schema_definition: None,
147        }
148    }
149
150    /// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
151    /// Designed for use in trace sources.
152    ///
153    /// Sets the datatype to be [`DataType::Trace`].
154    #[must_use]
155    pub fn new_traces() -> Self {
156        Self {
157            port: None,
158            ty: DataType::Trace,
159            schema_definition: None,
160        }
161    }
162
163    /// Return the schema [`schema::Definition`] from this output.
164    ///
165    /// Takes a `schema_enabled` flag to determine if the full definition including the fields
166    /// and associated types should be returned, or if a simple definition should be returned.
167    /// A simple definition is just the default for the namespace. For the Vector namespace the
168    /// meanings are included.
169    /// Schema enabled is set in the users configuration.
170    #[must_use]
171    pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
172        use std::ops::Deref;
173
174        self.schema_definition.as_ref().map(|definition| {
175            if schema_enabled {
176                definition.deref().clone()
177            } else {
178                let mut new_definition =
179                    schema::Definition::default_for_namespace(definition.log_namespaces());
180                new_definition.add_meanings(definition.meanings());
181                new_definition
182            }
183        })
184    }
185}
186
187impl SourceOutput {
188    /// Set the port name for this `SourceOutput`.
189    #[must_use]
190    pub fn with_port(mut self, name: impl Into<String>) -> Self {
191        self.port = Some(name.into());
192        self
193    }
194}
195
196fn fmt_helper(
197    f: &mut fmt::Formatter<'_>,
198    maybe_port: Option<&String>,
199    data_type: DataType,
200) -> fmt::Result {
201    match maybe_port {
202        Some(port) => write!(f, "port: \"{port}\",",),
203        None => write!(f, "port: None,"),
204    }?;
205    write!(f, " types: {data_type}")
206}
207
208impl fmt::Display for SourceOutput {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        fmt_helper(f, self.port.as_ref(), self.ty)
211    }
212}
213
214#[derive(Debug, Clone, PartialEq)]
215pub struct TransformOutput {
216    pub port: Option<String>,
217    pub ty: DataType,
218
219    /// For *transforms* if `Datatype` is [`DataType::Log`], if schema is
220    /// enabled, at least one definition  should be output. If the transform
221    /// has multiple connected sources, it is possible to have multiple output
222    /// definitions - one for each input.
223    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
224}
225
226impl fmt::Display for TransformOutput {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        fmt_helper(f, self.port.as_ref(), self.ty)
229    }
230}
231
232impl TransformOutput {
233    /// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
234    /// Designed for use in transforms.
235    #[must_use]
236    pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
237        Self {
238            port: None,
239            ty,
240            log_schema_definitions: schema_definitions,
241        }
242    }
243
244    /// Set the port name for this `Output`.
245    #[must_use]
246    pub fn with_port(mut self, name: impl Into<String>) -> Self {
247        self.port = Some(name.into());
248        self
249    }
250
251    /// Return the schema [`schema::Definition`] from this output.
252    ///
253    /// Takes a `schema_enabled` flag to determine if the full definition including the fields
254    /// and associated types should be returned, or if a simple definition should be returned.
255    /// A simple definition is just the default for the namespace. For the Vector namespace the
256    /// meanings are included.
257    /// Schema enabled is set in the users configuration.
258    #[must_use]
259    pub fn schema_definitions(
260        &self,
261        schema_enabled: bool,
262    ) -> HashMap<OutputId, schema::Definition> {
263        if schema_enabled {
264            self.log_schema_definitions.clone()
265        } else {
266            self.log_schema_definitions
267                .iter()
268                .map(|(output, definition)| {
269                    let mut new_definition =
270                        schema::Definition::default_for_namespace(definition.log_namespaces());
271                    new_definition.add_meanings(definition.meanings());
272                    (output.clone(), new_definition)
273                })
274                .collect()
275        }
276    }
277}
278
279/// Simple utility function that can be used by transforms that make no changes to
280/// the schema definitions of events.
281/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
282pub fn clone_input_definitions(
283    input_definitions: &[(OutputId, schema::Definition)],
284) -> HashMap<OutputId, schema::Definition> {
285    input_definitions
286        .iter()
287        .map(|(output, definition)| (output.clone(), definition.clone()))
288        .collect()
289}
290
291/// Source-specific end-to-end acknowledgements configuration.
292///
293/// This type exists solely to provide a source-specific description of the `acknowledgements`
294/// setting, as it is deprecated, and we still need to maintain a way to expose it in the
295/// documentation before it's removed while also making sure people know it shouldn't be used.
296#[configurable_component]
297#[configurable(deprecated)]
298#[configurable(title = "Controls how acknowledgements are handled by this source.")]
299#[configurable(
300    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.
301
302Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.
303
304See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.
305
306[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
307[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
308)]
309#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
310pub struct SourceAcknowledgementsConfig {
311    /// Whether or not end-to-end acknowledgements are enabled for this source.
312    enabled: Option<bool>,
313}
314
315impl SourceAcknowledgementsConfig {
316    pub const DEFAULT: Self = Self { enabled: None };
317
318    #[must_use]
319    pub fn merge_default(&self, other: &Self) -> Self {
320        let enabled = self.enabled.or(other.enabled);
321        Self { enabled }
322    }
323
324    pub fn enabled(&self) -> bool {
325        self.enabled.unwrap_or(false)
326    }
327}
328
329impl From<Option<bool>> for SourceAcknowledgementsConfig {
330    fn from(enabled: Option<bool>) -> Self {
331        Self { enabled }
332    }
333}
334
335impl From<bool> for SourceAcknowledgementsConfig {
336    fn from(enabled: bool) -> Self {
337        Some(enabled).into()
338    }
339}
340
341impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
342    fn from(config: SourceAcknowledgementsConfig) -> Self {
343        Self {
344            enabled: config.enabled,
345        }
346    }
347}
348
349/// End-to-end acknowledgements configuration.
350#[configurable_component]
351#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
352#[configurable(
353    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.
354
355[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
356)]
357#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
358pub struct AcknowledgementsConfig {
359    /// Whether or not end-to-end acknowledgements are enabled.
360    ///
361    /// When enabled for a sink, any source that supports end-to-end
362    /// acknowledgements that is connected to that sink waits for events
363    /// to be acknowledged by **all connected sinks** before acknowledging them at the source.
364    ///
365    /// Enabling or disabling acknowledgements at the sink level takes precedence over any global
366    /// [`acknowledgements`][global_acks] configuration.
367    ///
368    /// [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
369    enabled: Option<bool>,
370}
371
372impl AcknowledgementsConfig {
373    pub const DEFAULT: Self = Self { enabled: None };
374
375    #[must_use]
376    pub fn merge_default(&self, other: &Self) -> Self {
377        let enabled = self.enabled.or(other.enabled);
378        Self { enabled }
379    }
380
381    pub fn enabled(&self) -> bool {
382        self.enabled.unwrap_or(false)
383    }
384}
385
386impl From<Option<bool>> for AcknowledgementsConfig {
387    fn from(enabled: Option<bool>) -> Self {
388        Self { enabled }
389    }
390}
391
392impl From<bool> for AcknowledgementsConfig {
393    fn from(enabled: bool) -> Self {
394        Some(enabled).into()
395    }
396}
397
398#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq)]
399pub enum LogNamespace {
400    /// Vector native namespacing
401    ///
402    /// Deserialized data is placed in the root of the event.
403    /// Extra data is placed in "event metadata"
404    Vector,
405
406    /// This is the legacy namespacing.
407    ///
408    /// All data is set in the root of the event. Since this can lead
409    /// to collisions, deserialized data has priority over metadata
410    Legacy,
411}
412
413/// The user-facing config for log namespace is a bool (enabling or disabling the "Log Namespacing" feature).
414/// Internally, this is converted to a enum.
415impl From<bool> for LogNamespace {
416    fn from(x: bool) -> Self {
417        if x {
418            LogNamespace::Vector
419        } else {
420            LogNamespace::Legacy
421        }
422    }
423}
424
425impl Default for LogNamespace {
426    fn default() -> Self {
427        Self::Legacy
428    }
429}
430
/// A shortcut to specify no `LegacyKey` should be used (since otherwise a turbofish would be required)
pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;

/// How a value is written to the event root under [`LogNamespace::Legacy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegacyKey<T> {
    /// Always insert the data, even if the field previously existed
    Overwrite(T),
    /// Only insert the data if the field is currently empty
    InsertIfEmpty(T),
}
440
441impl LogNamespace {
442    /// Vector: This is added to "event metadata", nested under the source name.
443    ///
444    /// Legacy: This is stored on the event root, only if a field with that name doesn't already exist.
445    pub fn insert_source_metadata<'a>(
446        &self,
447        source_name: &'a str,
448        log: &mut LogEvent,
449        legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
450        metadata_key: impl ValuePath<'a>,
451        value: impl Into<Value>,
452    ) {
453        match self {
454            LogNamespace::Vector => {
455                log.metadata_mut()
456                    .value_mut()
457                    .insert(path!(source_name).concat(metadata_key), value);
458            }
459            LogNamespace::Legacy => match legacy_key {
460                None => { /* don't insert */ }
461                Some(LegacyKey::Overwrite(key)) => {
462                    log.insert((PathPrefix::Event, key), value);
463                }
464                Some(LegacyKey::InsertIfEmpty(key)) => {
465                    log.try_insert((PathPrefix::Event, key), value);
466                }
467            },
468        }
469    }
470
471    /// Vector: This is retrieved from the "event metadata", nested under the source name.
472    ///
473    /// Legacy: This is retrieved from the event.
474    pub fn get_source_metadata<'a, 'b>(
475        &self,
476        source_name: &'a str,
477        log: &'b LogEvent,
478        legacy_key: impl ValuePath<'a>,
479        metadata_key: impl ValuePath<'a>,
480    ) -> Option<&'b Value> {
481        match self {
482            LogNamespace::Vector => log
483                .metadata()
484                .value()
485                .get(path!(source_name).concat(metadata_key)),
486            LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
487        }
488    }
489
490    /// Vector: The `ingest_timestamp`, and `source_type` fields are added to "event metadata", nested
491    /// under the name "vector". This data will be marked as read-only in VRL.
492    ///
493    /// Legacy: The values of `source_type_key`, and `timestamp_key` are stored as keys on the event root,
494    /// only if a field with that name doesn't already exist.
495    pub fn insert_standard_vector_source_metadata(
496        &self,
497        log: &mut LogEvent,
498        source_name: &'static str,
499        now: DateTime<Utc>,
500    ) {
501        self.insert_vector_metadata(
502            log,
503            log_schema().source_type_key(),
504            path!("source_type"),
505            Bytes::from_static(source_name.as_bytes()),
506        );
507        self.insert_vector_metadata(
508            log,
509            log_schema().timestamp_key(),
510            path!("ingest_timestamp"),
511            now,
512        );
513    }
514
515    /// Vector: This is added to the "event metadata", nested under the name "vector". This data
516    /// will be marked as read-only in VRL.
517    ///
518    /// Legacy: This is stored on the event root, only if a field with that name doesn't already exist.
519    pub fn insert_vector_metadata<'a>(
520        &self,
521        log: &mut LogEvent,
522        legacy_key: Option<impl ValuePath<'a>>,
523        metadata_key: impl ValuePath<'a>,
524        value: impl Into<Value>,
525    ) {
526        match self {
527            LogNamespace::Vector => {
528                log.metadata_mut()
529                    .value_mut()
530                    .insert(path!("vector").concat(metadata_key), value);
531            }
532            LogNamespace::Legacy => {
533                if let Some(legacy_key) = legacy_key {
534                    log.try_insert((PathPrefix::Event, legacy_key), value);
535                }
536            }
537        }
538    }
539
540    /// Vector: This is retrieved from the "event metadata", nested under the name "vector".
541    ///
542    /// Legacy: This is retrieved from the event.
543    pub fn get_vector_metadata<'a, 'b>(
544        &self,
545        log: &'b LogEvent,
546        legacy_key: impl ValuePath<'a>,
547        metadata_key: impl ValuePath<'a>,
548    ) -> Option<&'b Value> {
549        match self {
550            LogNamespace::Vector => log
551                .metadata()
552                .value()
553                .get(path!("vector").concat(metadata_key)),
554            LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
555        }
556    }
557
558    pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
559        match self {
560            LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
561        }
562    }
563
564    // combine a global (self) and local value to get the actual namespace
565    #[must_use]
566    pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
567        override_value.map_or(*self, Into::into)
568    }
569}
570
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::LogEvent;
    use chrono::Utc;
    use lookup::{event_path, owned_value_path, OwnedTargetPath};
    use vector_common::btreemap;
    use vrl::value::Kind;

    #[test]
    fn test_insert_standard_vector_source_metadata() {
        // NOTE(review): `init_log_schema` looks like it sets process-wide state; confirm it does
        // not leak into other tests that rely on the default log schema.
        let mut schema = LogSchema::default();
        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
            "a", "b", "c", "d"
        ))));
        init_log_schema(schema, false);

        let namespace = LogNamespace::Legacy;
        let mut event = LogEvent::from("log");
        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());

        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
    }

    #[test]
    fn test_source_definitions_legacy() {
        let definition = schema::Definition::empty_legacy_namespace()
            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let valid_event = LogEvent::from(Value::from(btreemap! {
            "zork" => "norknoog",
            "nork" => 32
        }))
        .into();

        let invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }))
        .into();

        // Get a definition with schema enabled.
        let new_definition = output.schema_definition(true).unwrap();

        // Meanings should still exist.
        assert_eq!(
            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
            new_definition.meaning_path("zork")
        );

        // Events should have the schema validated.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // There should be the default legacy definition without schemas enabled.
        assert_eq!(
            Some(
                schema::Definition::default_legacy_namespace()
                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
            ),
            output.schema_definition(false)
        );
    }

    #[test]
    fn test_source_definitions_vector() {
        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_metadata_field(
                &owned_value_path!("vector", "zork"),
                Kind::integer(),
                Some("zork"),
            )
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let mut valid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        valid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), 32);

        let valid_event = valid_event.into();

        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // `zork` is declared as an integer, so a string value must fail validation.
        invalid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), "noog");

        let invalid_event = invalid_event.into();

        // Get a definition with schema enabled.
        let new_definition = output.schema_definition(true).unwrap();

        // Meanings should still exist.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // Events should have the schema validated.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Get a definition without schema enabled.
        let new_definition = output.schema_definition(false).unwrap();

        // Meanings should still exist.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // Events should not have the schema validated.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_valid_for_event(&invalid_event);
    }

    #[test]
    fn test_new_log_source_ignores_definition_with_metric_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
        assert_eq!(output.schema_definition(true), None);
    }

    #[test]
    fn test_new_log_source_uses_definition_with_log_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
        assert_eq!(output.schema_definition(true), Some(definition));
    }
}