1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 fmt::{self, Display, Formatter},
5 fs,
6 hash::Hash,
7 net::SocketAddr,
8 path::PathBuf,
9 time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16 config::{
17 AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18 SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19 },
20 configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24 conditions,
25 event::{Metric, Value},
26 secrets::SecretBackends,
27 serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57 COLLECTOR, CONFIG_PATHS, load, load_builder_from_paths, load_from_paths,
58 load_from_paths_with_provider_and_secrets, load_from_str, load_source_from_paths,
59 merge_path_lists, process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66 BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72 config::{
73 ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74 proxy::ProxyConfig, telemetry,
75 },
76 id::Inputs,
77};
78
/// The kind of topology component a [`ComponentConfig`] entry refers to.
///
/// The derived `Ord`/`PartialOrd` follow declaration order, so the relative order of the
/// variants is part of the type's behavior.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ComponentType {
    /// A source component.
    // NOTE(review): restored so that all four component maps held by `Config` (sources,
    // transforms, sinks, enrichment tables) have a matching kind — confirm against callers.
    Source,
    /// A transform component.
    Transform,
    /// A sink component.
    Sink,
    /// An enrichment table component.
    EnrichmentTable,
}
86
87#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
88pub struct ComponentConfig {
89 pub config_paths: Vec<PathBuf>,
90 pub component_key: ComponentKey,
91 pub component_type: ComponentType,
92}
93
94impl ComponentConfig {
95 pub fn new(
96 config_paths: Vec<PathBuf>,
97 component_key: ComponentKey,
98 component_type: ComponentType,
99 ) -> Self {
100 let canonicalized_paths = config_paths
101 .into_iter()
102 .filter_map(|p| fs::canonicalize(p).ok())
103 .collect();
104
105 Self {
106 config_paths: canonicalized_paths,
107 component_key,
108 component_type,
109 }
110 }
111
112 pub fn contains(&self, config_paths: &[PathBuf]) -> Option<(ComponentKey, ComponentType)> {
113 if config_paths.iter().any(|p| self.config_paths.contains(p)) {
114 return Some((self.component_key.clone(), self.component_type.clone()));
115 }
116 None
117 }
118}
119
120#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
121pub enum ConfigPath {
122 File(PathBuf, FormatHint),
123 Dir(PathBuf),
124}
125
126impl<'a> From<&'a ConfigPath> for &'a PathBuf {
127 fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
128 match config_path {
129 ConfigPath::File(path, _) => path,
130 ConfigPath::Dir(path) => path,
131 }
132 }
133}
134
135impl ConfigPath {
136 pub const fn as_dir(&self) -> Option<&PathBuf> {
137 match self {
138 Self::Dir(path) => Some(path),
139 _ => None,
140 }
141 }
142}
143
144#[derive(Debug, Default, Serialize)]
145pub struct Config {
146 #[cfg(feature = "api")]
147 pub api: api::Options,
148 pub schema: schema::Options,
149 pub global: GlobalOptions,
150 pub healthchecks: HealthcheckOptions,
151 sources: IndexMap<ComponentKey, SourceOuter>,
152 sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
153 transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
154 pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
155 tests: Vec<TestDefinition>,
156 secret: IndexMap<ComponentKey, SecretBackends>,
157 pub graceful_shutdown_duration: Option<Duration>,
158}
159
160impl Config {
161 pub fn builder() -> builder::ConfigBuilder {
162 Default::default()
163 }
164
165 pub fn is_empty(&self) -> bool {
166 self.sources.is_empty()
167 }
168
169 pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
170 self.sources.iter()
171 }
172
173 pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
174 self.sources.get(id)
175 }
176
177 pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
178 self.transforms.iter()
179 }
180
181 pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
182 self.transforms.get(id)
183 }
184
185 pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
186 self.sinks.iter()
187 }
188
189 pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
190 self.sinks.get(id)
191 }
192
193 pub fn enrichment_tables(
194 &self,
195 ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
196 self.enrichment_tables.iter()
197 }
198
199 pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
200 self.enrichment_tables.get(id)
201 }
202
203 pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
204 self.transforms
205 .get(id)
206 .map(|t| &t.inputs[..])
207 .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
208 .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
209 }
210
211 pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
212 let inputs: Vec<_> = self
213 .sinks
214 .iter()
215 .filter(|(_, sink)| {
216 sink.inner
217 .acknowledgements()
218 .merge_default(&self.global.acknowledgements)
219 .enabled()
220 })
221 .flat_map(|(name, sink)| {
222 sink.inputs
223 .iter()
224 .map(|input| (name.clone(), input.clone()))
225 })
226 .collect();
227 self.propagate_acks_rec(inputs);
228 Ok(())
229 }
230
231 fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
232 for (sink, input) in sink_inputs {
233 let component = &input.component;
234 if let Some(source) = self.sources.get_mut(component) {
235 if source.inner.can_acknowledge() {
236 source.sink_acknowledgements = true;
237 } else {
238 warn!(
239 message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
240 source = component.id(),
241 sink = sink.id(),
242 );
243 }
244 } else if let Some(transform) = self.transforms.get(component) {
245 let inputs = transform
246 .inputs
247 .iter()
248 .map(|input| (sink.clone(), input.clone()))
249 .collect();
250 self.propagate_acks_rec(inputs);
251 }
252 }
253 }
254}
255
256#[configurable_component]
258#[derive(Clone, Copy, Debug)]
259#[serde(default)]
260pub struct HealthcheckOptions {
261 pub enabled: bool,
265
266 pub require_healthy: bool,
272}
273
274impl HealthcheckOptions {
275 pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
276 if let Some(require_healthy) = require_healthy.into() {
277 self.require_healthy = require_healthy;
278 }
279 }
280
281 const fn merge(&mut self, other: Self) {
282 self.enabled &= other.enabled;
283 self.require_healthy |= other.require_healthy;
284 }
285}
286
287impl Default for HealthcheckOptions {
288 fn default() -> Self {
289 Self {
290 enabled: true,
291 require_healthy: false,
292 }
293 }
294}
295
/// A resource that components claim, used to detect when several components claim the
/// same one.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address bound with the given protocol.
    Port(SocketAddr, Protocol),
    /// A zero-based systemd socket offset (displayed one-based).
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer, identified by name.
    DiskBuffer(String),
}

