vector/config/
mod.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::{self, Display, Formatter},
5    fs,
6    hash::Hash,
7    net::SocketAddr,
8    path::PathBuf,
9    time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16    config::{
17        AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18        SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19    },
20    configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24    conditions,
25    event::{Metric, Value},
26    secrets::SecretBackends,
27    serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57    COLLECTOR, CONFIG_PATHS, load, load_builder_from_paths, load_from_paths,
58    load_from_paths_with_provider_and_secrets, load_from_str, load_from_str_with_secrets,
59    load_source_from_paths, merge_path_lists, process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66    BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72    config::{
73        ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74        proxy::ProxyConfig, telemetry,
75    },
76    id::Inputs,
77};
78
/// The kind of component that a [`ComponentConfig`] entry describes.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
// This is not a comprehensive set; variants are added as needed.
pub enum ComponentType {
    /// A transform component.
    Transform,
    /// A sink component.
    Sink,
    /// An enrichment table component.
    EnrichmentTable,
}
86
/// Associates a component (its key and type) with a list of configuration file paths.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentConfig {
    /// Paths recorded for this component; [`ComponentConfig::new`] stores them canonicalized.
    pub config_paths: Vec<PathBuf>,
    /// Key identifying the component.
    pub component_key: ComponentKey,
    /// Kind of the component.
    pub component_type: ComponentType,
}
93
94impl ComponentConfig {
95    pub fn new(
96        config_paths: Vec<PathBuf>,
97        component_key: ComponentKey,
98        component_type: ComponentType,
99    ) -> Self {
100        let canonicalized_paths = config_paths
101            .into_iter()
102            .filter_map(|p| fs::canonicalize(p).ok())
103            .collect();
104
105        Self {
106            config_paths: canonicalized_paths,
107            component_key,
108            component_type,
109        }
110    }
111
112    pub fn contains(
113        &self,
114        config_paths: &HashSet<PathBuf>,
115    ) -> Option<(ComponentKey, ComponentType)> {
116        if config_paths.iter().any(|p| self.config_paths.contains(p)) {
117            return Some((self.component_key.clone(), self.component_type.clone()));
118        }
119        None
120    }
121}
122
/// A location that configuration is read from: either a single file or a directory.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
    /// A single file, together with a hint about its format.
    File(PathBuf, FormatHint),
    /// A directory.
    Dir(PathBuf),
}
128
129impl<'a> From<&'a ConfigPath> for &'a PathBuf {
130    fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
131        match config_path {
132            ConfigPath::File(path, _) => path,
133            ConfigPath::Dir(path) => path,
134        }
135    }
136}
137
138impl ConfigPath {
139    pub const fn as_dir(&self) -> Option<&PathBuf> {
140        match self {
141            Self::Dir(path) => Some(path),
142            _ => None,
143        }
144    }
145}
146
/// Configuration holding the global options together with the sources, transforms, sinks,
/// and enrichment tables, each keyed by [`ComponentKey`].
#[derive(Debug, Default, Serialize)]
pub struct Config {
    /// API options; the field only exists when the `api` feature is enabled.
    #[cfg(feature = "api")]
    pub api: api::Options,
    /// Schema options.
    pub schema: schema::Options,
    /// Global options.
    pub global: GlobalOptions,
    /// Healthcheck options.
    pub healthchecks: HealthcheckOptions,
    /// Sources by key; read through [`Config::sources`] and [`Config::source`].
    sources: IndexMap<ComponentKey, SourceOuter>,
    /// Sinks by key; read through [`Config::sinks`] and [`Config::sink`].
    sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
    /// Transforms by key; read through [`Config::transforms`] and [`Config::transform`].
    transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
    /// Enrichment tables by key.
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
    /// Unit test definitions.
    tests: Vec<TestDefinition>,
    /// Secret backends by key.
    secret: IndexMap<ComponentKey, SecretBackends>,
    /// Optional graceful shutdown duration; `None` when not set.
    pub graceful_shutdown_duration: Option<Duration>,
}
162
163impl Config {
164    pub fn builder() -> builder::ConfigBuilder {
165        Default::default()
166    }
167
168    pub fn is_empty(&self) -> bool {
169        self.sources.is_empty()
170    }
171
172    pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
173        self.sources.iter()
174    }
175
176    pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
177        self.sources.get(id)
178    }
179
180    pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
181        self.transforms.iter()
182    }
183
184    pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
185        self.transforms.get(id)
186    }
187
188    pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
189        self.sinks.iter()
190    }
191
192    pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
193        self.sinks.get(id)
194    }
195
196    pub fn enrichment_tables(
197        &self,
198    ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
199        self.enrichment_tables.iter()
200    }
201
202    pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
203        self.enrichment_tables.get(id)
204    }
205
206    pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
207        self.transforms
208            .get(id)
209            .map(|t| &t.inputs[..])
210            .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
211            .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
212    }
213
214    pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
215        let inputs: Vec<_> = self
216            .sinks
217            .iter()
218            .filter(|(_, sink)| {
219                sink.inner
220                    .acknowledgements()
221                    .merge_default(&self.global.acknowledgements)
222                    .enabled()
223            })
224            .flat_map(|(name, sink)| {
225                sink.inputs
226                    .iter()
227                    .map(|input| (name.clone(), input.clone()))
228            })
229            .collect();
230        self.propagate_acks_rec(inputs);
231        Ok(())
232    }
233
234    fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
235        for (sink, input) in sink_inputs {
236            let component = &input.component;
237            if let Some(source) = self.sources.get_mut(component) {
238                if source.inner.can_acknowledge() {
239                    source.sink_acknowledgements = true;
240                } else {
241                    warn!(
242                        message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
243                        source = component.id(),
244                        sink = sink.id(),
245                    );
246                }
247            } else if let Some(transform) = self.transforms.get(component) {
248                let inputs = transform
249                    .inputs
250                    .iter()
251                    .map(|input| (sink.clone(), input.clone()))
252                    .collect();
253                self.propagate_acks_rec(inputs);
254            }
255        }
256    }
257
258    pub fn transform_keys_with_external_files(&self) -> HashSet<ComponentKey> {
259        self.transforms
260            .iter()
261            .filter_map(|(name, transform_outer)| {
262                if !transform_outer.inner.files_to_watch().is_empty() {
263                    Some(name.clone())
264                } else {
265                    None
266                }
267            })
268            .collect()
269    }
270}
271
/// Healthcheck options.
// NOTE(review): the `///` comments on this type are presumably picked up by
// `configurable_component` as user-facing descriptions — confirm before rewording them.
// With `#[serde(default)]`, fields omitted from the input fall back to the `Default`
// implementation of this type (`enabled = true`, `require_healthy = false`).
#[configurable_component(global_option("healthchecks"))]
#[derive(Clone, Copy, Debug)]
#[serde(default)]
pub struct HealthcheckOptions {
    /// Whether or not healthchecks are enabled for all sinks.
    ///
    /// Can be overridden on a per-sink basis.
    pub enabled: bool,

