vector/config/
mod.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::{self, Display, Formatter},
5    fs,
6    hash::Hash,
7    net::SocketAddr,
8    path::PathBuf,
9    time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16    config::{
17        AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18        SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19    },
20    configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24    conditions,
25    event::{Metric, Value},
26    secrets::SecretBackends,
27    serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57    COLLECTOR, CONFIG_PATHS, load, load_builder_from_paths, load_from_paths,
58    load_from_paths_with_provider_and_secrets, load_from_str, load_source_from_paths,
59    merge_path_lists, process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66    BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72    config::{
73        ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74        proxy::ProxyConfig, telemetry,
75    },
76    id::Inputs,
77};
78
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
/// The kind of component recorded in a [`ComponentConfig`].
///
/// This is not a comprehensive set; variants are added as needed.
pub enum ComponentType {
    Transform,
    Sink,
    EnrichmentTable,
}
86
/// Associates a component (key and type) with a list of configuration paths.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentConfig {
    /// Paths recorded for the component; `ComponentConfig::new` stores them canonicalized.
    pub config_paths: Vec<PathBuf>,
    /// Key identifying the component.
    pub component_key: ComponentKey,
    /// Kind of the component.
    pub component_type: ComponentType,
}
93
94impl ComponentConfig {
95    pub fn new(
96        config_paths: Vec<PathBuf>,
97        component_key: ComponentKey,
98        component_type: ComponentType,
99    ) -> Self {
100        let canonicalized_paths = config_paths
101            .into_iter()
102            .filter_map(|p| fs::canonicalize(p).ok())
103            .collect();
104
105        Self {
106            config_paths: canonicalized_paths,
107            component_key,
108            component_type,
109        }
110    }
111
112    pub fn contains(&self, config_paths: &[PathBuf]) -> Option<(ComponentKey, ComponentType)> {
113        if config_paths.iter().any(|p| self.config_paths.contains(p)) {
114            return Some((self.component_key.clone(), self.component_type.clone()));
115        }
116        None
117    }
118}
119
/// A configuration location: either a single file or a directory.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
    /// A file path together with its format hint.
    File(PathBuf, FormatHint),
    /// A directory path.
    Dir(PathBuf),
}
125
126impl<'a> From<&'a ConfigPath> for &'a PathBuf {
127    fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
128        match config_path {
129            ConfigPath::File(path, _) => path,
130            ConfigPath::Dir(path) => path,
131        }
132    }
133}
134
135impl ConfigPath {
136    pub const fn as_dir(&self) -> Option<&PathBuf> {
137        match self {
138            Self::Dir(path) => Some(path),
139            _ => None,
140        }
141    }
142}
143
/// A complete configuration: schema, global and healthcheck options together with the
/// component definitions (sources, transforms, sinks, enrichment tables), unit test
/// definitions and secret backends.
#[derive(Debug, Default, Serialize)]
pub struct Config {
    /// API options; the field only exists when the `api` feature is enabled.
    #[cfg(feature = "api")]
    pub api: api::Options,
    pub schema: schema::Options,
    pub global: GlobalOptions,
    pub healthchecks: HealthcheckOptions,
    // The component maps below are private; read access goes through the accessor
    // methods on `Config`. They are `IndexMap`s keyed by `ComponentKey`.
    sources: IndexMap<ComponentKey, SourceOuter>,
    sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
    transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
    tests: Vec<TestDefinition>,
    secret: IndexMap<ComponentKey, SecretBackends>,
    pub graceful_shutdown_duration: Option<Duration>,
}
159
160impl Config {
161    pub fn builder() -> builder::ConfigBuilder {
162        Default::default()
163    }
164
165    pub fn is_empty(&self) -> bool {
166        self.sources.is_empty()
167    }
168
169    pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
170        self.sources.iter()
171    }
172
173    pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
174        self.sources.get(id)
175    }
176
177    pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
178        self.transforms.iter()
179    }
180
181    pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
182        self.transforms.get(id)
183    }
184
185    pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
186        self.sinks.iter()
187    }
188
189    pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
190        self.sinks.get(id)
191    }
192
193    pub fn enrichment_tables(
194        &self,
195    ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
196        self.enrichment_tables.iter()
197    }
198
199    pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
200        self.enrichment_tables.get(id)
201    }
202
203    pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
204        self.transforms
205            .get(id)
206            .map(|t| &t.inputs[..])
207            .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
208            .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
209    }
210
211    pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
212        let inputs: Vec<_> = self
213            .sinks
214            .iter()
215            .filter(|(_, sink)| {
216                sink.inner
217                    .acknowledgements()
218                    .merge_default(&self.global.acknowledgements)
219                    .enabled()
220            })
221            .flat_map(|(name, sink)| {
222                sink.inputs
223                    .iter()
224                    .map(|input| (name.clone(), input.clone()))
225            })
226            .collect();
227        self.propagate_acks_rec(inputs);
228        Ok(())
229    }
230
231    fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
232        for (sink, input) in sink_inputs {
233            let component = &input.component;
234            if let Some(source) = self.sources.get_mut(component) {
235                if source.inner.can_acknowledge() {
236                    source.sink_acknowledgements = true;
237                } else {
238                    warn!(
239                        message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
240                        source = component.id(),
241                        sink = sink.id(),
242                    );
243                }
244            } else if let Some(transform) = self.transforms.get(component) {
245                let inputs = transform
246                    .inputs
247                    .iter()
248                    .map(|input| (sink.clone(), input.clone()))
249                    .collect();
250                self.propagate_acks_rec(inputs);
251            }
252        }
253    }
254}
255
/// Healthcheck options.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
// `serde(default)`: fields missing from the input take their value from this type's
// `Default` impl rather than from the per-field type defaults.
#[serde(default)]
pub struct HealthcheckOptions {
    /// Whether or not healthchecks are enabled for all sinks.
    ///
    /// Can be overridden on a per-sink basis.
    pub enabled: bool,