/// Transport protocol of a [`Resource::Port`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}

impl Resource {
    /// Shorthand for a TCP [`Resource::Port`] on `addr`.
    pub const fn tcp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Tcp)
    }

    /// Shorthand for a UDP [`Resource::Port`] on `addr`.
    pub const fn udp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Udp)
    }

    /// Returns every resource claimed by more than one component, mapped to the set of
    /// components claiming it.
    ///
    /// Conflicts are purely equality based. In particular a `Port` on an unspecified address
    /// (`0.0.0.0` / `::`) conflicts only with a `Port` carrying the identical socket address
    /// and protocol; it is not expanded to other interfaces or IP versions. A component that
    /// lists the same resource several times does not conflict with itself.
    pub fn conflicts<K: Eq + Hash + Clone>(
        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
    ) -> HashMap<Resource, HashSet<K>> {
        let mut claims = HashMap::<Resource, HashSet<K>>::new();

        // Group claimants by resource. The former second pass over unspecified addresses
        // compared full socket address and protocol, so it could only re-insert a key into
        // the set it was already in; it has been dropped as a no-op.
        for (key, resources) in components {
            for resource in resources {
                claims.entry(resource).or_default().insert(key.clone());
            }
        }

        // Only resources with at least two distinct claimants are conflicts.
        claims.retain(|_, claimants| claimants.len() > 1);

        claims
    }
}

