vector/config/
mod.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::{self, Display, Formatter},
5    fs,
6    hash::Hash,
7    net::SocketAddr,
8    path::PathBuf,
9    time::Duration,
10};
11
12use crate::{
13    conditions,
14    event::{Metric, Value},
15    secrets::SecretBackends,
16    serde::OneOrMany,
17};
18
19use indexmap::IndexMap;
20use serde::Serialize;
21
22use vector_config::configurable_component;
23pub use vector_lib::config::{
24    AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
25    SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
26};
27pub use vector_lib::configurable::component::{
28    GenerateConfig, SinkDescription, TransformDescription,
29};
30
31pub mod api;
32mod builder;
33mod cmd;
34mod compiler;
35mod diff;
36pub mod dot_graph;
37mod enrichment_table;
38pub mod format;
39mod graph;
40mod loading;
41pub mod provider;
42pub mod schema;
43mod secret;
44mod sink;
45mod source;
46mod transform;
47pub mod unit_test;
48mod validation;
49mod vars;
50pub mod watcher;
51
52pub use builder::ConfigBuilder;
53pub use cmd::{cmd, Opts};
54pub use diff::ConfigDiff;
55pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
56pub use format::{Format, FormatHint};
57pub use loading::{
58    load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets,
59    load_from_str, load_source_from_paths, merge_path_lists, process_paths, COLLECTOR,
60    CONFIG_PATHS,
61};
62pub use provider::ProviderConfig;
63pub use secret::SecretBackend;
64pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
65pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
66pub use transform::{
67    get_transform_output_ids, BoxedTransform, TransformConfig, TransformContext, TransformOuter,
68};
69pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
70pub use validation::warnings;
71pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX};
72pub use vector_lib::{
73    config::{
74        init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, telemetry, ComponentKey,
75        LogSchema, OutputId,
76    },
77    id::Inputs,
78};
79
/// The kind of component a [`ComponentConfig`] refers to.
///
/// This is not a comprehensive set; variants are added as needed.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ComponentType {
    Transform,
    Sink,
    EnrichmentTable,
}
87
/// Ties a component (identified by key and type) to a list of configuration paths.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentConfig {
    /// Paths associated with the component. `ComponentConfig::new` stores them canonicalized.
    pub config_paths: Vec<PathBuf>,
    /// Key identifying the component.
    pub component_key: ComponentKey,
    /// Which kind of component this is.
    pub component_type: ComponentType,
}
94
95impl ComponentConfig {
96    pub fn new(
97        config_paths: Vec<PathBuf>,
98        component_key: ComponentKey,
99        component_type: ComponentType,
100    ) -> Self {
101        let canonicalized_paths = config_paths
102            .into_iter()
103            .filter_map(|p| fs::canonicalize(p).ok())
104            .collect();
105
106        Self {
107            config_paths: canonicalized_paths,
108            component_key,
109            component_type,
110        }
111    }
112
113    pub fn contains(&self, config_paths: &[PathBuf]) -> Option<(ComponentKey, ComponentType)> {
114        if config_paths.iter().any(|p| self.config_paths.contains(p)) {
115            return Some((self.component_key.clone(), self.component_type.clone()));
116        }
117        None
118    }
119}
120
/// A configuration location: either a single file or a directory.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
    /// A file path together with its `FormatHint`.
    File(PathBuf, FormatHint),
    /// A directory path.
    Dir(PathBuf),
}
126
127impl<'a> From<&'a ConfigPath> for &'a PathBuf {
128    fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
129        match config_path {
130            ConfigPath::File(path, _) => path,
131            ConfigPath::Dir(path) => path,
132        }
133    }
134}
135
136impl ConfigPath {
137    pub const fn as_dir(&self) -> Option<&PathBuf> {
138        match self {
139            Self::Dir(path) => Some(path),
140            _ => None,
141        }
142    }
143}
144
/// Top-level configuration: global settings plus the component maps, each keyed by
/// `ComponentKey`.
///
/// The private component maps are read through the accessor methods on `impl Config`.
#[derive(Debug, Default, Serialize)]
pub struct Config {
    /// API options; the field only exists when the `api` feature is enabled.
    #[cfg(feature = "api")]
    pub api: api::Options,
    /// Schema options.
    pub schema: schema::Options,
    /// Global options; `propagate_acknowledgements` reads `global.acknowledgements` as the
    /// default merged into each sink's own acknowledgement settings.
    pub global: GlobalOptions,
    /// Healthcheck options.
    pub healthchecks: HealthcheckOptions,
    /// Source components.
    sources: IndexMap<ComponentKey, SourceOuter>,
    /// Sink components.
    sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
    /// Transform components.
    transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
    /// Enrichment tables.
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
    /// Unit test definitions.
    tests: Vec<TestDefinition>,
    /// Secret backends.
    secret: IndexMap<ComponentKey, SecretBackends>,
    /// Optional graceful shutdown duration.
    // NOTE(review): what `None` means is decided by the consumer of this field, which is not
    // visible in this file.
    pub graceful_shutdown_duration: Option<Duration>,
}
160
161impl Config {
162    pub fn builder() -> builder::ConfigBuilder {
163        Default::default()
164    }
165
166    pub fn is_empty(&self) -> bool {
167        self.sources.is_empty()
168    }
169
170    pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
171        self.sources.iter()
172    }
173
174    pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
175        self.sources.get(id)
176    }
177
178    pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
179        self.transforms.iter()
180    }
181
182    pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
183        self.transforms.get(id)
184    }
185
186    pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
187        self.sinks.iter()
188    }
189
190    pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
191        self.sinks.get(id)
192    }
193
194    pub fn enrichment_tables(
195        &self,
196    ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
197        self.enrichment_tables.iter()
198    }
199
200    pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
201        self.enrichment_tables.get(id)
202    }
203
204    pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
205        self.transforms
206            .get(id)
207            .map(|t| &t.inputs[..])
208            .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
209            .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
210    }
211
212    pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
213        let inputs: Vec<_> = self
214            .sinks
215            .iter()
216            .filter(|(_, sink)| {
217                sink.inner
218                    .acknowledgements()
219                    .merge_default(&self.global.acknowledgements)
220                    .enabled()
221            })
222            .flat_map(|(name, sink)| {
223                sink.inputs
224                    .iter()
225                    .map(|input| (name.clone(), input.clone()))
226            })
227            .collect();
228        self.propagate_acks_rec(inputs);
229        Ok(())
230    }
231
232    fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
233        for (sink, input) in sink_inputs {
234            let component = &input.component;
235            if let Some(source) = self.sources.get_mut(component) {
236                if source.inner.can_acknowledge() {
237                    source.sink_acknowledgements = true;
238                } else {
239                    warn!(
240                        message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
241                        source = component.id(),
242                        sink = sink.id(),
243                    );
244                }
245            } else if let Some(transform) = self.transforms.get(component) {
246                let inputs = transform
247                    .inputs
248                    .iter()
249                    .map(|input| (sink.clone(), input.clone()))
250                    .collect();
251                self.propagate_acks_rec(inputs);
252            }
253        }
254    }
255}
256
/// Healthcheck options.
// Container-level `#[serde(default)]`: fields absent from the input fall back to the values of
// this type's `Default` impl.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(default)]
pub struct HealthcheckOptions {
    /// Whether or not healthchecks are enabled for all sinks.
    ///
    /// Can be overridden on a per-sink basis.
    pub enabled: bool,