    /// Whether or not to require a sink to report as being healthy during startup.
    ///
    /// When enabled and a sink reports not being healthy, Vector will exit during start-up.
    ///
    /// Can be alternatively set, and overridden by, the `--require-healthy` command-line flag.
    pub require_healthy: bool,
}
273
274impl HealthcheckOptions {
275    pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
276        if let Some(require_healthy) = require_healthy.into() {
277            self.require_healthy = require_healthy;
278        }
279    }
280
281    const fn merge(&mut self, other: Self) {
282        self.enabled &= other.enabled;
283        self.require_healthy |= other.require_healthy;
284    }
285}
286
287impl Default for HealthcheckOptions {
288    fn default() -> Self {
289        Self {
290            enabled: true,
291            require_healthy: false,
292        }
293    }
294}
295
/// Unique thing, like port, of which only one owner can be.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address bound with the given protocol.
    Port(SocketAddr, Protocol),
    /// A socket identified by its zero-based offset (displayed as the "systemd Nth socket").
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer identified by name.
    DiskBuffer(String),
}

/// Transport protocol of a [`Resource::Port`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    Tcp,
    Udp,
}

impl Resource {
    /// Shorthand for a TCP [`Resource::Port`] on `addr`.
    pub const fn tcp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Tcp)
    }

    /// Shorthand for a UDP [`Resource::Port`] on `addr`.
    pub const fn udp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Udp)
    }

    /// From given components returns all that have a resource conflict with any other component.
    ///
    /// Two components conflict when they claim an equal resource, or when one of them
    /// claims a port on an unspecified address (`0.0.0.0` / `::`) and the other claims
    /// the same port number, with the same protocol and IP version, on any address.
    ///
    /// The result maps each contested resource to the set of component keys claiming it;
    /// resources with a single claimant are omitted.
    pub fn conflicts<K: Eq + Hash + Clone>(
        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
    ) -> HashMap<Resource, HashSet<K>> {
        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
        let mut unspecified = Vec::new();

        // Find equality based conflicts
        for (key, resources) in components {
            for resource in resources {
                // Remember wildcard binds for the second pass below.
                match &resource {
                    Resource::Port(address, protocol) if address.ip().is_unspecified() => {
                        unspecified.push((key.clone(), *address, *protocol));
                    }
                    _ => {}
                }

                resource_map
                    .entry(resource)
                    .or_default()
                    .insert(key.clone());
            }
        }

        // Port with unspecified address will bind to all network interfaces
        // so we have to check for all Port resources if they share the same
        // port.
        for (key, address0, protocol0) in unspecified {
            for (resource, components) in resource_map.iter_mut() {
                if let Resource::Port(address, protocol) = resource {
                    // IP addresses can either be v4 or v6.
                    // Therefore we check if the ip version matches, the port matches and if the
                    // protocol (TCP/UDP) matches. Comparing the full address here would only
                    // ever match the wildcard entry itself and miss e.g. 127.0.0.1 on the
                    // same port.
                    let same_ip_version = address.is_ipv4() == address0.is_ipv4();
                    let same_port = address.port() == address0.port();
                    if same_ip_version && same_port && *protocol == protocol0 {
                        components.insert(key.clone());
                    }
                }
            }
        }

        resource_map.retain(|_, components| components.len() > 1);

        resource_map
    }
}
364
365impl Display for Protocol {
366    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
367        match self {
368            Protocol::Udp => write!(fmt, "udp"),
369            Protocol::Tcp => write!(fmt, "tcp"),
370        }
371    }
372}
373
374impl Display for Resource {
375    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
376        match self {
377            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
378            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
379            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
380            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
381        }
382    }
383}
384
/// A unit test definition.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
// `T` is the type used to name component outputs in `outputs` and `no_outputs_from`;
// it defaults to `OutputId`.
pub struct TestDefinition<T: 'static = OutputId> {
    /// The name of the unit test.
    pub name: String,

    /// An input event to test against.
    pub input: Option<TestInput>,

    /// A set of input events to test against.
    #[serde(default)]
    pub inputs: Vec<TestInput>,

    /// A set of expected output events after the test has run.
    #[serde(default)]
    pub outputs: Vec<TestOutput<T>>,

    /// A set of component outputs that should not have emitted any events.
    #[serde(default)]
    pub no_outputs_from: Vec<T>,
}
408
409impl TestDefinition<String> {
410    fn resolve_outputs(
411        self,
412        graph: &graph::Graph,
413    ) -> Result<TestDefinition<OutputId>, Vec<String>> {
414        let TestDefinition {
415            name,
416            input,
417            inputs,
418            outputs,
419            no_outputs_from,
420        } = self;
421        let mut errors = Vec::new();
422
423        let output_map = graph.input_map().expect("ambiguous outputs");
424
425        let outputs = outputs
426            .into_iter()
427            .map(|old| {
428                let TestOutput {
429                    extract_from,
430                    conditions,
431                } = old;
432
433                (extract_from.to_vec(), conditions)
434            })
435            .filter_map(|(extract_from, conditions)| {
436                let mut outputs = Vec::new();
437                for from in extract_from {
438                    if no_outputs_from.contains(&from) {
439                        errors.push(format!(
440                            r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
441                        ));
442                    } else if let Some(output_id) = output_map.get(&from) {
443                        outputs.push(output_id.clone());
444                    } else {
445                        errors.push(format!(
446                            r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
447                        ));
448                    }
449                }
450                if outputs.is_empty() {
451                    None
452                } else {
453                    Some(TestOutput {
454                        extract_from: outputs.into(),
455                        conditions,
456                    })
457                }
458            })
459            .collect();
460
461        let no_outputs_from = no_outputs_from
462            .into_iter()
463            .filter_map(|o| {
464                if let Some(output_id) = output_map.get(&o) {
465                    Some(output_id.clone())
466                } else {
467                    errors.push(format!(
468                        r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
469                    ));
470                    None
471                }
472            })
473            .collect();
474
475        if errors.is_empty() {
476            Ok(TestDefinition {
477                name,
478                input,
479                inputs,
480                outputs,
481                no_outputs_from,
482            })
483        } else {
484            Err(errors)
485        }
486    }
487}
488
489impl TestDefinition<OutputId> {
490    fn stringify(self) -> TestDefinition<String> {
491        let TestDefinition {
492            name,
493            input,
494            inputs,
495            outputs,
496            no_outputs_from,
497        } = self;
498
499        let outputs = outputs
500            .into_iter()
501            .map(|old| TestOutput {
502                extract_from: old
503                    .extract_from
504                    .to_vec()
505                    .into_iter()
506                    .map(|item| item.to_string())
507                    .collect::<Vec<_>>()
508                    .into(),
509                conditions: old.conditions,
510            })
511            .collect();
512
513        let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
514
515        TestDefinition {
516            name,
517            input,
518            inputs,
519            outputs,
520            no_outputs_from,
521        }
522    }
523}
524
/// A unit test input.
///
/// An input describes not only the type of event to insert, but also which transform within the
/// configuration to insert it to.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// The name of the transform to insert the input event to.
    pub insert_at: ComponentKey,

    /// The type of the input event.
    ///
    /// Can be either `raw`, `vrl`, `log`, or `metric`.
    #[serde(default = "default_test_input_type", rename = "type")]
    pub type_str: String,

    /// The raw string value to use as the input event.
    ///
    /// Use this only when the input event should be a raw event (i.e. unprocessed/undecoded log
    /// event) and when the input type is set to `raw`.
    pub value: Option<String>,

    /// The vrl expression to generate the input event.
    ///
    /// Only relevant when `type` is `vrl`.
    pub source: Option<String>,

    /// The set of log fields to use when creating a log input event.
    ///
    /// Only relevant when `type` is `log`.
    pub log_fields: Option<IndexMap<String, Value>>,

    /// The metric to use as an input event.
    ///
    /// Only relevant when `type` is `metric`.
    pub metric: Option<Metric>,
}
563
/// Serde default for `TestInput::type_str`: the input type is `raw` when `type` is omitted.
fn default_test_input_type() -> String {
    String::from("raw")
}
567
/// A unit test output.
///
/// An output describes what we expect a transform to emit when fed a certain event, or events, when
/// running a unit test.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
// `T` is the type used to identify each entry of `extract_from`; it defaults to `OutputId`.
pub struct TestOutput<T: 'static = OutputId> {
    /// The transform outputs to extract events from.
    pub extract_from: OneOrMany<T>,

    /// The conditions to run against the output to validate that they were transformed as expected.
    pub conditions: Option<Vec<conditions::AnyCondition>>,
}
582
583#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
584mod tests {
585    use std::{collections::HashMap, path::PathBuf};
586
587    use indoc::indoc;
588
589    use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
590    use crate::{config, topology};
591
592    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
593        match config::load_from_str(config, format) {
594            Ok(c) => {
595                let diff = ConfigDiff::initial(&c);
596                let c2 = config::load_from_str(config, format).unwrap();
597                match (
598                    config::warnings(&c2),
599                    topology::TopologyPieces::build(&c, &diff, HashMap::new(), Default::default())
600                        .await,
601                ) {
602                    (warnings, Ok(_pieces)) => Ok(warnings),
603                    (_, Err(errors)) => Err(errors),
604                }
605            }
606            Err(error) => Err(error),
607        }
608    }
609
    /// Loading must fail with one error per input problem: a duplicated input, a transform
    /// with an empty input list, and inputs that name no existing component.
    #[tokio::test]
    async fn bad_inputs() {
        let err = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            vec![
                "Sink \"out\" has input \"in\" duplicated 2 times",
                "Transform \"sample\" has no inputs",
                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
                "Input \"asdf\" for sink \"out\" doesn't match any components.",
            ],
            err,
        );
    }