impl Display for Protocol {
    /// Writes the lowercase protocol name.
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
        let name = match self {
            Protocol::Tcp => "tcp",
            Protocol::Udp => "udp",
        };
        fmt.write_str(name)
    }
}

impl Display for Resource {
    /// Writes a human-readable description of the resource.
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
            // The stored offset is zero-based; it is shown one-based.
            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
        }
    }
}
384
385#[configurable_component]
387#[derive(Clone, Debug)]
388#[serde(deny_unknown_fields)]
389pub struct TestDefinition<T: 'static = OutputId> {
390 pub name: String,
392
393 pub input: Option<TestInput>,
395
396 #[serde(default)]
398 pub inputs: Vec<TestInput>,
399
400 #[serde(default)]
402 pub outputs: Vec<TestOutput<T>>,
403
404 #[serde(default)]
406 pub no_outputs_from: Vec<T>,
407}
408
409impl TestDefinition<String> {
410 fn resolve_outputs(
411 self,
412 graph: &graph::Graph,
413 ) -> Result<TestDefinition<OutputId>, Vec<String>> {
414 let TestDefinition {
415 name,
416 input,
417 inputs,
418 outputs,
419 no_outputs_from,
420 } = self;
421 let mut errors = Vec::new();
422
423 let output_map = graph.input_map().expect("ambiguous outputs");
424
425 let outputs = outputs
426 .into_iter()
427 .map(|old| {
428 let TestOutput {
429 extract_from,
430 conditions,
431 } = old;
432
433 (extract_from.to_vec(), conditions)
434 })
435 .filter_map(|(extract_from, conditions)| {
436 let mut outputs = Vec::new();
437 for from in extract_from {
438 if no_outputs_from.contains(&from) {
439 errors.push(format!(
440 r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
441 ));
442 } else if let Some(output_id) = output_map.get(&from) {
443 outputs.push(output_id.clone());
444 } else {
445 errors.push(format!(
446 r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
447 ));
448 }
449 }
450 if outputs.is_empty() {
451 None
452 } else {
453 Some(TestOutput {
454 extract_from: outputs.into(),
455 conditions,
456 })
457 }
458 })
459 .collect();
460
461 let no_outputs_from = no_outputs_from
462 .into_iter()
463 .filter_map(|o| {
464 if let Some(output_id) = output_map.get(&o) {
465 Some(output_id.clone())
466 } else {
467 errors.push(format!(
468 r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
469 ));
470 None
471 }
472 })
473 .collect();
474
475 if errors.is_empty() {
476 Ok(TestDefinition {
477 name,
478 input,
479 inputs,
480 outputs,
481 no_outputs_from,
482 })
483 } else {
484 Err(errors)
485 }
486 }
487}
488
489impl TestDefinition<OutputId> {
490 fn stringify(self) -> TestDefinition<String> {
491 let TestDefinition {
492 name,
493 input,
494 inputs,
495 outputs,
496 no_outputs_from,
497 } = self;
498
499 let outputs = outputs
500 .into_iter()
501 .map(|old| TestOutput {
502 extract_from: old
503 .extract_from
504 .to_vec()
505 .into_iter()
506 .map(|item| item.to_string())
507 .collect::<Vec<_>>()
508 .into(),
509 conditions: old.conditions,
510 })
511 .collect();
512
513 let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
514
515 TestDefinition {
516 name,
517 input,
518 inputs,
519 outputs,
520 no_outputs_from,
521 }
522 }
523}
524
525#[configurable_component]
530#[derive(Clone, Debug)]
531#[serde(deny_unknown_fields)]
532pub struct TestInput {
533 pub insert_at: ComponentKey,
535
536 #[serde(default = "default_test_input_type", rename = "type")]
540 pub type_str: String,
541
542 pub value: Option<String>,
547
548 pub source: Option<String>,
552
553 pub log_fields: Option<IndexMap<String, Value>>,
557
558 pub metric: Option<Metric>,
562}
563
/// Serde default for `TestInput::type_str`: the `raw` input type.
fn default_test_input_type() -> String {
    String::from("raw")
}
567
568#[configurable_component]
573#[derive(Clone, Debug)]
574#[serde(deny_unknown_fields)]
575pub struct TestOutput<T: 'static = OutputId> {
576 pub extract_from: OneOrMany<T>,
578
579 pub conditions: Option<Vec<conditions::AnyCondition>>,
581}
582
#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
mod tests {
    use std::{collections::HashMap, path::PathBuf};