    /// Whether or not to require a sink to report as being healthy during startup.
    ///
    /// When enabled and a sink reports not being healthy, Vector will exit during start-up.
    ///
    /// Can be alternatively set, and overridden by, the `--require-healthy` command-line flag.
    pub require_healthy: bool,
}
274
275impl HealthcheckOptions {
276    pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
277        if let Some(require_healthy) = require_healthy.into() {
278            self.require_healthy = require_healthy;
279        }
280    }
281
282    const fn merge(&mut self, other: Self) {
283        self.enabled &= other.enabled;
284        self.require_healthy |= other.require_healthy;
285    }
286}
287
288impl Default for HealthcheckOptions {
289    fn default() -> Self {
290        Self {
291            enabled: true,
292            require_healthy: false,
293        }
294    }
295}
296
/// A unique resource, such as a port, that only one component may own.
///
/// `Resource::conflicts` reports resources claimed by more than one component.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address bound with the given protocol.
    Port(SocketAddr, Protocol),
    /// A zero-based systemd socket offset (displayed one-based by the `Display` impl).
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer, identified by a string.
    DiskBuffer(String),
}
305
/// Transport protocol of a `Resource::Port`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    Tcp,
    Udp,
}
311
312impl Resource {
313    pub const fn tcp(addr: SocketAddr) -> Self {
314        Self::Port(addr, Protocol::Tcp)
315    }
316
317    pub const fn udp(addr: SocketAddr) -> Self {
318        Self::Port(addr, Protocol::Udp)
319    }
320
321    /// From given components returns all that have a resource conflict with any other component.
322    pub fn conflicts<K: Eq + Hash + Clone>(
323        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
324    ) -> HashMap<Resource, HashSet<K>> {
325        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
326        let mut unspecified = Vec::new();
327
328        // Find equality based conflicts
329        for (key, resources) in components {
330            for resource in resources {
331                if let Resource::Port(address, protocol) = &resource {
332                    if address.ip().is_unspecified() {
333                        unspecified.push((key.clone(), *address, *protocol));
334                    }
335                }
336
337                resource_map
338                    .entry(resource)
339                    .or_default()
340                    .insert(key.clone());
341            }
342        }
343
344        // Port with unspecified address will bind to all network interfaces
345        // so we have to check for all Port resources if they share the same
346        // port.
347        for (key, address0, protocol0) in unspecified {
348            for (resource, components) in resource_map.iter_mut() {
349                if let Resource::Port(address, protocol) = resource {
350                    // IP addresses can either be v4 or v6.
351                    // Therefore we check if the ip version matches, the port matches and if the protocol (TCP/UDP) matches
352                    // when checking for equality.
353                    if &address0 == address && &protocol0 == protocol {
354                        components.insert(key.clone());
355                    }
356                }
357            }
358        }
359
360        resource_map.retain(|_, components| components.len() > 1);
361
362        resource_map
363    }
364}
365
366impl Display for Protocol {
367    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
368        match self {
369            Protocol::Udp => write!(fmt, "udp"),
370            Protocol::Tcp => write!(fmt, "tcp"),
371        }
372    }
373}
374
375impl Display for Resource {
376    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
377        match self {
378            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
379            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
380            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
381            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
382        }
383    }
384}
385
/// A unit test definition.
// `T` is the type used to name component outputs in `outputs` and `no_outputs_from`; it defaults
// to `OutputId`. `impl TestDefinition<String>` converts the string form into the `OutputId` form,
// and `impl TestDefinition<OutputId>` converts back.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestDefinition<T: 'static = OutputId> {
    /// The name of the unit test.
    pub name: String,

    /// An input event to test against.
    pub input: Option<TestInput>,

    /// A set of input events to test against.
    #[serde(default)]
    pub inputs: Vec<TestInput>,

    /// A set of expected output events after the test has run.
    #[serde(default)]
    pub outputs: Vec<TestOutput<T>>,

    /// A set of component outputs that should not have emitted any events.
    #[serde(default)]
    pub no_outputs_from: Vec<T>,
}
409
410impl TestDefinition<String> {
411    fn resolve_outputs(
412        self,
413        graph: &graph::Graph,
414    ) -> Result<TestDefinition<OutputId>, Vec<String>> {
415        let TestDefinition {
416            name,
417            input,
418            inputs,
419            outputs,
420            no_outputs_from,
421        } = self;
422        let mut errors = Vec::new();
423
424        let output_map = graph.input_map().expect("ambiguous outputs");
425
426        let outputs = outputs
427            .into_iter()
428            .map(|old| {
429                let TestOutput {
430                    extract_from,
431                    conditions,
432                } = old;
433
434                (extract_from.to_vec(), conditions)
435            })
436            .filter_map(|(extract_from, conditions)| {
437                let mut outputs = Vec::new();
438                for from in extract_from {
439                    if let Some(output_id) = output_map.get(&from) {
440                        outputs.push(output_id.clone());
441                    } else {
442                        errors.push(format!(
443                            r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
444                        ));
445                    }
446                }
447                if outputs.is_empty() {
448                    None
449                } else {
450                    Some(TestOutput {
451                        extract_from: outputs.into(),
452                        conditions,
453                    })
454                }
455            })
456            .collect();
457
458        let no_outputs_from = no_outputs_from
459            .into_iter()
460            .filter_map(|o| {
461                if let Some(output_id) = output_map.get(&o) {
462                    Some(output_id.clone())
463                } else {
464                    errors.push(format!(
465                        r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
466                    ));
467                    None
468                }
469            })
470            .collect();
471
472        if errors.is_empty() {
473            Ok(TestDefinition {
474                name,
475                input,
476                inputs,
477                outputs,
478                no_outputs_from,
479            })
480        } else {
481            Err(errors)
482        }
483    }
484}
485
486impl TestDefinition<OutputId> {
487    fn stringify(self) -> TestDefinition<String> {
488        let TestDefinition {
489            name,
490            input,
491            inputs,
492            outputs,
493            no_outputs_from,
494        } = self;
495
496        let outputs = outputs
497            .into_iter()
498            .map(|old| TestOutput {
499                extract_from: old
500                    .extract_from
501                    .to_vec()
502                    .into_iter()
503                    .map(|item| item.to_string())
504                    .collect::<Vec<_>>()
505                    .into(),
506                conditions: old.conditions,
507            })
508            .collect();
509
510        let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
511
512        TestDefinition {
513            name,
514            input,
515            inputs,
516            outputs,
517            no_outputs_from,
518        }
519    }
520}
521
/// A unit test input.
///
/// An input describes not only the type of event to insert, but also which transform within the
/// configuration to insert it to.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// The name of the transform to insert the input event to.
    pub insert_at: ComponentKey,

    /// The type of the input event.
    ///
    /// Can be either `raw`, `vrl`, `log`, or `metric`.
    // Serialized as `type`; when absent, `default_test_input_type` supplies the value.
    #[serde(default = "default_test_input_type", rename = "type")]
    pub type_str: String,

    /// The raw string value to use as the input event.
    ///
    /// Use this only when the input event should be a raw event (i.e. unprocessed/undecoded log
    /// event) and when the input type is set to `raw`.
    pub value: Option<String>,

    /// The vrl expression to generate the input event.
    ///
    /// Only relevant when `type` is `vrl`.
    pub source: Option<String>,

    /// The set of log fields to use when creating a log input event.
    ///
    /// Only relevant when `type` is `log`.
    pub log_fields: Option<IndexMap<String, Value>>,

    /// The metric to use as an input event.
    ///
    /// Only relevant when `type` is `metric`.
    pub metric: Option<Metric>,
}
560
/// Supplies the fallback for `TestInput`'s `type` field: the string `"raw"`.
fn default_test_input_type() -> String {
    String::from("raw")
}
564
/// A unit test output.
///
/// An output describes what we expect a transform to emit when fed a certain event, or events, when
/// running a unit test.
// `T` is the type used to name the outputs in `extract_from`; it defaults to `OutputId`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestOutput<T: 'static = OutputId> {
    /// The transform outputs to extract events from.
    pub extract_from: OneOrMany<T>,

    /// The conditions to run against the output to validate that they were transformed as expected.
    pub conditions: Option<Vec<conditions::AnyCondition>>,
}
579
580#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
581mod tests {
582    use std::{collections::HashMap, path::PathBuf};
583
584    use crate::{config, topology};
585    use indoc::indoc;
586
587    use super::{builder::ConfigBuilder, format, load_from_str, ComponentKey, ConfigDiff, Format};
588
    /// Test helper: parses `config` in the given `format` and builds the topology pieces for it.
    ///
    /// Returns the configuration warnings when both steps succeed, and the load or build errors
    /// otherwise.
    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
        match config::load_from_str(config, format) {
            Ok(c) => {
                let diff = ConfigDiff::initial(&c);
                // The same input is parsed a second time; warnings are computed from that copy.
                let c2 = config::load_from_str(config, format).unwrap();
                match (
                    config::warnings(&c2),
                    topology::TopologyPieces::build(&c, &diff, HashMap::new(), Default::default())
                        .await,
                ) {
                    (warnings, Ok(_pieces)) => Ok(warnings),
                    (_, Err(errors)) => Err(errors),
                }
            }
            Err(error) => Err(error),
        }
    }