648
    /// A source and a transform sharing the name `foo` must be rejected with a single error.
    #[tokio::test]
    async fn duplicate_name() {
        let err = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            err,
            vec!["More than one component with name \"foo\" (source, transform).",]
        );
    }
679
    /// A `stdin` source and a `file_descriptor` source with `fd = 0` must produce exactly
    /// one error, reporting file descriptor 0 as claimed by multiple components.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let errors = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
705
    /// Two `file_descriptor` sources configured with the same `fd = 10` must produce
    /// exactly one error, reporting that descriptor as claimed by multiple components.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let errors = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
730
    /// Two `file_descriptor` sources using the two descriptors returned by separate
    /// `null_fd()` calls must load with neither errors nor warnings.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;
        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let result = load(
            &format!(
                r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
            ),
            Format::Toml,
        )
        .await;

        // `Ok(vec![])` means the load succeeded and produced no warnings.
        let expected = Ok(vec![]);
        assert_eq!(result, expected);
    }
760
    /// A transform (`sample2`) and a source (`in2`) that nothing consumes must each be
    /// reported as a warning, while the load itself succeeds.
    #[tokio::test]
    async fn warnings() {
        let warnings = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap();

        assert_eq!(
            warnings,
            vec![
                "Transform \"sample2\" has no consumers",
                "Source \"in2\" has no consumers",
            ]
        )
    }