    use indoc::indoc;

    use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
    use crate::{config, topology};

    /// Smallest topology shared by several tests: one source wired straight into one sink.
    const MINIMAL_TOML: &str = indoc! {r#"
        [sources.in]
        type = "test_basic"

        [sinks.out]
        type = "test_basic"
        inputs = ["in"]
    "#};

    /// Loads `config`, builds the topology pieces for it and returns the configuration
    /// warnings on success, or the load/build errors on failure.
    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
        let primary = config::load_from_str(config, format)?;
        let diff = ConfigDiff::initial(&primary);
        // Warnings are computed from a second, independently loaded copy.
        let secondary = config::load_from_str(config, format).unwrap();
        let warnings = config::warnings(&secondary);

        topology::TopologyPieces::build(&primary, &diff, HashMap::new(), Default::default())
            .await
            .map(|_pieces| warnings)
    }

    /// Duplicated, missing and unknown inputs are all reported.
    #[tokio::test]
    async fn bad_inputs() {
        let outcome = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await;
        let err = outcome.unwrap_err();

        assert_eq!(
            vec![
                "Sink \"out\" has input \"in\" duplicated 2 times",
                "Transform \"sample\" has no inputs",
                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
                "Input \"asdf\" for sink \"out\" doesn't match any components.",
            ],
            err,
        );
    }

    /// A source and a transform sharing a name is an error.
    #[tokio::test]
    async fn duplicate_name() {
        let outcome = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await;
        let err = outcome.unwrap_err();

        assert_eq!(
            err,
            vec!["More than one component with name \"foo\" (source, transform).",]
        );
    }