606
    // Duplicated inputs, an empty input list, and inputs naming unknown components must each be
    // reported as an error.
    #[tokio::test]
    async fn bad_inputs() {
        let err = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            vec![
                "Sink \"out\" has input \"in\" duplicated 2 times",
                "Transform \"sample\" has no inputs",
                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
                "Input \"asdf\" for sink \"out\" doesn't match any components.",
            ],
            err,
        );
    }
645
    // A source and a transform sharing the name `foo` must be rejected.
    #[tokio::test]
    async fn duplicate_name() {
        let err = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            err,
            vec!["More than one component with name \"foo\" (source, transform).",]
        );
    }
676
    // A `stdin` source and a `file_descriptor` source with `fd = 0` must be reported as claiming
    // the same resource.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let errors = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        // Only the prefix is checked; the rest of the message (the component list) is not pinned.
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
702
    // Two `file_descriptor` sources configured with the same `fd` must be reported as a conflict.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let errors = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        // Only the prefix is checked; the rest of the message (the component list) is not pinned.
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
727
    // Two `file_descriptor` sources, each given its own descriptor from `null_fd()`, must load
    // with no errors and no warnings.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;
        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let result = load(
            &format!(
                r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
            ),
            Format::Toml,
        )
        .await;

        let expected = Ok(vec![]);
        assert_eq!(result, expected);
    }
