vector/config/
mod.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::{self, Display, Formatter},
5    fs,
6    hash::Hash,
7    net::SocketAddr,
8    path::PathBuf,
9    time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16    config::{
17        AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18        SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19    },
20    configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24    conditions,
25    event::{Metric, Value},
26    secrets::SecretBackends,
27    serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57    COLLECTOR, CONFIG_PATHS, load, load_builder_from_paths, load_from_paths,
58    load_from_paths_with_provider_and_secrets, load_from_str, load_from_str_with_secrets,
59    load_source_from_paths, merge_path_lists, process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66    BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72    config::{
73        ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74        proxy::ProxyConfig, telemetry,
75    },
76    id::Inputs,
77};
78
/// The kind of component referenced by a [`ComponentConfig`].
///
/// This is not a comprehensive set; variants are added as needed.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum ComponentType {
    /// A transform component.
    Transform,
    /// A sink component.
    Sink,
    /// An enrichment table component.
    EnrichmentTable,
}
86
87#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
88pub struct ComponentConfig {
89    pub config_paths: Vec<PathBuf>,
90    pub component_key: ComponentKey,
91    pub component_type: ComponentType,
92}
93
94impl ComponentConfig {
95    pub fn new(
96        config_paths: Vec<PathBuf>,
97        component_key: ComponentKey,
98        component_type: ComponentType,
99    ) -> Self {
100        let canonicalized_paths = config_paths
101            .into_iter()
102            .filter_map(|p| fs::canonicalize(p).ok())
103            .collect();
104
105        Self {
106            config_paths: canonicalized_paths,
107            component_key,
108            component_type,
109        }
110    }
111
112    pub fn contains(
113        &self,
114        config_paths: &HashSet<PathBuf>,
115    ) -> Option<(ComponentKey, ComponentType)> {
116        if config_paths.iter().any(|p| self.config_paths.contains(p)) {
117            return Some((self.component_key.clone(), self.component_type.clone()));
118        }
119        None
120    }
121}
122
123#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
124pub enum ConfigPath {
125    File(PathBuf, FormatHint),
126    Dir(PathBuf),
127}
128
129impl<'a> From<&'a ConfigPath> for &'a PathBuf {
130    fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
131        match config_path {
132            ConfigPath::File(path, _) => path,
133            ConfigPath::Dir(path) => path,
134        }
135    }
136}
137
138impl ConfigPath {
139    pub const fn as_dir(&self) -> Option<&PathBuf> {
140        match self {
141            Self::Dir(path) => Some(path),
142            _ => None,
143        }
144    }
145}
146
147#[derive(Debug, Default, Serialize)]
148pub struct Config {
149    #[cfg(feature = "api")]
150    pub api: api::Options,
151    pub schema: schema::Options,
152    pub global: GlobalOptions,
153    pub healthchecks: HealthcheckOptions,
154    sources: IndexMap<ComponentKey, SourceOuter>,
155    sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
156    transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
157    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
158    tests: Vec<TestDefinition>,
159    secret: IndexMap<ComponentKey, SecretBackends>,
160    pub graceful_shutdown_duration: Option<Duration>,
161}
162
163impl Config {
164    pub fn builder() -> builder::ConfigBuilder {
165        Default::default()
166    }
167
168    pub fn is_empty(&self) -> bool {
169        self.sources.is_empty()
170    }
171
172    pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
173        self.sources.iter()
174    }
175
176    pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
177        self.sources.get(id)
178    }
179
180    pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
181        self.transforms.iter()
182    }
183
184    pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
185        self.transforms.get(id)
186    }
187
188    pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
189        self.sinks.iter()
190    }
191
192    pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
193        self.sinks.get(id)
194    }
195
196    pub fn enrichment_tables(
197        &self,
198    ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
199        self.enrichment_tables.iter()
200    }
201
202    pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
203        self.enrichment_tables.get(id)
204    }
205
206    pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
207        self.transforms
208            .get(id)
209            .map(|t| &t.inputs[..])
210            .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
211            .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
212    }
213
214    pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
215        let inputs: Vec<_> = self
216            .sinks
217            .iter()
218            .filter(|(_, sink)| {
219                sink.inner
220                    .acknowledgements()
221                    .merge_default(&self.global.acknowledgements)
222                    .enabled()
223            })
224            .flat_map(|(name, sink)| {
225                sink.inputs
226                    .iter()
227                    .map(|input| (name.clone(), input.clone()))
228            })
229            .collect();
230        self.propagate_acks_rec(inputs);
231        Ok(())
232    }
233
234    fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
235        for (sink, input) in sink_inputs {
236            let component = &input.component;
237            if let Some(source) = self.sources.get_mut(component) {
238                if source.inner.can_acknowledge() {
239                    source.sink_acknowledgements = true;
240                } else {
241                    warn!(
242                        message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
243                        source = component.id(),
244                        sink = sink.id(),
245                    );
246                }
247            } else if let Some(transform) = self.transforms.get(component) {
248                let inputs = transform
249                    .inputs
250                    .iter()
251                    .map(|input| (sink.clone(), input.clone()))
252                    .collect();
253                self.propagate_acks_rec(inputs);
254            }
255        }
256    }
257
258    pub fn transform_keys_with_external_files(&self) -> HashSet<ComponentKey> {
259        self.transforms
260            .iter()
261            .filter_map(|(name, transform_outer)| {
262                if !transform_outer.inner.files_to_watch().is_empty() {
263                    Some(name.clone())
264                } else {
265                    None
266                }
267            })
268            .collect()
269    }
270}
271
272/// Healthcheck options.
273#[configurable_component]
274#[derive(Clone, Copy, Debug)]
275#[serde(default)]
276pub struct HealthcheckOptions {
277    /// Whether or not healthchecks are enabled for all sinks.
278    ///
279    /// Can be overridden on a per-sink basis.
280    pub enabled: bool,
281
282    /// Whether or not to require a sink to report as being healthy during startup.
283    ///
284    /// When enabled and a sink reports not being healthy, Vector will exit during start-up.
285    ///
286    /// Can be alternatively set, and overridden by, the `--require-healthy` command-line flag.
287    pub require_healthy: bool,
288}
289
290impl HealthcheckOptions {
291    pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
292        if let Some(require_healthy) = require_healthy.into() {
293            self.require_healthy = require_healthy;
294        }
295    }
296
297    const fn merge(&mut self, other: Self) {
298        self.enabled &= other.enabled;
299        self.require_healthy |= other.require_healthy;
300    }
301}
302
303impl Default for HealthcheckOptions {
304    fn default() -> Self {
305        Self {
306            enabled: true,
307            require_healthy: false,
308        }
309    }
310}
311
/// Unique thing, like port, of which only one owner can be.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address bound with the given transport protocol.
    Port(SocketAddr, Protocol),
    /// A systemd-provided socket, identified by its zero-based offset.
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer, identified by name.
    DiskBuffer(String),
}