    /// `stdin` and a `file_descriptor` source on fd 0 claim the same resource.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let outcome = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await;
        let errors = outcome.unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }

    /// Two `file_descriptor` sources on the same fd claim the same resource.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let outcome = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await;
        let errors = outcome.unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }

    /// Two `file_descriptor` sources on distinct fds load without errors or warnings.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;

        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let toml = format!(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
        );
        let result = load(&toml, Format::Toml).await;

        assert_eq!(result, Ok(vec![]));
    }

    /// Components without consumers produce warnings rather than errors.
    #[tokio::test]
    async fn warnings() {
        let outcome = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await;
        let warnings = outcome.unwrap();

        assert_eq!(
            warnings,
            vec![
                "Transform \"sample2\" has no consumers",
                "Source \"in2\" has no consumers",
            ]
        )
    }

    /// A cycle between transforms is reported together with the offending chain.
    #[tokio::test]
    async fn cycle() {
        let outcome = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await;
        let errors = outcome.unwrap_err();

        assert_eq!(
            errors,
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
        )
    }

    /// Without an explicit `data_dir`, the default is `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let loaded = load_from_str(MINIMAL_TOML, Format::Toml).unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            loaded.global.data_dir
        )
    }

    /// Without a `[log_schema]` table, the default key names apply.
    #[test]
    fn default_schema() {
        let loaded = load_from_str(MINIMAL_TOML, Format::Toml).unwrap();
        let schema = &loaded.global.log_schema;

        assert_eq!("host", schema.host_key().unwrap().to_string());
        assert_eq!("message", schema.message_key().unwrap().to_string());
        assert_eq!("timestamp", schema.timestamp_key().unwrap().to_string());
    }

    /// Keys given in `[log_schema]` override the defaults.
    #[test]
    fn custom_schema() {
        let loaded = load_from_str(
            indoc! {r#"
                [log_schema]
                host_key = "this"
                message_key = "that"
                timestamp_key = "then"

                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();
        let schema = &loaded.global.log_schema;

        assert_eq!("this", schema.host_key().unwrap().to_string());
        assert_eq!("that", schema.message_key().unwrap().to_string());
        assert_eq!("then", schema.timestamp_key().unwrap().to_string());
    }

    /// Appending a builder with disjoint components merges globals, components and tests.
    #[test]
    fn config_append() {
        let mut config: ConfigBuilder = format::deserialize(MINIMAL_TOML, Format::Toml).unwrap();

        let extra: ConfigBuilder = format::deserialize(
            indoc! {r#"
                data_dir = "/foobar"

                [proxy]
                http = "http://proxy.inc:3128"

                [transforms.foo]
                type = "test_basic"
                inputs = [ "in" ]
                suffix = "foo"
                increase = 1.25

                [[tests]]
                name = "check_simple_log"
                [tests.input]
                insert_at = "foo"
                type = "raw"
                value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                [[tests.outputs]]
                extract_from = "foo"
                [[tests.outputs.conditions]]
                type = "vrl"
                source = ".message == \"Sorry, I'm busy this week Cecil\""
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(config.append(extra), Ok(()));

        assert!(config.global.proxy.http.is_some());
        assert!(config.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
        assert!(config.sources.contains_key(&ComponentKey::from("in")));
        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(config.tests.len(), 1);
    }

    /// Appending a builder that redefines existing ids reports each duplicate.
    #[test]
    fn config_append_collisions() {
        let mut config: ConfigBuilder = format::deserialize(MINIMAL_TOML, Format::Toml).unwrap();

        let colliding: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [transforms.foo]
                type = "test_basic"
                inputs = [ "in" ]
                suffix = "foo"
                increase = 1.25

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(colliding),
            Err(vec![
                "duplicate source id found: in".into(),
                "duplicate sink id found: out".into(),
            ])
        );
    }

    /// Proxy settings are read both globally and per source.
    #[test]
    fn with_proxy() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"
                https = "http://other:3128"
                no_proxy = ["localhost", "127.0.0.1"]

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]
                proxy.http = "http://server:3128"
                proxy.https = "http://other:3128"
                proxy.no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();

        let global_proxy = &builder.global.proxy;
        assert_eq!(global_proxy.http, Some("http://server:3128".into()));
        assert_eq!(global_proxy.https, Some("http://other:3128".into()));
        assert!(global_proxy.no_proxy.matches("localhost"));

        let source = builder.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
        assert!(source.proxy.no_proxy.matches("localhost"));
    }

    /// A partial global proxy is not filled in from a source's proxy settings.
    #[test]
    fn with_partial_global_proxy() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]

                [sources.in.proxy]
                http = "http://server:3129"
                https = "http://other:3129"
                no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();

        let global_proxy = &builder.global.proxy;
        assert_eq!(global_proxy.http, Some("http://server:3128".into()));
        assert_eq!(global_proxy.https, None);

        let source = builder.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
        assert!(source.proxy.no_proxy.matches("localhost"));
    }

    /// A partial source proxy is not filled in from the global proxy at deserialization time.
    #[test]
    fn with_partial_source_proxy() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"
                https = "http://other:3128"

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]

                [sources.in.proxy]
                http = "http://server:3129"
                no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();

        let global_proxy = &builder.global.proxy;
        assert_eq!(global_proxy.http, Some("http://server:3128".into()));
        assert_eq!(global_proxy.https, Some("http://other:3128".into()));

        let source = builder.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
        assert_eq!(source.proxy.https, None);
        assert!(source.proxy.no_proxy.matches("localhost"));
    }
}
1139
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Sink acknowledgement settings reach the sources feeding those sinks, directly or
    /// through a transform.
    #[test]
    fn propagates_settings() {
        // Topology under test:
        //   in1 -> out1             (no acknowledgements)
        //   in2 -> out2             (acknowledgements = true)
        //   in3 -> parse3 -> out3   (acknowledgements.enabled = true)
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                data_dir = "/tmp"
                [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
                [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
                [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
                [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
                [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
                [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
                [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
            "#},
            Format::Toml,
        )
        .unwrap();

        for (_key, source) in builder.sources.iter() {
            assert!(
                !source.sink_acknowledgements,
                "Source `sink_acknowledgements` should be `false` before propagation"
            );
        }

        let built = builder.build().unwrap();
        let acknowledged = |key: &str| {
            built
                .sources
                .get(&ComponentKey::from(key))
                .unwrap()
                .sink_acknowledgements
        };

        assert!(!acknowledged("in1"));
        assert!(acknowledged("in2"));
        assert!(acknowledged("in3"));
    }
}
1206
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// TCP port resource on `addr:port`.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::tcp(socket)
    }

    /// UDP port resource on `addr:port`.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::udp(socket)
    }

    /// Strategy yielding the IPv4 or the IPv6 unspecified address.
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(Ipv4Addr::UNSPECIFIED.into()),
            Just(Ipv6Addr::UNSPECIFIED.into()),
        ]
    }

    /// Strategy yielding any address except the unspecified ones.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |candidate| {
            !candidate.is_unspecified()
        })
    }

    /// Strategy yielding any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |candidate| *candidate != 0)
    }

    /// Builds the expected conflict map from `(resource, claimants)` pairs.
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        let mut expected = HashMap::with_capacity(conflicts.len());
        for (resource, claimants) in conflicts {
            expected.insert(resource, claimants.into_iter().collect::<HashSet<_>>());
        }
        expected
    }

    proptest! {
        /// Distinct ports on the same address never conflict.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let claims = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ];
            let found = Resource::conflicts(claims);
            assert_eq!(found, HashMap::new());
        }

        /// Two components on the same address and port conflict.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let claims = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ];
            let found = Resource::conflicts(claims);
            assert_eq!(
                found,
                hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])])
            );
        }

        /// A component may take part in several conflicts at once.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let claims = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ];
            let found = Resource::conflicts(claims);
            assert_eq!(
                found,
                hashmap(vec![
                    (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                    (tcp(addr, port), vec!["sink_1", "sink_2"])
                ])
            );
        }

        /// The same port on different addresses does not conflict.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let claims = vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ];
            let found = Resource::conflicts(claims);
            assert_eq!(found, HashMap::new());
        }

        /// An unspecified address does not conflict with a specific one on the same port.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let claims = vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ];
            let found = Resource::conflicts(claims);
            assert_eq!(found, HashMap::new());
        }

        /// TCP and UDP on the same address and port do not conflict.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let claims = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ];
            let found = Resource::conflicts(claims);
            assert_eq!(found, HashMap::new());
        }
    }

    /// The IPv4 and IPv6 unspecified addresses do not conflict with each other.
    #[test]
    fn different_unspecified_ip_version() {
        let claims = vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ];
        let found = Resource::conflicts(claims);
        assert_eq!(found, HashMap::new());
    }
}
1334
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{Format, load_from_str};

    /// Two `stdin` sources in one configuration fail to load.
    #[test]
    fn config_conflict_detected() {
        let outcome = load_from_str(
            indoc! {r#"
                [sources.in0]
                type = "stdin"

                [sources.in1]
                type = "stdin"

                [sinks.out]
                type = "console"
                inputs = ["in0","in1"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        );

        assert!(outcome.is_err());
    }

    /// Prints the JSON schema of a components-only configuration to stdout.
    ///
    /// Ignored by default; a schema generation error is printed to stderr instead.
    #[test]
    #[ignore]
    #[allow(clippy::print_stdout)]
    #[allow(clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        /// Configuration restricted to the component maps.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");

        println!("{json}");
    }
}