800
    /// Transforms `two`, `three` and `four` feed each other in a loop; loading must fail
    /// with a single error naming the cyclic chain.
    #[tokio::test]
    async fn cycle() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            errors,
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
        )
    }
846
    /// When the config does not set `data_dir`, the loaded global options must carry
    /// `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let config = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            config.global.data_dir
        )
    }
867
    /// Without a `[log_schema]` section the host, message and timestamp keys must be
    /// `host`, `message` and `timestamp`.
    #[test]
    fn default_schema() {
        let config = load_from_str(
            indoc! {r#"
            [sources.in]
            type = "test_basic"

            [sinks.out]
            type = "test_basic"
            inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "host",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "message",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "timestamp",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
901
    /// Keys set in a `[log_schema]` section must replace the default host, message and
    /// timestamp keys.
    #[test]
    fn custom_schema() {
        let config = load_from_str(
            indoc! {r#"
                [log_schema]
                  host_key = "this"
                  message_key = "that"
                  timestamp_key = "then"

                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "this",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "that",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "then",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
940
    /// Appending a second builder with non-overlapping components must succeed and leave
    /// the first builder holding the union: global options, proxy, all components and the
    /// unit test definition.
    #[test]
    fn config_append() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        data_dir = "/foobar"

                        [proxy]
                          http = "http://proxy.inc:3128"

                        [transforms.foo]
                          type = "test_basic"
                          inputs = [ "in" ]
                          suffix = "foo"
                          increase = 1.25

                        [[tests]]
                          name = "check_simple_log"
                          [tests.input]
                            insert_at = "foo"
                            type = "raw"
                            value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                          [[tests.outputs]]
                            extract_from = "foo"
                            [[tests.outputs.conditions]]
                              type = "vrl"
                              source = ".message == \"Sorry, I'm busy this week Cecil\""
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Ok(())
        );

        // Values from the appended builder are visible on the original one.
        assert!(config.global.proxy.http.is_some());
        assert!(config.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
        assert!(config.sources.contains_key(&ComponentKey::from("in")));
        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(config.tests.len(), 1);
    }
998
    /// Appending a builder that redefines the existing source `in` and sink `out` must
    /// fail with one "duplicate ... id" error per colliding component.
    #[test]
    fn config_append_collisions() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        [sources.in]
                          type = "test_basic"

                        [transforms.foo]
                          type = "test_basic"
                          inputs = [ "in" ]
                          suffix = "foo"
                          increase = 1.25

                        [sinks.out]
                          type = "test_basic"
                          inputs = ["in"]
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Err(vec![
                "duplicate source id found: in".into(),
                "duplicate sink id found: out".into(),
            ])
        );
    }
1041
1042    #[test]
1043    fn with_proxy() {
1044        let config: ConfigBuilder = format::deserialize(
1045            indoc! {r#"
1046                [proxy]
1047                  http = "http://server:3128"
1048                  https = "http://other:3128"
1049                  no_proxy = ["localhost", "127.0.0.1"]
1050
1051                [sources.in]
1052                  type = "nginx_metrics"
1053                  endpoints = ["http://localhost:8000/basic_status"]
1054                  proxy.http = "http://server:3128"
1055                  proxy.https = "http://other:3128"
1056                  proxy.no_proxy = ["localhost", "127.0.0.1"]
1057
1058                [sinks.out]
1059                  type = "console"
1060                  inputs = ["in"]
1061                  encoding.codec = "json"
1062            "#},
1063            Format::Toml,
1064        )
1065        .unwrap();
1066        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1067        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1068        assert!(config.global.proxy.no_proxy.matches("localhost"));
1069        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1070        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1071        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1072        assert!(source.proxy.no_proxy.matches("localhost"));
1073    }
1074
1075    #[test]
1076    fn with_partial_global_proxy() {
1077        let config: ConfigBuilder = format::deserialize(
1078            indoc! {r#"
1079                [proxy]
1080                  http = "http://server:3128"
1081
1082                [sources.in]
1083                  type = "nginx_metrics"
1084                  endpoints = ["http://localhost:8000/basic_status"]
1085
1086                [sources.in.proxy]
1087                  http = "http://server:3129"
1088                  https = "http://other:3129"
1089                  no_proxy = ["localhost", "127.0.0.1"]
1090
1091                [sinks.out]
1092                  type = "console"
1093                  inputs = ["in"]
1094                  encoding.codec = "json"
1095            "#},
1096            Format::Toml,
1097        )
1098        .unwrap();
1099        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1100        assert_eq!(config.global.proxy.https, None);
1101        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1102        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1103        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1104        assert!(source.proxy.no_proxy.matches("localhost"));
1105    }
1106
1107    #[test]
1108    fn with_partial_source_proxy() {
1109        let config: ConfigBuilder = format::deserialize(
1110            indoc! {r#"
1111                [proxy]
1112                  http = "http://server:3128"
1113                  https = "http://other:3128"
1114
1115                [sources.in]
1116                  type = "nginx_metrics"
1117                  endpoints = ["http://localhost:8000/basic_status"]
1118
1119                [sources.in.proxy]
1120                  http = "http://server:3129"
1121                  no_proxy = ["localhost", "127.0.0.1"]
1122
1123                [sinks.out]
1124                  type = "console"
1125                  inputs = ["in"]
1126                  encoding.codec = "json"
1127            "#},
1128            Format::Toml,
1129        )
1130        .unwrap();
1131        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1132        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1133        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1134        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1135        assert_eq!(source.proxy.https, None);
1136        assert!(source.proxy.no_proxy.matches("localhost"));
1137    }
1138}
1139
/// Tests for how sink acknowledgement settings show up on sources once a
/// `ConfigBuilder` has been built.
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Before `build()` every source has `sink_acknowledgements == false`;
    /// afterwards it is `true` exactly for the sources whose downstream sink
    /// enables acknowledgements.
    #[test]
    fn propagates_settings() {
        // The topology:
        // in1 => out1
        // in2 => out2 (acks enabled)
        // in3 => parse3 => out3 (acks enabled)
        let toml = indoc! {r#"
            data_dir = "/tmp"
            [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
            [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
            [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
            [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
            [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
        "#};
        let builder: ConfigBuilder = format::deserialize(toml, Format::Toml).unwrap();

        // Nothing has been propagated while we still hold the raw builder.
        assert!(
            builder
                .sources
                .values()
                .all(|source| !source.sink_acknowledgements),
            "Source `sink_acknowledgements` should be `false` before propagation"
        );

        let config = builder.build().unwrap();

        // Reads the built `sink_acknowledgements` flag of the named source.
        let acks_enabled = |name: &str| {
            let key = ComponentKey::from(name);
            config.sources.get(&key).unwrap().sink_acknowledgements
        };
        assert!(!acks_enabled("in1"));
        assert!(acks_enabled("in2"));
        assert!(acks_enabled("in3"));
    }
}
1206
/// Property-based and unit tests for `Resource::conflicts`.
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// Wraps `addr:port` in `Resource::tcp`.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::tcp(socket)
    }

    /// Wraps `addr:port` in `Resource::udp`.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::udp(socket)
    }

    /// Strategy yielding one of the two unspecified addresses (IPv4 or IPv6).
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(IpAddr::from(Ipv4Addr::UNSPECIFIED)),
            Just(IpAddr::from(Ipv6Addr::UNSPECIFIED)),
        ]
    }

    /// Strategy yielding any address that is not an unspecified one.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |candidate| {
            !candidate.is_unspecified()
        })
    }

    /// Strategy yielding any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |candidate| *candidate != 0)
    }

    /// Turns `(resource, component names)` pairs into the map shape that the
    /// tests compare against the output of `Resource::conflicts`.
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        let mut expected = HashMap::new();
        for (resource, names) in conflicts {
            let names: HashSet<&str> = names.into_iter().collect();
            expected.insert(resource, names);
        }
        expected
    }

    proptest! {
        /// Three components on the same address with distinct ports (one of
        /// them port 0) report no conflicts.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ]);
            assert_eq!(found, HashMap::new());
        }

        /// Two components sharing the same address and non-zero port are
        /// reported together under that resource.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])]);
            assert_eq!(found, expected);
        }

        /// A component holding two resources can take part in two separate
        /// conflicts at once.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![
                (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                (tcp(addr, port), vec!["sink_1", "sink_2"]),
            ]);
            assert_eq!(found, expected);
        }

        /// The same port on two different addresses is expected to report no
        /// conflicts.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ]);
            assert_eq!(found, HashMap::new());
        }

        /// A specific address and an unspecified address on the same port are
        /// expected to report no conflicts.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ]);
            assert_eq!(found, HashMap::new());
        }

        /// TCP and UDP resources on the same address and port are expected to
        /// report no conflicts.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ]);
            assert_eq!(found, HashMap::new());
        }
    }

    /// The IPv4 and IPv6 unspecified addresses on the same port are expected
    /// to report no conflicts.
    #[test]
    fn different_unspecified_ip_version() {
        let found = Resource::conflicts(vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ]);
        assert_eq!(found, HashMap::new());
    }
}
1334
/// Tests that load complete configurations with `load_from_str`, plus an
/// ignored helper that prints the component configuration schema.
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{Format, load_from_str};

    /// Loading a configuration that declares two `stdin` sources must fail.
    #[test]
    fn config_conflict_detected() {
        let toml = indoc! {r#"
            [sources.in0]
              type = "stdin"

            [sources.in1]
              type = "stdin"

            [sinks.out]
              type = "console"
              inputs = ["in0","in1"]
              encoding.codec = "json"
        "#};

        let outcome = load_from_str(toml, Format::Toml);

        assert!(outcome.is_err());
    }

    /// Prints the JSON schema for a sources/transforms/sinks-only
    /// configuration to stdout. Marked `#[ignore]`, so it only runs when
    /// requested explicitly; a generation error is written to stderr rather
    /// than failing the test.
    #[test]
    #[ignore]
    // This helper prints its result (stdout) or the error (stderr) directly.
    #[allow(clippy::print_stdout, clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        /// Top-level Vector configuration.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        // Bail out early on failure so the happy path reads straight through.
        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");

        println!("{json}");
    }
}