757
    // A transform and a source that nothing consumes must each produce a "has no consumers"
    // warning while the configuration still loads.
    #[tokio::test]
    async fn warnings() {
        let warnings = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap();

        assert_eq!(
            warnings,
            vec![
                "Transform \"sample2\" has no consumers",
                "Source \"in2\" has no consumers",
            ]
        )
    }
797
    // Transforms `two -> three -> four -> two` form a loop, which must be reported as a cyclic
    // dependency.
    #[tokio::test]
    async fn cycle() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            errors,
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
        )
    }
843
    // When `data_dir` is not set, the loaded config must report `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let config = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            config.global.data_dir
        )
    }
864
    // Without a `[log_schema]` section the host, message and timestamp keys must be `host`,
    // `message` and `timestamp`.
    #[test]
    fn default_schema() {
        let config = load_from_str(
            indoc! {r#"
            [sources.in]
            type = "test_basic"

            [sinks.out]
            type = "test_basic"
            inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "host",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "message",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "timestamp",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
898
    // Keys set in a `[log_schema]` section must replace the default host, message and timestamp
    // keys.
    #[test]
    fn custom_schema() {
        let config = load_from_str(
            indoc! {r#"
                [log_schema]
                  host_key = "this"
                  message_key = "that"
                  timestamp_key = "then"

                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "this",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "that",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "then",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
937
    // Appending a second builder with non-overlapping content must succeed and leave the merged
    // builder holding the global options, components and tests from both inputs.
    #[test]
    fn config_append() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        data_dir = "/foobar"

                        [proxy]
                          http = "http://proxy.inc:3128"

                        [transforms.foo]
                          type = "test_basic"
                          inputs = [ "in" ]
                          suffix = "foo"
                          increase = 1.25

                        [[tests]]
                          name = "check_simple_log"
                          [tests.input]
                            insert_at = "foo"
                            type = "raw"
                            value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                          [[tests.outputs]]
                            extract_from = "foo"
                            [[tests.outputs.conditions]]
                              type = "vrl"
                              source = ".message == \"Sorry, I'm busy this week Cecil\""
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Ok(())
        );

        // Settings and components from both the original and the appended builder are present.
        assert!(config.global.proxy.http.is_some());
        assert!(config.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
        assert!(config.sources.contains_key(&ComponentKey::from("in")));
        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(config.tests.len(), 1);
    }
995
    // Appending a builder that redefines an existing source and sink must fail with one
    // "duplicate ... id found" error per collision.
    #[test]
    fn config_append_collisions() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        [sources.in]
                          type = "test_basic"

                        [transforms.foo]
                          type = "test_basic"
                          inputs = [ "in" ]
                          suffix = "foo"
                          increase = 1.25

                        [sinks.out]
                          type = "test_basic"
                          inputs = ["in"]
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Err(vec![
                "duplicate source id found: in".into(),
                "duplicate sink id found: out".into(),
            ])
        );
    }
1038
1039    #[test]
1040    fn with_proxy() {
1041        let config: ConfigBuilder = format::deserialize(
1042            indoc! {r#"
1043                [proxy]
1044                  http = "http://server:3128"
1045                  https = "http://other:3128"
1046                  no_proxy = ["localhost", "127.0.0.1"]
1047
1048                [sources.in]
1049                  type = "nginx_metrics"
1050                  endpoints = ["http://localhost:8000/basic_status"]
1051                  proxy.http = "http://server:3128"
1052                  proxy.https = "http://other:3128"
1053                  proxy.no_proxy = ["localhost", "127.0.0.1"]
1054
1055                [sinks.out]
1056                  type = "console"
1057                  inputs = ["in"]
1058                  encoding.codec = "json"
1059            "#},
1060            Format::Toml,
1061        )
1062        .unwrap();
1063        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1064        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1065        assert!(config.global.proxy.no_proxy.matches("localhost"));
1066        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1067        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1068        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1069        assert!(source.proxy.no_proxy.matches("localhost"));
1070    }
1071
1072    #[test]
1073    fn with_partial_global_proxy() {
1074        let config: ConfigBuilder = format::deserialize(
1075            indoc! {r#"
1076                [proxy]
1077                  http = "http://server:3128"
1078
1079                [sources.in]
1080                  type = "nginx_metrics"
1081                  endpoints = ["http://localhost:8000/basic_status"]
1082
1083                [sources.in.proxy]
1084                  http = "http://server:3129"
1085                  https = "http://other:3129"
1086                  no_proxy = ["localhost", "127.0.0.1"]
1087
1088                [sinks.out]
1089                  type = "console"
1090                  inputs = ["in"]
1091                  encoding.codec = "json"
1092            "#},
1093            Format::Toml,
1094        )
1095        .unwrap();
1096        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1097        assert_eq!(config.global.proxy.https, None);
1098        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1099        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1100        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1101        assert!(source.proxy.no_proxy.matches("localhost"));
1102    }
1103
1104    #[test]
1105    fn with_partial_source_proxy() {
1106        let config: ConfigBuilder = format::deserialize(
1107            indoc! {r#"
1108                [proxy]
1109                  http = "http://server:3128"
1110                  https = "http://other:3128"
1111
1112                [sources.in]
1113                  type = "nginx_metrics"
1114                  endpoints = ["http://localhost:8000/basic_status"]
1115
1116                [sources.in.proxy]
1117                  http = "http://server:3129"
1118                  no_proxy = ["localhost", "127.0.0.1"]
1119
1120                [sinks.out]
1121                  type = "console"
1122                  inputs = ["in"]
1123                  encoding.codec = "json"
1124            "#},
1125            Format::Toml,
1126        )
1127        .unwrap();
1128        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1129        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1130        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1131        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1132        assert_eq!(source.proxy.https, None);
1133        assert!(source.proxy.no_proxy.matches("localhost"));
1134    }
1135}
1136
/// Checks how sink acknowledgement settings show up on sources once a
/// `ConfigBuilder` has been built.
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Before `build()` no source has `sink_acknowledgements` set; afterwards
    /// only the sources feeding an acknowledging sink (directly or through a
    /// transform) do.
    #[test]
    fn propagates_settings() {
        // The topology:
        // in1 => out1
        // in2 => out2 (acks enabled)
        // in3 => parse3 => out3 (acks enabled)
        let toml = indoc! {r#"
            data_dir = "/tmp"
            [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
            [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
            [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
            [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
            [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
        "#};
        let builder: ConfigBuilder = format::deserialize(toml, Format::Toml).unwrap();

        // Nothing has been propagated yet on the raw builder.
        assert!(
            builder
                .sources
                .values()
                .all(|source| !source.sink_acknowledgements),
            "Source `sink_acknowledgements` should be `false` before propagation"
        );

        let built = builder.build().unwrap();

        // Reads the propagated flag of the named source in the built config.
        let acks_enabled = |name: &str| {
            built
                .sources
                .get(&ComponentKey::from(name))
                .unwrap()
                .sink_acknowledgements
        };
        assert!(!acks_enabled("in1"));
        assert!(acks_enabled("in2"));
        assert!(acks_enabled("in3"));
    }
}
1203
/// Property-based and unit tests exercising `Resource::conflicts`.
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// Shorthand for `Resource::tcp` on the socket address `addr:port`.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::tcp(socket)
    }

    /// Shorthand for `Resource::udp` on the socket address `addr:port`.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::udp(socket)
    }

    /// Strategy yielding one of the two unspecified addresses (IPv4 or IPv6).
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
            Just(IpAddr::V6(Ipv6Addr::UNSPECIFIED)),
        ]
    }

    /// Strategy yielding any address that is not an unspecified one.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |candidate| {
            !candidate.is_unspecified()
        })
    }

    /// Strategy yielding any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |candidate| *candidate != 0)
    }

    /// Turns `(resource, component names)` pairs into the map shape returned
    /// by `Resource::conflicts`, so expectations can be compared directly.
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        let mut expected: HashMap<Resource, HashSet<&str>> = HashMap::new();
        for (resource, names) in conflicts {
            expected.insert(resource, names.into_iter().collect());
        }
        expected
    }

    proptest! {
        // Same address, three distinct ports (one of them 0): no conflicts.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ]);
            assert_eq!(found, HashMap::new());
        }

        // Two components on the same address and port are reported together.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])]);
            assert_eq!(found, expected);
        }

        // A component holding two resources can take part in two conflicts.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![
                (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                (tcp(addr, port), vec!["sink_1", "sink_2"]),
            ]);
            assert_eq!(found, expected);
        }

        // Same port on two different addresses: no conflicts.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ]);
            assert_eq!(found, HashMap::new());
        }

        // A specific address and an unspecified one sharing a port: no conflicts.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ]);
            assert_eq!(found, HashMap::new());
        }

        // TCP and UDP on the same address and port: no conflicts.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ]);
            assert_eq!(found, HashMap::new());
        }
    }

    /// The IPv4 and IPv6 unspecified addresses on the same port are not
    /// reported as conflicting with each other.
    #[test]
    fn different_unspecified_ip_version() {
        let found = Resource::conflicts(vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ]);
        assert_eq!(found, HashMap::new());
    }
}
1331
/// Config-level tests that need the `stdin` source and the `console` sink to
/// be compiled in.
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{load_from_str, Format};

    /// A config declaring two `stdin` sources must fail to load.
    // NOTE(review): presumably rejected because both sources claim the same
    // stdin resource; the conflict check itself is not visible from this module.
    #[test]
    fn config_conflict_detected() {
        let toml = indoc! {r#"
            [sources.in0]
              type = "stdin"

            [sources.in1]
              type = "stdin"

            [sinks.out]
              type = "console"
              inputs = ["in0","in1"]
              encoding.codec = "json"
        "#};
        let rejected = load_from_str(toml, Format::Toml).is_err();
        assert!(rejected);
    }

    /// Prints the JSON schema for a components-only config to stdout.
    ///
    /// Marked `#[ignore]`, so it only runs when requested explicitly. A schema
    /// generation error is written to stderr rather than failing the test.
    #[test]
    #[ignore]
    // Emitting the schema on stdout (and errors on stderr) is the whole point here.
    #[allow(clippy::print_stdout, clippy::print_stderr)]
    fn generate_component_config_schema() {
        use crate::config::{SinkOuter, SourceOuter, TransformOuter};
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        /// Top-level Vector configuration.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");
        println!("{json}");
    }
}