    /// Whether or not to require a sink to report as being healthy during startup.
    ///
    /// When enabled and a sink reports not being healthy, Vector will exit during start-up.
    ///
    /// Can be alternatively set, and overridden by, the `--require-healthy` command-line flag.
    pub require_healthy: bool,
}
289
290impl HealthcheckOptions {
291    pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
292        if let Some(require_healthy) = require_healthy.into() {
293            self.require_healthy = require_healthy;
294        }
295    }
296
297    const fn merge(&mut self, other: Self) {
298        self.enabled &= other.enabled;
299        self.require_healthy |= other.require_healthy;
300    }
301}
302
303impl Default for HealthcheckOptions {
304    fn default() -> Self {
305        Self {
306            enabled: true,
307            require_healthy: false,
308        }
309    }
310}
311
312impl_generate_config_from_default!(HealthcheckOptions);
313
/// A unique resource, such as a network port, that may have only one owner.
///
/// [`Resource::conflicts`] reports resources that are claimed by more than one component.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address together with the protocol (TCP or UDP) used on it.
    Port(SocketAddr, Protocol),
    /// A zero-based offset; displayed as the `(offset + 1)`th systemd socket.
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer, identified by name.
    DiskBuffer(String),
}
322
/// Transport protocol of a [`Resource::Port`]; displayed as `tcp` or `udp`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}
328
329impl Resource {
330    pub const fn tcp(addr: SocketAddr) -> Self {
331        Self::Port(addr, Protocol::Tcp)
332    }
333
334    pub const fn udp(addr: SocketAddr) -> Self {
335        Self::Port(addr, Protocol::Udp)
336    }
337
338    /// From given components returns all that have a resource conflict with any other component.
339    pub fn conflicts<K: Eq + Hash + Clone>(
340        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
341    ) -> HashMap<Resource, HashSet<K>> {
342        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
343        let mut unspecified = Vec::new();
344
345        // Find equality based conflicts
346        for (key, resources) in components {
347            for resource in resources {
348                if let Resource::Port(address, protocol) = &resource
349                    && address.ip().is_unspecified()
350                {
351                    unspecified.push((key.clone(), *address, *protocol));
352                }
353
354                resource_map
355                    .entry(resource)
356                    .or_default()
357                    .insert(key.clone());
358            }
359        }
360
361        // Port with unspecified address will bind to all network interfaces
362        // so we have to check for all Port resources if they share the same
363        // port.
364        for (key, address0, protocol0) in unspecified {
365            for (resource, components) in resource_map.iter_mut() {
366                if let Resource::Port(address, protocol) = resource {
367                    // IP addresses can either be v4 or v6.
368                    // Therefore we check if the ip version matches, the port matches and if the protocol (TCP/UDP) matches
369                    // when checking for equality.
370                    if &address0 == address && &protocol0 == protocol {
371                        components.insert(key.clone());
372                    }
373                }
374            }
375        }
376
377        resource_map.retain(|_, components| components.len() > 1);
378
379        resource_map
380    }
381}
382
383impl Display for Protocol {
384    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
385        match self {
386            Protocol::Udp => write!(fmt, "udp"),
387            Protocol::Tcp => write!(fmt, "tcp"),
388        }
389    }
390}
391
392impl Display for Resource {
393    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
394        match self {
395            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
396            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
397            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
398            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
399        }
400    }
401}
402
/// A unit test definition.
// `T` is the type used to refer to component outputs and defaults to `OutputId`. The
// `String` instantiation holds names as written; `resolve_outputs` maps them to `OutputId`s
// and `stringify` converts them back to strings.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestDefinition<T: 'static = OutputId> {
    /// The name of the unit test.
    pub name: String,

    /// An input event to test against.
    pub input: Option<TestInput>,

    /// A set of input events to test against.
    #[serde(default)]
    pub inputs: Vec<TestInput>,

    /// A set of expected output events after the test has run.
    #[serde(default)]
    pub outputs: Vec<TestOutput<T>>,

    /// A set of component outputs that should not have emitted any events.
    #[serde(default)]
    pub no_outputs_from: Vec<T>,
}
426
427impl TestDefinition<String> {
428    fn resolve_outputs(
429        self,
430        graph: &graph::Graph,
431    ) -> Result<TestDefinition<OutputId>, Vec<String>> {
432        let TestDefinition {
433            name,
434            input,
435            inputs,
436            outputs,
437            no_outputs_from,
438        } = self;
439        let mut errors = Vec::new();
440
441        let output_map = graph.input_map().expect("ambiguous outputs");
442
443        let outputs = outputs
444            .into_iter()
445            .map(|old| {
446                let TestOutput {
447                    extract_from,
448                    conditions,
449                } = old;
450
451                (extract_from.to_vec(), conditions)
452            })
453            .filter_map(|(extract_from, conditions)| {
454                let mut outputs = Vec::new();
455                for from in extract_from {
456                    if no_outputs_from.contains(&from) {
457                        errors.push(format!(
458                            r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
459                        ));
460                    } else if let Some(output_id) = output_map.get(&from) {
461                        outputs.push(output_id.clone());
462                    } else {
463                        errors.push(format!(
464                            r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
465                        ));
466                    }
467                }
468                if outputs.is_empty() {
469                    None
470                } else {
471                    Some(TestOutput {
472                        extract_from: outputs.into(),
473                        conditions,
474                    })
475                }
476            })
477            .collect();
478
479        let no_outputs_from = no_outputs_from
480            .into_iter()
481            .filter_map(|o| {
482                if let Some(output_id) = output_map.get(&o) {
483                    Some(output_id.clone())
484                } else {
485                    errors.push(format!(
486                        r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
487                    ));
488                    None
489                }
490            })
491            .collect();
492
493        if errors.is_empty() {
494            Ok(TestDefinition {
495                name,
496                input,
497                inputs,
498                outputs,
499                no_outputs_from,
500            })
501        } else {
502            Err(errors)
503        }
504    }
505}
506
507impl TestDefinition<OutputId> {
508    fn stringify(self) -> TestDefinition<String> {
509        let TestDefinition {
510            name,
511            input,
512            inputs,
513            outputs,
514            no_outputs_from,
515        } = self;
516
517        let outputs = outputs
518            .into_iter()
519            .map(|old| TestOutput {
520                extract_from: old
521                    .extract_from
522                    .to_vec()
523                    .into_iter()
524                    .map(|item| item.to_string())
525                    .collect::<Vec<_>>()
526                    .into(),
527                conditions: old.conditions,
528            })
529            .collect();
530
531        let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
532
533        TestDefinition {
534            name,
535            input,
536            inputs,
537            outputs,
538            no_outputs_from,
539        }
540    }
541}
542
/// A unit test input.
///
/// An input describes not only the type of event to insert, but also which transform within the
/// configuration to insert it to.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// The name of the transform to insert the input event to.
    pub insert_at: ComponentKey,

    /// The type of the input event.
    ///
    /// Can be either `raw`, `vrl`, `log`, or `metric`.
    // Serialized under the key `type`; defaults to `default_test_input_type()` when omitted.
    #[serde(default = "default_test_input_type", rename = "type")]
    pub type_str: String,

    /// The raw string value to use as the input event.
    ///
    /// Use this only when the input event should be a raw event (i.e. unprocessed/undecoded log
    /// event) and when the input type is set to `raw`.
    pub value: Option<String>,

    /// The vrl expression to generate the input event.
    ///
    /// Only relevant when `type` is `vrl`.
    pub source: Option<String>,

    /// The set of log fields to use when creating a log input event.
    ///
    /// Only relevant when `type` is `log`.
    pub log_fields: Option<IndexMap<String, Value>>,

    /// The metric to use as an input event.
    ///
    /// Only relevant when `type` is `metric`.
    pub metric: Option<Metric>,
}
581
/// Default for [`TestInput::type_str`]: the `raw` input type.
fn default_test_input_type() -> String {
    String::from("raw")
}
585
/// A unit test output.
///
/// An output describes what we expect a transform to emit when fed a certain event, or events, when
/// running a unit test.
// `T` is the type used to refer to component outputs and defaults to `OutputId`;
// `TestDefinition<String>` uses the `String` instantiation.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestOutput<T: 'static = OutputId> {
    /// The transform outputs to extract events from.
    pub extract_from: OneOrMany<T>,

    /// The conditions to run against the output to validate that they were transformed as expected.
    pub conditions: Option<Vec<conditions::AnyCondition>>,
}
600
601#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
602mod tests {
603    use std::path::PathBuf;
604
605    use indoc::indoc;
606
607    use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
608    use crate::{config, topology::builder::TopologyPiecesBuilder};
609
610    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
611        match config::load_from_str(config, format) {
612            Ok(c) => {
613                let diff = ConfigDiff::initial(&c);
614                let c2 = config::load_from_str(config, format).unwrap();
615                match (
616                    config::warnings(&c2),
617                    TopologyPiecesBuilder::new(&c, &diff).build().await,
618                ) {
619                    (warnings, Ok(_pieces)) => Ok(warnings),
620                    (_, Err(errors)) => Err(errors),
621                }
622            }
623            Err(error) => Err(error),
624        }
625    }
626
    /// A config with a duplicated sink input, a transform without inputs, and inputs naming
    /// unknown components is rejected with one error per problem, in the asserted order.
    #[tokio::test]
    async fn bad_inputs() {
        let err = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            vec![
                "Sink \"out\" has input \"in\" duplicated 2 times",
                "Transform \"sample\" has no inputs",
                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
                "Input \"asdf\" for sink \"out\" doesn't match any components.",
            ],
            err,
        );
    }