/// Transport protocol of a [`Resource::Port`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    Tcp,
    Udp,
}

impl Resource {
    /// Shorthand for a TCP [`Resource::Port`] on `addr`.
    pub const fn tcp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Tcp)
    }

    /// Shorthand for a UDP [`Resource::Port`] on `addr`.
    pub const fn udp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Udp)
    }

    /// From given components returns all that have a resource conflict with any other component.
    ///
    /// Two kinds of conflict are detected:
    ///
    /// * equal resources claimed by more than one component, and
    /// * a port bound on an unspecified address (`0.0.0.0` / `::`) clashing with any other
    ///   port resource of the same IP version, port number and protocol.
    ///
    /// The returned map contains only resources claimed by at least two components.
    pub fn conflicts<K: Eq + Hash + Clone>(
        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
    ) -> HashMap<Resource, HashSet<K>> {
        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
        let mut unspecified = Vec::new();

        // Find equality based conflicts
        for (key, resources) in components {
            for resource in resources {
                match &resource {
                    Resource::Port(address, protocol) if address.ip().is_unspecified() => {
                        unspecified.push((key.clone(), *address, *protocol));
                    }
                    _ => {}
                }

                resource_map
                    .entry(resource)
                    .or_default()
                    .insert(key.clone());
            }
        }

        // Port with unspecified address will bind to all network interfaces
        // so we have to check for all Port resources if they share the same
        // port.
        for (key, address0, protocol0) in unspecified {
            for (resource, components) in resource_map.iter_mut() {
                // IP addresses can either be v4 or v6, so a wildcard only overlaps with
                // addresses of the same IP version. Comparing the whole `SocketAddr` here
                // would only ever match the wildcard's own entry and miss every real overlap.
                let overlaps = matches!(
                    resource,
                    Resource::Port(address, protocol)
                        if address.is_ipv4() == address0.is_ipv4()
                            && address.port() == address0.port()
                            && *protocol == protocol0
                );
                if overlaps {
                    components.insert(key.clone());
                }
            }
        }

        resource_map.retain(|_, components| components.len() > 1);

        resource_map
    }
}
380
381impl Display for Protocol {
382    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
383        match self {
384            Protocol::Udp => write!(fmt, "udp"),
385            Protocol::Tcp => write!(fmt, "tcp"),
386        }
387    }
388}
389
390impl Display for Resource {
391    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
392        match self {
393            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
394            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
395            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
396            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
397        }
398    }
399}
400
401/// A unit test definition.
402#[configurable_component]
403#[derive(Clone, Debug)]
404#[serde(deny_unknown_fields)]
405pub struct TestDefinition<T: 'static = OutputId> {
406    /// The name of the unit test.
407    pub name: String,
408
409    /// An input event to test against.
410    pub input: Option<TestInput>,
411
412    /// A set of input events to test against.
413    #[serde(default)]
414    pub inputs: Vec<TestInput>,
415
416    /// A set of expected output events after the test has run.
417    #[serde(default)]
418    pub outputs: Vec<TestOutput<T>>,
419
420    /// A set of component outputs that should not have emitted any events.
421    #[serde(default)]
422    pub no_outputs_from: Vec<T>,
423}
424
425impl TestDefinition<String> {
426    fn resolve_outputs(
427        self,
428        graph: &graph::Graph,
429    ) -> Result<TestDefinition<OutputId>, Vec<String>> {
430        let TestDefinition {
431            name,
432            input,
433            inputs,
434            outputs,
435            no_outputs_from,
436        } = self;
437        let mut errors = Vec::new();
438
439        let output_map = graph.input_map().expect("ambiguous outputs");
440
441        let outputs = outputs
442            .into_iter()
443            .map(|old| {
444                let TestOutput {
445                    extract_from,
446                    conditions,
447                } = old;
448
449                (extract_from.to_vec(), conditions)
450            })
451            .filter_map(|(extract_from, conditions)| {
452                let mut outputs = Vec::new();
453                for from in extract_from {
454                    if no_outputs_from.contains(&from) {
455                        errors.push(format!(
456                            r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
457                        ));
458                    } else if let Some(output_id) = output_map.get(&from) {
459                        outputs.push(output_id.clone());
460                    } else {
461                        errors.push(format!(
462                            r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
463                        ));
464                    }
465                }
466                if outputs.is_empty() {
467                    None
468                } else {
469                    Some(TestOutput {
470                        extract_from: outputs.into(),
471                        conditions,
472                    })
473                }
474            })
475            .collect();
476
477        let no_outputs_from = no_outputs_from
478            .into_iter()
479            .filter_map(|o| {
480                if let Some(output_id) = output_map.get(&o) {
481                    Some(output_id.clone())
482                } else {
483                    errors.push(format!(
484                        r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
485                    ));
486                    None
487                }
488            })
489            .collect();
490
491        if errors.is_empty() {
492            Ok(TestDefinition {
493                name,
494                input,
495                inputs,
496                outputs,
497                no_outputs_from,
498            })
499        } else {
500            Err(errors)
501        }
502    }
503}
504
505impl TestDefinition<OutputId> {
506    fn stringify(self) -> TestDefinition<String> {
507        let TestDefinition {
508            name,
509            input,
510            inputs,
511            outputs,
512            no_outputs_from,
513        } = self;
514
515        let outputs = outputs
516            .into_iter()
517            .map(|old| TestOutput {
518                extract_from: old
519                    .extract_from
520                    .to_vec()
521                    .into_iter()
522                    .map(|item| item.to_string())
523                    .collect::<Vec<_>>()
524                    .into(),
525                conditions: old.conditions,
526            })
527            .collect();
528
529        let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
530
531        TestDefinition {
532            name,
533            input,
534            inputs,
535            outputs,
536            no_outputs_from,
537        }
538    }
539}
540
541/// A unit test input.
542///
543/// An input describes not only the type of event to insert, but also which transform within the
544/// configuration to insert it to.
545#[configurable_component]
546#[derive(Clone, Debug)]
547#[serde(deny_unknown_fields)]
548pub struct TestInput {
549    /// The name of the transform to insert the input event to.
550    pub insert_at: ComponentKey,
551
552    /// The type of the input event.
553    ///
554    /// Can be either `raw`, `vrl`, `log`, or `metric.
555    #[serde(default = "default_test_input_type", rename = "type")]
556    pub type_str: String,
557
558    /// The raw string value to use as the input event.
559    ///
560    /// Use this only when the input event should be a raw event (i.e. unprocessed/undecoded log
561    /// event) and when the input type is set to `raw`.
562    pub value: Option<String>,
563
564    /// The vrl expression to generate the input event.
565    ///
566    /// Only relevant when `type` is `vrl`.
567    pub source: Option<String>,
568
569    /// The set of log fields to use when creating a log input event.
570    ///
571    /// Only relevant when `type` is `log`.
572    pub log_fields: Option<IndexMap<String, Value>>,
573
574    /// The metric to use as an input event.
575    ///
576    /// Only relevant when `type` is `metric`.
577    pub metric: Option<Metric>,
578}
579
/// Default for a test input's `type` field when it is omitted: `"raw"`.
fn default_test_input_type() -> String {
    String::from("raw")
}
583
/// A unit test output.
///
/// An output describes what we expect a transform to emit when fed a certain event, or events, when
/// running a unit test.
// `T` is the type used to reference outputs. Within this file it is instantiated as `String`
// (before `TestDefinition::resolve_outputs`) and as `OutputId` (afterwards, and by default).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestOutput<T: 'static = OutputId> {
    /// The transform outputs to extract events from.
    // Elsewhere in this file a `Vec<T>` is converted into this field with `.into()`, and the
    // field is flattened back into a `Vec<T>` with `.to_vec()`.
    pub extract_from: OneOrMany<T>,

    /// The conditions to run against the output to validate that they were transformed as expected.
    // NOTE(review): presumably `None` means no conditions are checked for this output — confirm
    // against the unit test runner.
    pub conditions: Option<Vec<conditions::AnyCondition>>,
}
598
599#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
600mod tests {
601    use std::path::PathBuf;
602
603    use indoc::indoc;
604
605    use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
606    use crate::{config, topology::builder::TopologyPiecesBuilder};
607
608    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
609        match config::load_from_str(config, format) {
610            Ok(c) => {
611                let diff = ConfigDiff::initial(&c);
612                let c2 = config::load_from_str(config, format).unwrap();
613                match (
614                    config::warnings(&c2),
615                    TopologyPiecesBuilder::new(&c, &diff).build().await,
616                ) {
617                    (warnings, Ok(_pieces)) => Ok(warnings),
618                    (_, Err(errors)) => Err(errors),
619                }
620            }
621            Err(error) => Err(error),
622        }
623    }
624
625    #[tokio::test]
626    async fn bad_inputs() {
627        let err = load(
628            r#"
629            [sources.in]
630            type = "test_basic"
631
632            [transforms.sample]
633            type = "test_basic"
634            inputs = []
635            suffix = "foo"
636            increase = 1.25
637
638            [transforms.sample2]
639            type = "test_basic"
640            inputs = ["qwerty"]
641            suffix = "foo"
642            increase = 1.25
643
644            [sinks.out]
645            type = "test_basic"
646            inputs = ["asdf", "in", "in"]
647            "#,
648            Format::Toml,
649        )
650        .await
651        .unwrap_err();
652
653        assert_eq!(
654            vec![
655                "Sink \"out\" has input \"in\" duplicated 2 times",
656                "Transform \"sample\" has no inputs",
657                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
658                "Input \"asdf\" for sink \"out\" doesn't match any components.",
659            ],
660            err,
661        );
662    }
663
664    #[tokio::test]
665    async fn duplicate_name() {
666        let err = load(
667            r#"
668            [sources.foo]
669            type = "test_basic"
670
671            [sources.bar]
672            type = "test_basic"
673
674            [transforms.foo]
675            type = "test_basic"
676            inputs = ["bar"]
677            suffix = "foo"
678            increase = 1.25
679
680            [sinks.out]
681            type = "test_basic"
682            inputs = ["foo"]
683            "#,
684            Format::Toml,
685        )
686        .await
687        .unwrap_err();
688
689        assert_eq!(
690            err,
691            vec!["More than one component with name \"foo\" (source, transform).",]
692        );
693    }
694
695    #[tokio::test]
696    #[cfg(unix)]
697    async fn conflicting_stdin_and_fd_resources() {
698        let errors = load(
699            r#"
700            [sources.stdin]
701            type = "stdin"
702
703            [sources.file_descriptor]
704            type = "file_descriptor"
705            fd = 0
706
707            [sinks.out]
708            type = "test_basic"
709            inputs = ["stdin", "file_descriptor"]
710            "#,
711            Format::Toml,
712        )
713        .await
714        .unwrap_err();
715
716        assert_eq!(errors.len(), 1);
717        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
718        assert!(errors[0].starts_with(expected_prefix));
719    }
720
721    #[tokio::test]
722    #[cfg(unix)]
723    async fn conflicting_fd_resources() {
724        let errors = load(
725            r#"
726            [sources.file_descriptor1]
727            type = "file_descriptor"
728            fd = 10
729            [sources.file_descriptor2]
730            type = "file_descriptor"
731            fd = 10
732            [sinks.out]
733            type = "test_basic"
734            inputs = ["file_descriptor1", "file_descriptor2"]
735            "#,
736            Format::Toml,
737        )
738        .await
739        .unwrap_err();
740
741        assert_eq!(errors.len(), 1);
742        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
743        assert!(errors[0].starts_with(expected_prefix));
744    }
745
746    #[tokio::test]
747    #[cfg(all(unix, feature = "sources-file_descriptor"))]
748    async fn no_conflict_fd_resources() {
749        use crate::sources::file_descriptors::file_descriptor::null_fd;
750        let fd1 = null_fd().unwrap();
751        let fd2 = null_fd().unwrap();
752        let result = load(
753            &format!(
754                r#"
755            [sources.file_descriptor1]
756            type = "file_descriptor"
757            fd = {fd1}
758
759            [sources.file_descriptor2]
760            type = "file_descriptor"
761            fd = {fd2}
762
763            [sinks.out]
764            type = "test_basic"
765            inputs = ["file_descriptor1", "file_descriptor2"]
766            "#
767            ),
768            Format::Toml,
769        )
770        .await;
771
772        let expected = Ok(vec![]);
773        assert_eq!(result, expected);
774    }
775
776    #[tokio::test]
777    async fn warnings() {
778        let warnings = load(
779            r#"
780            [sources.in1]
781            type = "test_basic"
782
783            [sources.in2]
784            type = "test_basic"
785
786            [transforms.sample1]
787            type = "test_basic"
788            inputs = ["in1"]
789            suffix = "foo"
790            increase = 1.25
791
792            [transforms.sample2]
793            type = "test_basic"
794            inputs = ["in1"]
795            suffix = "foo"
796            increase = 1.25
797
798            [sinks.out]
799            type = "test_basic"
800            inputs = ["sample1"]
801            "#,
802            Format::Toml,
803        )
804        .await
805        .unwrap();
806
807        assert_eq!(
808            warnings,
809            vec![
810                "Transform \"sample2\" has no consumers",
811                "Source \"in2\" has no consumers",
812            ]
813        )
814    }
815
816    #[tokio::test]
817    async fn cycle() {
818        let errors = load(
819            r#"
820            [sources.in]
821            type = "test_basic"
822
823            [transforms.one]
824            type = "test_basic"
825            inputs = ["in"]
826            suffix = "foo"
827            increase = 1.25
828
829            [transforms.two]
830            type = "test_basic"
831            inputs = ["one", "four"]
832            suffix = "foo"
833            increase = 1.25
834
835            [transforms.three]
836            type = "test_basic"
837            inputs = ["two"]
838            suffix = "foo"
839            increase = 1.25
840
841            [transforms.four]
842            type = "test_basic"
843            inputs = ["three"]
844            suffix = "foo"
845            increase = 1.25
846
847            [sinks.out]
848            type = "test_basic"
849            inputs = ["four"]
850            "#,
851            Format::Toml,
852        )
853        .await
854        .unwrap_err();
855
856        assert_eq!(
857            errors,
858            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
859        )
860    }
861
862    #[test]
863    fn default_data_dir() {
864        let config = load_from_str(
865            indoc! {r#"
866                [sources.in]
867                type = "test_basic"
868
869                [sinks.out]
870                type = "test_basic"
871                inputs = ["in"]
872            "#},
873            Format::Toml,
874        )
875        .unwrap();
876
877        assert_eq!(
878            Some(PathBuf::from("/var/lib/vector")),
879            config.global.data_dir
880        )
881    }
882
883    #[test]
884    fn default_schema() {
885        let config = load_from_str(
886            indoc! {r#"
887            [sources.in]
888            type = "test_basic"
889
890            [sinks.out]
891            type = "test_basic"
892            inputs = ["in"]
893            "#},
894            Format::Toml,
895        )
896        .unwrap();
897
898        assert_eq!(
899            "host",
900            config.global.log_schema.host_key().unwrap().to_string()
901        );
902        assert_eq!(
903            "message",
904            config.global.log_schema.message_key().unwrap().to_string()
905        );
906        assert_eq!(
907            "timestamp",
908            config
909                .global
910                .log_schema
911                .timestamp_key()
912                .unwrap()
913                .to_string()
914        );
915    }
916
917    #[test]
918    fn custom_schema() {
919        let config = load_from_str(
920            indoc! {r#"
921                [log_schema]
922                  host_key = "this"
923                  message_key = "that"
924                  timestamp_key = "then"
925
926                [sources.in]
927                  type = "test_basic"
928
929                [sinks.out]
930                  type = "test_basic"
931                  inputs = ["in"]
932            "#},
933            Format::Toml,
934        )
935        .unwrap();
936
937        assert_eq!(
938            "this",
939            config.global.log_schema.host_key().unwrap().to_string()
940        );
941        assert_eq!(
942            "that",
943            config.global.log_schema.message_key().unwrap().to_string()
944        );
945        assert_eq!(
946            "then",
947            config
948                .global
949                .log_schema
950                .timestamp_key()
951                .unwrap()
952                .to_string()
953        );
954    }
955
956    #[test]
957    fn config_append() {
958        let mut config: ConfigBuilder = format::deserialize(
959            indoc! {r#"
960                [sources.in]
961                  type = "test_basic"
962
963                [sinks.out]
964                  type = "test_basic"
965                  inputs = ["in"]
966            "#},
967            Format::Toml,
968        )
969        .unwrap();
970
971        assert_eq!(
972            config.append(
973                format::deserialize(
974                    indoc! {r#"
975                        data_dir = "/foobar"
976
977                        [proxy]
978                          http = "http://proxy.inc:3128"
979
980                        [transforms.foo]
981                          type = "test_basic"
982                          inputs = [ "in" ]
983                          suffix = "foo"
984                          increase = 1.25
985
986                        [[tests]]
987                          name = "check_simple_log"
988                          [tests.input]
989                            insert_at = "foo"
990                            type = "raw"
991                            value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
992                          [[tests.outputs]]
993                            extract_from = "foo"
994                            [[tests.outputs.conditions]]
995                              type = "vrl"
996                              source = ".message == \"Sorry, I'm busy this week Cecil\""
997                    "#},
998                    Format::Toml,
999                )
1000                .unwrap()
1001            ),
1002            Ok(())
1003        );
1004
1005        assert!(config.global.proxy.http.is_some());
1006        assert!(config.global.proxy.https.is_none());
1007        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
1008        assert!(config.sources.contains_key(&ComponentKey::from("in")));
1009        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
1010        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
1011        assert_eq!(config.tests.len(), 1);
1012    }
1013
1014    #[test]
1015    fn config_append_collisions() {
1016        let mut config: ConfigBuilder = format::deserialize(
1017            indoc! {r#"
1018                [sources.in]
1019                  type = "test_basic"
1020
1021                [sinks.out]
1022                  type = "test_basic"
1023                  inputs = ["in"]
1024            "#},
1025            Format::Toml,
1026        )
1027        .unwrap();
1028
1029        assert_eq!(
1030            config.append(
1031                format::deserialize(
1032                    indoc! {r#"
1033                        [sources.in]
1034                          type = "test_basic"
1035
1036                        [transforms.foo]
1037                          type = "test_basic"
1038                          inputs = [ "in" ]
1039                          suffix = "foo"
1040                          increase = 1.25
1041
1042                        [sinks.out]
1043                          type = "test_basic"
1044                          inputs = ["in"]
1045                    "#},
1046                    Format::Toml,
1047                )
1048                .unwrap()
1049            ),
1050            Err(vec![
1051                "duplicate source id found: in".into(),
1052                "duplicate sink id found: out".into(),
1053            ])
1054        );
1055    }
1056
1057    #[test]
1058    fn with_proxy() {
1059        let config: ConfigBuilder = format::deserialize(
1060            indoc! {r#"
1061                [proxy]
1062                  http = "http://server:3128"
1063                  https = "http://other:3128"
1064                  no_proxy = ["localhost", "127.0.0.1"]
1065
1066                [sources.in]
1067                  type = "nginx_metrics"
1068                  endpoints = ["http://localhost:8000/basic_status"]
1069                  proxy.http = "http://server:3128"
1070                  proxy.https = "http://other:3128"
1071                  proxy.no_proxy = ["localhost", "127.0.0.1"]
1072
1073                [sinks.out]
1074                  type = "console"
1075                  inputs = ["in"]
1076                  encoding.codec = "json"
1077            "#},
1078            Format::Toml,
1079        )
1080        .unwrap();
1081        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1082        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1083        assert!(config.global.proxy.no_proxy.matches("localhost"));
1084        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1085        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1086        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1087        assert!(source.proxy.no_proxy.matches("localhost"));
1088    }
1089
1090    #[test]
1091    fn with_partial_global_proxy() {
1092        let config: ConfigBuilder = format::deserialize(
1093            indoc! {r#"
1094                [proxy]
1095                  http = "http://server:3128"
1096
1097                [sources.in]
1098                  type = "nginx_metrics"
1099                  endpoints = ["http://localhost:8000/basic_status"]
1100
1101                [sources.in.proxy]
1102                  http = "http://server:3129"
1103                  https = "http://other:3129"
1104                  no_proxy = ["localhost", "127.0.0.1"]
1105
1106                [sinks.out]
1107                  type = "console"
1108                  inputs = ["in"]
1109                  encoding.codec = "json"
1110            "#},
1111            Format::Toml,
1112        )
1113        .unwrap();
1114        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1115        assert_eq!(config.global.proxy.https, None);
1116        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1117        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1118        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1119        assert!(source.proxy.no_proxy.matches("localhost"));
1120    }
1121
1122    #[test]
1123    fn with_partial_source_proxy() {
1124        let config: ConfigBuilder = format::deserialize(
1125            indoc! {r#"
1126                [proxy]
1127                  http = "http://server:3128"
1128                  https = "http://other:3128"
1129
1130                [sources.in]
1131                  type = "nginx_metrics"
1132                  endpoints = ["http://localhost:8000/basic_status"]
1133
1134                [sources.in.proxy]
1135                  http = "http://server:3129"
1136                  no_proxy = ["localhost", "127.0.0.1"]
1137
1138                [sinks.out]
1139                  type = "console"
1140                  inputs = ["in"]
1141                  encoding.codec = "json"
1142            "#},
1143            Format::Toml,
1144        )
1145        .unwrap();
1146        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1147        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1148        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1149        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1150        assert_eq!(source.proxy.https, None);
1151        assert!(source.proxy.no_proxy.matches("localhost"));
1152    }
1153}
1154
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Building the config must switch on `sink_acknowledgements` for every
    /// source that feeds a sink with acknowledgements enabled, whether
    /// directly or through a transform, and leave the other sources alone.
    #[test]
    fn propagates_settings() {
        // Topology under test:
        //   in1 => out1            (acks not configured)
        //   in2 => out2            (acks enabled)
        //   in3 => parse3 => out3  (acks enabled)
        let toml = indoc! {r#"
            data_dir = "/tmp"
            [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
            [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
            [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
            [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
            [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
        "#};
        let builder: ConfigBuilder = format::deserialize(toml, Format::Toml).unwrap();

        // Straight after deserialization no source has the flag set.
        builder.sources.values().for_each(|source| {
            assert!(
                !source.sink_acknowledgements,
                "Source `sink_acknowledgements` should be `false` before propagation"
            );
        });

        let built = builder.build().unwrap();

        let acks_enabled = |id: &str| {
            built
                .sources
                .get(&ComponentKey::from(id))
                .unwrap()
                .sink_acknowledgements
        };
        assert!(!acks_enabled("in1"));
        assert!(acks_enabled("in2"));
        assert!(acks_enabled("in3"));
    }
}
1221
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// Builds the TCP resource for `addr:port`.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        Resource::tcp(SocketAddr::from((addr, port)))
    }

    /// Builds the UDP resource for `addr:port`.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        Resource::udp(SocketAddr::from((addr, port)))
    }

    /// Strategy yielding the IPv4 or the IPv6 unspecified address.
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
            Just(IpAddr::V6(Ipv6Addr::UNSPECIFIED)),
        ]
    }

    /// Strategy yielding any address except the unspecified ones.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |candidate| {
            !candidate.is_unspecified()
        })
    }

    /// Strategy yielding any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |candidate| *candidate != 0)
    }

    /// Turns `(resource, component names)` pairs into the map shape that
    /// `Resource::conflicts` returns, so expectations can be compared directly.
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        let mut expected = HashMap::with_capacity(conflicts.len());
        for (resource, owners) in conflicts {
            let owner_set: HashSet<&str> = owners.into_iter().collect();
            expected.insert(resource, owner_set);
        }
        expected
    }

    proptest! {
        // Three components on one address, each with its own port: no conflict.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ]);
            assert_eq!(found, hashmap(vec![]));
        }

        // Two components claiming the same address and port are reported together.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])]);
            assert_eq!(found, expected);
        }

        // A component holding several resources can take part in several conflicts.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![
                (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                (tcp(addr, port), vec!["sink_1", "sink_2"]),
            ]);
            assert_eq!(found, expected);
        }

        // The same port on two different addresses is not a conflict.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ]);
            assert_eq!(found, hashmap(vec![]));
        }

        // A specific address and an unspecified one sharing a port: no conflict expected.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ]);
            assert_eq!(found, hashmap(vec![]));
        }

        // TCP and UDP on the same address and port do not conflict.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let found = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ]);
            assert_eq!(found, hashmap(vec![]));
        }
    }

    /// The IPv4 and IPv6 unspecified addresses on the same port do not conflict.
    #[test]
    fn different_unspecified_ip_version() {
        let found = Resource::conflicts(vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ]);
        assert_eq!(found, hashmap(vec![]));
    }
}
1349
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{Format, load_from_str};

    /// A configuration declaring two `stdin` sources must be rejected at load
    /// time (presumably as a resource conflict, going by the test name).
    #[test]
    fn config_conflict_detected() {
        let toml = indoc! {r#"
            [sources.in0]
              type = "stdin"

            [sources.in1]
              type = "stdin"

            [sinks.out]
              type = "console"
              inputs = ["in0","in1"]
              encoding.codec = "json"
        "#};

        let outcome = load_from_str(toml, Format::Toml);

        assert!(outcome.is_err());
    }

    /// Prints the JSON schema of a config holding only sources, transforms
    /// and sinks to stdout. Ignored by default, so it runs only on request.
    /// A schema-generation error is written to stderr and does not fail the
    /// test.
    #[test]
    #[ignore]
    #[allow(clippy::print_stdout, clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        // The doc comments on this struct are kept verbatim: presumably the
        // attribute macro uses them as schema descriptions (TODO confirm).

        /// Top-level Vector configuration.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");

        println!("{json}");
    }
}