665
    /// A source and a transform that share the name `foo` are reported as a single
    /// duplicate-name error.
    #[tokio::test]
    async fn duplicate_name() {
        let err = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            err,
            vec!["More than one component with name \"foo\" (source, transform).",]
        );
    }
696
    /// A `stdin` source and a `file_descriptor` source with `fd = 0` produce exactly one
    /// resource-conflict error for file descriptor 0.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let errors = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
722
    /// Two `file_descriptor` sources configured with the same `fd` produce exactly one
    /// resource-conflict error for that descriptor.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let errors = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
747
    /// Two `file_descriptor` sources using the descriptors returned by two separate
    /// `null_fd()` calls load without errors or warnings.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;
        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let result = load(
            &format!(
                r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
            ),
            Format::Toml,
        )
        .await;

        let expected = Ok(vec![]);
        assert_eq!(result, expected);
    }
777
    /// A transform and a source that nothing consumes each yield a "has no consumers"
    /// warning while the config still loads successfully.
    #[tokio::test]
    async fn warnings() {
        let warnings = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap();

        assert_eq!(
            warnings,
            vec![
                "Transform \"sample2\" has no consumers",
                "Source \"in2\" has no consumers",
            ]
        )
    }
817
    /// Transforms wired in a loop (`two -> three -> four -> two`) are rejected with a
    /// cyclic-dependency error naming the chain.
    #[tokio::test]
    async fn cycle() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            errors,
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
        )
    }
863
    /// When `data_dir` is not set, the loaded config reports `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let config = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            config.global.data_dir
        )
    }
884
    /// Without a `[log_schema]` section, the host, message, and timestamp keys are `host`,
    /// `message`, and `timestamp`.
    #[test]
    fn default_schema() {
        let config = load_from_str(
            indoc! {r#"
            [sources.in]
            type = "test_basic"

            [sinks.out]
            type = "test_basic"
            inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "host",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "message",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "timestamp",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
918
    /// Keys set in a `[log_schema]` section replace the default host, message, and
    /// timestamp keys.
    #[test]
    fn custom_schema() {
        let config = load_from_str(
            indoc! {r#"
                [log_schema]
                  host_key = "this"
                  message_key = "that"
                  timestamp_key = "then"

                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "this",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "that",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "then",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
957
    /// Appending a second builder without overlapping component ids succeeds and merges its
    /// global options (`data_dir`, `proxy`), its transform, and its unit test into the first.
    #[test]
    fn config_append() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        data_dir = "/foobar"

                        [proxy]
                          http = "http://proxy.inc:3128"

                        [transforms.foo]
                          type = "test_basic"
                          inputs = [ "in" ]
                          suffix = "foo"
                          increase = 1.25

                        [[tests]]
                          name = "check_simple_log"
                          [tests.input]
                            insert_at = "foo"
                            type = "raw"
                            value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                          [[tests.outputs]]
                            extract_from = "foo"
                            [[tests.outputs.conditions]]
                              type = "vrl"
                              source = ".message == \"Sorry, I'm busy this week Cecil\""
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Ok(())
        );

        assert!(config.global.proxy.http.is_some());
        assert!(config.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
        assert!(config.sources.contains_key(&ComponentKey::from("in")));
        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(config.tests.len(), 1);
    }
1015
1016    #[test]
1017    fn config_append_collisions() {
1018        let mut config: ConfigBuilder = format::deserialize(
1019            indoc! {r#"
1020                [sources.in]
1021                  type = "test_basic"
1022
1023                [sinks.out]
1024                  type = "test_basic"
1025                  inputs = ["in"]
1026            "#},
1027            Format::Toml,
1028        )
1029        .unwrap();
1030
1031        assert_eq!(
1032            config.append(
1033                format::deserialize(
1034                    indoc! {r#"
1035                        [sources.in]
1036                          type = "test_basic"
1037
1038                        [transforms.foo]
1039                          type = "test_basic"
1040                          inputs = [ "in" ]
1041                          suffix = "foo"
1042                          increase = 1.25
1043
1044                        [sinks.out]
1045                          type = "test_basic"
1046                          inputs = ["in"]
1047                    "#},
1048                    Format::Toml,
1049                )
1050                .unwrap()
1051            ),
1052            Err(vec![
1053                "duplicate source id found: in".into(),
1054                "duplicate sink id found: out".into(),
1055            ])
1056        );
1057    }
1058
1059    #[test]
1060    fn with_proxy() {
1061        let config: ConfigBuilder = format::deserialize(
1062            indoc! {r#"
1063                [proxy]
1064                  http = "http://server:3128"
1065                  https = "http://other:3128"
1066                  no_proxy = ["localhost", "127.0.0.1"]
1067
1068                [sources.in]
1069                  type = "nginx_metrics"
1070                  endpoints = ["http://localhost:8000/basic_status"]
1071                  proxy.http = "http://server:3128"
1072                  proxy.https = "http://other:3128"
1073                  proxy.no_proxy = ["localhost", "127.0.0.1"]
1074
1075                [sinks.out]
1076                  type = "console"
1077                  inputs = ["in"]
1078                  encoding.codec = "json"
1079            "#},
1080            Format::Toml,
1081        )
1082        .unwrap();
1083        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1084        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1085        assert!(config.global.proxy.no_proxy.matches("localhost"));
1086        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1087        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1088        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1089        assert!(source.proxy.no_proxy.matches("localhost"));
1090    }
1091
1092    #[test]
1093    fn with_partial_global_proxy() {
1094        let config: ConfigBuilder = format::deserialize(
1095            indoc! {r#"
1096                [proxy]
1097                  http = "http://server:3128"
1098
1099                [sources.in]
1100                  type = "nginx_metrics"
1101                  endpoints = ["http://localhost:8000/basic_status"]
1102
1103                [sources.in.proxy]
1104                  http = "http://server:3129"
1105                  https = "http://other:3129"
1106                  no_proxy = ["localhost", "127.0.0.1"]
1107
1108                [sinks.out]
1109                  type = "console"
1110                  inputs = ["in"]
1111                  encoding.codec = "json"
1112            "#},
1113            Format::Toml,
1114        )
1115        .unwrap();
1116        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1117        assert_eq!(config.global.proxy.https, None);
1118        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1119        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1120        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1121        assert!(source.proxy.no_proxy.matches("localhost"));
1122    }
1123
1124    #[test]
1125    fn with_partial_source_proxy() {
1126        let config: ConfigBuilder = format::deserialize(
1127            indoc! {r#"
1128                [proxy]
1129                  http = "http://server:3128"
1130                  https = "http://other:3128"
1131
1132                [sources.in]
1133                  type = "nginx_metrics"
1134                  endpoints = ["http://localhost:8000/basic_status"]
1135
1136                [sources.in.proxy]
1137                  http = "http://server:3129"
1138                  no_proxy = ["localhost", "127.0.0.1"]
1139
1140                [sinks.out]
1141                  type = "console"
1142                  inputs = ["in"]
1143                  encoding.codec = "json"
1144            "#},
1145            Format::Toml,
1146        )
1147        .unwrap();
1148        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1149        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1150        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1151        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1152        assert_eq!(source.proxy.https, None);
1153        assert!(source.proxy.no_proxy.matches("localhost"));
1154    }
1155}
1156
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Building the configuration should switch on `sink_acknowledgements`
    /// for exactly those sources that feed a sink with acknowledgements
    /// enabled, whether directly or through a transform.
    #[test]
    fn propagates_settings() {
        // The topology:
        // in1 => out1
        // in2 => out2 (acks enabled)
        // in3 => parse3 => out3 (acks enabled)
        let topology = indoc! {r#"
            data_dir = "/tmp"
            [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
            [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
            [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
            [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
            [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
        "#};
        let builder: ConfigBuilder = format::deserialize(topology, Format::Toml).unwrap();

        // Straight after deserialization no source has the flag set yet.
        assert!(
            builder
                .sources
                .values()
                .all(|source| !source.sink_acknowledgements),
            "Source `sink_acknowledgements` should be `false` before propagation"
        );

        let config = builder.build().unwrap();

        // Expected flag per source once the build step has run.
        for (id, expected) in [("in1", false), ("in2", true), ("in3", true)] {
            let source = config.sources.get(&ComponentKey::from(id)).unwrap();
            assert_eq!(source.sink_acknowledgements, expected);
        }
    }
}
1223
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// Builds a TCP resource for `addr:port`.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::tcp(socket)
    }

    /// Builds a UDP resource for `addr:port`.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::udp(socket)
    }

    /// Strategy yielding either the IPv4 or the IPv6 unspecified address.
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
            Just(IpAddr::V6(Ipv6Addr::UNSPECIFIED)),
        ]
    }

    /// Strategy yielding any address except the unspecified ones.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |candidate| {
            !candidate.is_unspecified()
        })
    }

    /// Strategy yielding any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |candidate| *candidate != 0)
    }

    /// Turns `(resource, component names)` pairs into the map shape that
    /// `Resource::conflicts` returns, so expectations can be compared directly.
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        let mut expected = HashMap::with_capacity(conflicts.len());
        for (resource, components) in conflicts {
            expected.insert(resource, components.into_iter().collect::<HashSet<_>>());
        }
        expected
    }

    proptest! {
        // One component on port 0 and two on distinct non-zero ports of the
        // same address: no conflicts expected.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }

        // Two components on the same address and non-zero port are reported
        // together; the port-0 component is not.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])]);
            assert_eq!(conflicting, expected);
        }

        // A component holding two resources can take part in two separate
        // conflicts, including one on port 0.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![
                (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                (tcp(addr, port), vec!["sink_1", "sink_2"]),
            ]);
            assert_eq!(conflicting, expected);
        }

        // The same port on two different addresses: no conflicts expected.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }

        // A specific address and an unspecified address on the same port:
        // no conflicts expected.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }

        // TCP and UDP on the same address and port: no conflicts expected.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }
    }

    /// The IPv4 and IPv6 unspecified addresses on port 0 should not be
    /// reported as conflicting with each other.
    #[test]
    fn different_unspecified_ip_version() {
        let conflicting = Resource::conflicts(vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ]);
        assert_eq!(conflicting, HashMap::new());
    }
}
1351
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{Format, load_from_str};

    /// Loading a configuration that declares two `stdin` sources should
    /// return an error. The test only checks that loading fails; presumably
    /// the cause is both sources claiming the same resource.
    #[test]
    fn config_conflict_detected() {
        let two_stdin_sources = indoc! {r#"
            [sources.in0]
              type = "stdin"

            [sources.in1]
              type = "stdin"

            [sinks.out]
              type = "console"
              inputs = ["in0","in1"]
              encoding.codec = "json"
        "#};

        let outcome = load_from_str(two_stdin_sources, Format::Toml);
        assert!(outcome.is_err());
    }

    /// Prints the JSON schema of a sources/transforms/sinks-only
    /// configuration to stdout. Ignored by default; a schema generation
    /// error is written to stderr and does not fail the test.
    #[test]
    #[ignore]
    #[allow(clippy::print_stdout)]
    #[allow(clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        /// Top-level Vector configuration.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        // Report a generation failure and stop; there is nothing to render.
        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");
        println!("{json}");
    }
}