1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 fmt::{self, Display, Formatter},
5 fs,
6 hash::Hash,
7 net::SocketAddr,
8 path::PathBuf,
9 time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16 config::{
17 AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18 SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19 },
20 configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24 conditions,
25 event::{Metric, Value},
26 secrets::SecretBackends,
27 serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57 COLLECTOR, CONFIG_PATHS, load, load_builder_from_paths, load_from_paths,
58 load_from_paths_with_provider_and_secrets, load_from_str, load_from_str_with_secrets,
59 load_source_from_paths, merge_path_lists, process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66 BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72 config::{
73 ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74 proxy::ProxyConfig, telemetry,
75 },
76 id::Inputs,
77};
78
79#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
80pub enum ComponentType {
82 Transform,
83 Sink,
84 EnrichmentTable,
85}
86
87#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
88pub struct ComponentConfig {
89 pub config_paths: Vec<PathBuf>,
90 pub component_key: ComponentKey,
91 pub component_type: ComponentType,
92}
93
94impl ComponentConfig {
95 pub fn new(
96 config_paths: Vec<PathBuf>,
97 component_key: ComponentKey,
98 component_type: ComponentType,
99 ) -> Self {
100 let canonicalized_paths = config_paths
101 .into_iter()
102 .filter_map(|p| fs::canonicalize(p).ok())
103 .collect();
104
105 Self {
106 config_paths: canonicalized_paths,
107 component_key,
108 component_type,
109 }
110 }
111
112 pub fn contains(
113 &self,
114 config_paths: &HashSet<PathBuf>,
115 ) -> Option<(ComponentKey, ComponentType)> {
116 if config_paths.iter().any(|p| self.config_paths.contains(p)) {
117 return Some((self.component_key.clone(), self.component_type.clone()));
118 }
119 None
120 }
121}
122
123#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
124pub enum ConfigPath {
125 File(PathBuf, FormatHint),
126 Dir(PathBuf),
127}
128
129impl<'a> From<&'a ConfigPath> for &'a PathBuf {
130 fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
131 match config_path {
132 ConfigPath::File(path, _) => path,
133 ConfigPath::Dir(path) => path,
134 }
135 }
136}
137
138impl ConfigPath {
139 pub const fn as_dir(&self) -> Option<&PathBuf> {
140 match self {
141 Self::Dir(path) => Some(path),
142 _ => None,
143 }
144 }
145}
146
147#[derive(Debug, Default, Serialize)]
148pub struct Config {
149 #[cfg(feature = "api")]
150 pub api: api::Options,
151 pub schema: schema::Options,
152 pub global: GlobalOptions,
153 pub healthchecks: HealthcheckOptions,
154 sources: IndexMap<ComponentKey, SourceOuter>,
155 sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
156 transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
157 pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
158 tests: Vec<TestDefinition>,
159 secret: IndexMap<ComponentKey, SecretBackends>,
160 pub graceful_shutdown_duration: Option<Duration>,
161}
162
163impl Config {
164 pub fn builder() -> builder::ConfigBuilder {
165 Default::default()
166 }
167
168 pub fn is_empty(&self) -> bool {
169 self.sources.is_empty()
170 }
171
172 pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
173 self.sources.iter()
174 }
175
176 pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
177 self.sources.get(id)
178 }
179
180 pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
181 self.transforms.iter()
182 }
183
184 pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
185 self.transforms.get(id)
186 }
187
188 pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
189 self.sinks.iter()
190 }
191
192 pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
193 self.sinks.get(id)
194 }
195
196 pub fn enrichment_tables(
197 &self,
198 ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
199 self.enrichment_tables.iter()
200 }
201
202 pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
203 self.enrichment_tables.get(id)
204 }
205
206 pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
207 self.transforms
208 .get(id)
209 .map(|t| &t.inputs[..])
210 .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
211 .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
212 }
213
214 pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
215 let inputs: Vec<_> = self
216 .sinks
217 .iter()
218 .filter(|(_, sink)| {
219 sink.inner
220 .acknowledgements()
221 .merge_default(&self.global.acknowledgements)
222 .enabled()
223 })
224 .flat_map(|(name, sink)| {
225 sink.inputs
226 .iter()
227 .map(|input| (name.clone(), input.clone()))
228 })
229 .collect();
230 self.propagate_acks_rec(inputs);
231 Ok(())
232 }
233
234 fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
235 for (sink, input) in sink_inputs {
236 let component = &input.component;
237 if let Some(source) = self.sources.get_mut(component) {
238 if source.inner.can_acknowledge() {
239 source.sink_acknowledgements = true;
240 } else {
241 warn!(
242 message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
243 source = component.id(),
244 sink = sink.id(),
245 );
246 }
247 } else if let Some(transform) = self.transforms.get(component) {
248 let inputs = transform
249 .inputs
250 .iter()
251 .map(|input| (sink.clone(), input.clone()))
252 .collect();
253 self.propagate_acks_rec(inputs);
254 }
255 }
256 }
257
258 pub fn transform_keys_with_external_files(&self) -> HashSet<ComponentKey> {
259 self.transforms
260 .iter()
261 .filter_map(|(name, transform_outer)| {
262 if !transform_outer.inner.files_to_watch().is_empty() {
263 Some(name.clone())
264 } else {
265 None
266 }
267 })
268 .collect()
269 }
270}
271
272#[configurable_component(global_option("healthchecks"))]
274#[derive(Clone, Copy, Debug)]
275#[serde(default)]
276pub struct HealthcheckOptions {
277 pub enabled: bool,
281
282 pub require_healthy: bool,
288}
289
290impl HealthcheckOptions {
291 pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
292 if let Some(require_healthy) = require_healthy.into() {
293 self.require_healthy = require_healthy;
294 }
295 }
296
297 const fn merge(&mut self, other: Self) {
298 self.enabled &= other.enabled;
299 self.require_healthy |= other.require_healthy;
300 }
301}
302
303impl Default for HealthcheckOptions {
304 fn default() -> Self {
305 Self {
306 enabled: true,
307 require_healthy: false,
308 }
309 }
310}
311
312impl_generate_config_from_default!(HealthcheckOptions);
313
314#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
316pub enum Resource {
317 Port(SocketAddr, Protocol),
318 SystemFdOffset(usize),
319 Fd(u32),
320 DiskBuffer(String),
321}
322
323#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
324pub enum Protocol {
325 Tcp,
326 Udp,
327}
328
329impl Resource {
330 pub const fn tcp(addr: SocketAddr) -> Self {
331 Self::Port(addr, Protocol::Tcp)
332 }
333
334 pub const fn udp(addr: SocketAddr) -> Self {
335 Self::Port(addr, Protocol::Udp)
336 }
337
338 pub fn conflicts<K: Eq + Hash + Clone>(
340 components: impl IntoIterator<Item = (K, Vec<Resource>)>,
341 ) -> HashMap<Resource, HashSet<K>> {
342 let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
343 let mut unspecified = Vec::new();
344
345 for (key, resources) in components {
347 for resource in resources {
348 if let Resource::Port(address, protocol) = &resource
349 && address.ip().is_unspecified()
350 {
351 unspecified.push((key.clone(), *address, *protocol));
352 }
353
354 resource_map
355 .entry(resource)
356 .or_default()
357 .insert(key.clone());
358 }
359 }
360
361 for (key, address0, protocol0) in unspecified {
365 for (resource, components) in resource_map.iter_mut() {
366 if let Resource::Port(address, protocol) = resource {
367 if &address0 == address && &protocol0 == protocol {
371 components.insert(key.clone());
372 }
373 }
374 }
375 }
376
377 resource_map.retain(|_, components| components.len() > 1);
378
379 resource_map
380 }
381}
382
383impl Display for Protocol {
384 fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
385 match self {
386 Protocol::Udp => write!(fmt, "udp"),
387 Protocol::Tcp => write!(fmt, "tcp"),
388 }
389 }
390}
391
392impl Display for Resource {
393 fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
394 match self {
395 Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
396 Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
397 Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
398 Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
399 }
400 }
401}
402
403#[configurable_component]
405#[derive(Clone, Debug)]
406#[serde(deny_unknown_fields)]
407pub struct TestDefinition<T: 'static = OutputId> {
408 pub name: String,
410
411 pub input: Option<TestInput>,
413
414 #[serde(default)]
416 pub inputs: Vec<TestInput>,
417
418 #[serde(default)]
420 pub outputs: Vec<TestOutput<T>>,
421
422 #[serde(default)]
424 pub no_outputs_from: Vec<T>,
425}
426
427impl TestDefinition<String> {
428 fn resolve_outputs(
429 self,
430 graph: &graph::Graph,
431 ) -> Result<TestDefinition<OutputId>, Vec<String>> {
432 let TestDefinition {
433 name,
434 input,
435 inputs,
436 outputs,
437 no_outputs_from,
438 } = self;
439 let mut errors = Vec::new();
440
441 let output_map = graph.input_map().expect("ambiguous outputs");
442
443 let outputs = outputs
444 .into_iter()
445 .map(|old| {
446 let TestOutput {
447 extract_from,
448 conditions,
449 } = old;
450
451 (extract_from.to_vec(), conditions)
452 })
453 .filter_map(|(extract_from, conditions)| {
454 let mut outputs = Vec::new();
455 for from in extract_from {
456 if no_outputs_from.contains(&from) {
457 errors.push(format!(
458 r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
459 ));
460 } else if let Some(output_id) = output_map.get(&from) {
461 outputs.push(output_id.clone());
462 } else {
463 errors.push(format!(
464 r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
465 ));
466 }
467 }
468 if outputs.is_empty() {
469 None
470 } else {
471 Some(TestOutput {
472 extract_from: outputs.into(),
473 conditions,
474 })
475 }
476 })
477 .collect();
478
479 let no_outputs_from = no_outputs_from
480 .into_iter()
481 .filter_map(|o| {
482 if let Some(output_id) = output_map.get(&o) {
483 Some(output_id.clone())
484 } else {
485 errors.push(format!(
486 r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
487 ));
488 None
489 }
490 })
491 .collect();
492
493 if errors.is_empty() {
494 Ok(TestDefinition {
495 name,
496 input,
497 inputs,
498 outputs,
499 no_outputs_from,
500 })
501 } else {
502 Err(errors)
503 }
504 }
505}
506
507impl TestDefinition<OutputId> {
508 fn stringify(self) -> TestDefinition<String> {
509 let TestDefinition {
510 name,
511 input,
512 inputs,
513 outputs,
514 no_outputs_from,
515 } = self;
516
517 let outputs = outputs
518 .into_iter()
519 .map(|old| TestOutput {
520 extract_from: old
521 .extract_from
522 .to_vec()
523 .into_iter()
524 .map(|item| item.to_string())
525 .collect::<Vec<_>>()
526 .into(),
527 conditions: old.conditions,
528 })
529 .collect();
530
531 let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
532
533 TestDefinition {
534 name,
535 input,
536 inputs,
537 outputs,
538 no_outputs_from,
539 }
540 }
541}
542
543#[configurable_component]
548#[derive(Clone, Debug)]
549#[serde(deny_unknown_fields)]
550pub struct TestInput {
551 pub insert_at: ComponentKey,
553
554 #[serde(default = "default_test_input_type", rename = "type")]
558 pub type_str: String,
559
560 pub value: Option<String>,
565
566 pub source: Option<String>,
570
571 pub log_fields: Option<IndexMap<String, Value>>,
575
576 pub metric: Option<Metric>,
580}
581
582fn default_test_input_type() -> String {
583 "raw".to_string()
584}
585
586#[configurable_component]
591#[derive(Clone, Debug)]
592#[serde(deny_unknown_fields)]
593pub struct TestOutput<T: 'static = OutputId> {
594 pub extract_from: OneOrMany<T>,
596
597 pub conditions: Option<Vec<conditions::AnyCondition>>,
599}
600
601#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
602mod tests {
603 use std::path::PathBuf;
604
605 use indoc::indoc;
606
607 use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
608 use crate::{config, topology::builder::TopologyPiecesBuilder};
609
610 async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
611 match config::load_from_str(config, format) {
612 Ok(c) => {
613 let diff = ConfigDiff::initial(&c);
614 let c2 = config::load_from_str(config, format).unwrap();
615 match (
616 config::warnings(&c2),
617 TopologyPiecesBuilder::new(&c, &diff).build().await,
618 ) {
619 (warnings, Ok(_pieces)) => Ok(warnings),
620 (_, Err(errors)) => Err(errors),
621 }
622 }
623 Err(error) => Err(error),
624 }
625 }
626
627 #[tokio::test]
628 async fn bad_inputs() {
629 let err = load(
630 r#"
631 [sources.in]
632 type = "test_basic"
633
634 [transforms.sample]
635 type = "test_basic"
636 inputs = []
637 suffix = "foo"
638 increase = 1.25
639
640 [transforms.sample2]
641 type = "test_basic"
642 inputs = ["qwerty"]
643 suffix = "foo"
644 increase = 1.25
645
646 [sinks.out]
647 type = "test_basic"
648 inputs = ["asdf", "in", "in"]
649 "#,
650 Format::Toml,
651 )
652 .await
653 .unwrap_err();
654
655 assert_eq!(
656 vec![
657 "Sink \"out\" has input \"in\" duplicated 2 times",
658 "Transform \"sample\" has no inputs",
659 "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
660 "Input \"asdf\" for sink \"out\" doesn't match any components.",
661 ],
662 err,
663 );
664 }
665
666 #[tokio::test]
667 async fn duplicate_name() {
668 let err = load(
669 r#"
670 [sources.foo]
671 type = "test_basic"
672
673 [sources.bar]
674 type = "test_basic"
675
676 [transforms.foo]
677 type = "test_basic"
678 inputs = ["bar"]
679 suffix = "foo"
680 increase = 1.25
681
682 [sinks.out]
683 type = "test_basic"
684 inputs = ["foo"]
685 "#,
686 Format::Toml,
687 )
688 .await
689 .unwrap_err();
690
691 assert_eq!(
692 err,
693 vec!["More than one component with name \"foo\" (source, transform).",]
694 );
695 }
696
697 #[tokio::test]
698 #[cfg(unix)]
699 async fn conflicting_stdin_and_fd_resources() {
700 let errors = load(
701 r#"
702 [sources.stdin]
703 type = "stdin"
704
705 [sources.file_descriptor]
706 type = "file_descriptor"
707 fd = 0
708
709 [sinks.out]
710 type = "test_basic"
711 inputs = ["stdin", "file_descriptor"]
712 "#,
713 Format::Toml,
714 )
715 .await
716 .unwrap_err();
717
718 assert_eq!(errors.len(), 1);
719 let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
720 assert!(errors[0].starts_with(expected_prefix));
721 }
722
723 #[tokio::test]
724 #[cfg(unix)]
725 async fn conflicting_fd_resources() {
726 let errors = load(
727 r#"
728 [sources.file_descriptor1]
729 type = "file_descriptor"
730 fd = 10
731 [sources.file_descriptor2]
732 type = "file_descriptor"
733 fd = 10
734 [sinks.out]
735 type = "test_basic"
736 inputs = ["file_descriptor1", "file_descriptor2"]
737 "#,
738 Format::Toml,
739 )
740 .await
741 .unwrap_err();
742
743 assert_eq!(errors.len(), 1);
744 let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
745 assert!(errors[0].starts_with(expected_prefix));
746 }
747
748 #[tokio::test]
749 #[cfg(all(unix, feature = "sources-file_descriptor"))]
750 async fn no_conflict_fd_resources() {
751 use crate::sources::file_descriptors::file_descriptor::null_fd;
752 let fd1 = null_fd().unwrap();
753 let fd2 = null_fd().unwrap();
754 let result = load(
755 &format!(
756 r#"
757 [sources.file_descriptor1]
758 type = "file_descriptor"
759 fd = {fd1}
760
761 [sources.file_descriptor2]
762 type = "file_descriptor"
763 fd = {fd2}
764
765 [sinks.out]
766 type = "test_basic"
767 inputs = ["file_descriptor1", "file_descriptor2"]
768 "#
769 ),
770 Format::Toml,
771 )
772 .await;
773
774 let expected = Ok(vec![]);
775 assert_eq!(result, expected);
776 }
777
778 #[tokio::test]
779 async fn warnings() {
780 let warnings = load(
781 r#"
782 [sources.in1]
783 type = "test_basic"
784
785 [sources.in2]
786 type = "test_basic"
787
788 [transforms.sample1]
789 type = "test_basic"
790 inputs = ["in1"]
791 suffix = "foo"
792 increase = 1.25
793
794 [transforms.sample2]
795 type = "test_basic"
796 inputs = ["in1"]
797 suffix = "foo"
798 increase = 1.25
799
800 [sinks.out]
801 type = "test_basic"
802 inputs = ["sample1"]
803 "#,
804 Format::Toml,
805 )
806 .await
807 .unwrap();
808
809 assert_eq!(
810 warnings,
811 vec![
812 "Transform \"sample2\" has no consumers",
813 "Source \"in2\" has no consumers",
814 ]
815 )
816 }
817
818 #[tokio::test]
819 async fn cycle() {
820 let errors = load(
821 r#"
822 [sources.in]
823 type = "test_basic"
824
825 [transforms.one]
826 type = "test_basic"
827 inputs = ["in"]
828 suffix = "foo"
829 increase = 1.25
830
831 [transforms.two]
832 type = "test_basic"
833 inputs = ["one", "four"]
834 suffix = "foo"
835 increase = 1.25
836
837 [transforms.three]
838 type = "test_basic"
839 inputs = ["two"]
840 suffix = "foo"
841 increase = 1.25
842
843 [transforms.four]
844 type = "test_basic"
845 inputs = ["three"]
846 suffix = "foo"
847 increase = 1.25
848
849 [sinks.out]
850 type = "test_basic"
851 inputs = ["four"]
852 "#,
853 Format::Toml,
854 )
855 .await
856 .unwrap_err();
857
858 assert_eq!(
859 errors,
860 vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
861 )
862 }
863
864 #[test]
865 fn default_data_dir() {
866 let config = load_from_str(
867 indoc! {r#"
868 [sources.in]
869 type = "test_basic"
870
871 [sinks.out]
872 type = "test_basic"
873 inputs = ["in"]
874 "#},
875 Format::Toml,
876 )
877 .unwrap();
878
879 assert_eq!(
880 Some(PathBuf::from("/var/lib/vector")),
881 config.global.data_dir
882 )
883 }
884
885 #[test]
886 fn default_schema() {
887 let config = load_from_str(
888 indoc! {r#"
889 [sources.in]
890 type = "test_basic"
891
892 [sinks.out]
893 type = "test_basic"
894 inputs = ["in"]
895 "#},
896 Format::Toml,
897 )
898 .unwrap();
899
900 assert_eq!(
901 "host",
902 config.global.log_schema.host_key().unwrap().to_string()
903 );
904 assert_eq!(
905 "message",
906 config.global.log_schema.message_key().unwrap().to_string()
907 );
908 assert_eq!(
909 "timestamp",
910 config
911 .global
912 .log_schema
913 .timestamp_key()
914 .unwrap()
915 .to_string()
916 );
917 }
918
919 #[test]
920 fn custom_schema() {
921 let config = load_from_str(
922 indoc! {r#"
923 [log_schema]
924 host_key = "this"
925 message_key = "that"
926 timestamp_key = "then"
927
928 [sources.in]
929 type = "test_basic"
930
931 [sinks.out]
932 type = "test_basic"
933 inputs = ["in"]
934 "#},
935 Format::Toml,
936 )
937 .unwrap();
938
939 assert_eq!(
940 "this",
941 config.global.log_schema.host_key().unwrap().to_string()
942 );
943 assert_eq!(
944 "that",
945 config.global.log_schema.message_key().unwrap().to_string()
946 );
947 assert_eq!(
948 "then",
949 config
950 .global
951 .log_schema
952 .timestamp_key()
953 .unwrap()
954 .to_string()
955 );
956 }
957
958 #[test]
959 fn config_append() {
960 let mut config: ConfigBuilder = format::deserialize(
961 indoc! {r#"
962 [sources.in]
963 type = "test_basic"
964
965 [sinks.out]
966 type = "test_basic"
967 inputs = ["in"]
968 "#},
969 Format::Toml,
970 )
971 .unwrap();
972
973 assert_eq!(
974 config.append(
975 format::deserialize(
976 indoc! {r#"
977 data_dir = "/foobar"
978
979 [proxy]
980 http = "http://proxy.inc:3128"
981
982 [transforms.foo]
983 type = "test_basic"
984 inputs = [ "in" ]
985 suffix = "foo"
986 increase = 1.25
987
988 [[tests]]
989 name = "check_simple_log"
990 [tests.input]
991 insert_at = "foo"
992 type = "raw"
993 value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
994 [[tests.outputs]]
995 extract_from = "foo"
996 [[tests.outputs.conditions]]
997 type = "vrl"
998 source = ".message == \"Sorry, I'm busy this week Cecil\""
999 "#},
1000 Format::Toml,
1001 )
1002 .unwrap()
1003 ),
1004 Ok(())
1005 );
1006
1007 assert!(config.global.proxy.http.is_some());
1008 assert!(config.global.proxy.https.is_none());
1009 assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
1010 assert!(config.sources.contains_key(&ComponentKey::from("in")));
1011 assert!(config.sinks.contains_key(&ComponentKey::from("out")));
1012 assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
1013 assert_eq!(config.tests.len(), 1);
1014 }
1015
1016 #[test]
1017 fn config_append_collisions() {
1018 let mut config: ConfigBuilder = format::deserialize(
1019 indoc! {r#"
1020 [sources.in]
1021 type = "test_basic"
1022
1023 [sinks.out]
1024 type = "test_basic"
1025 inputs = ["in"]
1026 "#},
1027 Format::Toml,
1028 )
1029 .unwrap();
1030
1031 assert_eq!(
1032 config.append(
1033 format::deserialize(
1034 indoc! {r#"
1035 [sources.in]
1036 type = "test_basic"
1037
1038 [transforms.foo]
1039 type = "test_basic"
1040 inputs = [ "in" ]
1041 suffix = "foo"
1042 increase = 1.25
1043
1044 [sinks.out]
1045 type = "test_basic"
1046 inputs = ["in"]
1047 "#},
1048 Format::Toml,
1049 )
1050 .unwrap()
1051 ),
1052 Err(vec![
1053 "duplicate source id found: in".into(),
1054 "duplicate sink id found: out".into(),
1055 ])
1056 );
1057 }
1058
1059 #[test]
1060 fn with_proxy() {
1061 let config: ConfigBuilder = format::deserialize(
1062 indoc! {r#"
1063 [proxy]
1064 http = "http://server:3128"
1065 https = "http://other:3128"
1066 no_proxy = ["localhost", "127.0.0.1"]
1067
1068 [sources.in]
1069 type = "nginx_metrics"
1070 endpoints = ["http://localhost:8000/basic_status"]
1071 proxy.http = "http://server:3128"
1072 proxy.https = "http://other:3128"
1073 proxy.no_proxy = ["localhost", "127.0.0.1"]
1074
1075 [sinks.out]
1076 type = "console"
1077 inputs = ["in"]
1078 encoding.codec = "json"
1079 "#},
1080 Format::Toml,
1081 )
1082 .unwrap();
1083 assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1084 assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1085 assert!(config.global.proxy.no_proxy.matches("localhost"));
1086 let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1087 assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1088 assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1089 assert!(source.proxy.no_proxy.matches("localhost"));
1090 }
1091
1092 #[test]
1093 fn with_partial_global_proxy() {
1094 let config: ConfigBuilder = format::deserialize(
1095 indoc! {r#"
1096 [proxy]
1097 http = "http://server:3128"
1098
1099 [sources.in]
1100 type = "nginx_metrics"
1101 endpoints = ["http://localhost:8000/basic_status"]
1102
1103 [sources.in.proxy]
1104 http = "http://server:3129"
1105 https = "http://other:3129"
1106 no_proxy = ["localhost", "127.0.0.1"]
1107
1108 [sinks.out]
1109 type = "console"
1110 inputs = ["in"]
1111 encoding.codec = "json"
1112 "#},
1113 Format::Toml,
1114 )
1115 .unwrap();
1116 assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1117 assert_eq!(config.global.proxy.https, None);
1118 let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1119 assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1120 assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1121 assert!(source.proxy.no_proxy.matches("localhost"));
1122 }
1123
1124 #[test]
1125 fn with_partial_source_proxy() {
1126 let config: ConfigBuilder = format::deserialize(
1127 indoc! {r#"
1128 [proxy]
1129 http = "http://server:3128"
1130 https = "http://other:3128"
1131
1132 [sources.in]
1133 type = "nginx_metrics"
1134 endpoints = ["http://localhost:8000/basic_status"]
1135
1136 [sources.in.proxy]
1137 http = "http://server:3129"
1138 no_proxy = ["localhost", "127.0.0.1"]
1139
1140 [sinks.out]
1141 type = "console"
1142 inputs = ["in"]
1143 encoding.codec = "json"
1144 "#},
1145 Format::Toml,
1146 )
1147 .unwrap();
1148 assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1149 assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1150 let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1151 assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1152 assert_eq!(source.proxy.https, None);
1153 assert!(source.proxy.no_proxy.matches("localhost"));
1154 }
1155}
1156
1157#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
1158mod acknowledgements_tests {
1159 use indoc::indoc;
1160
1161 use super::*;
1162
1163 #[test]
1164 fn propagates_settings() {
1165 let config: ConfigBuilder = format::deserialize(
1170 indoc! {r#"
1171 data_dir = "/tmp"
1172 [sources.in1]
1173 type = "file"
1174 include = ["/var/log/**/*.log"]
1175 [sources.in2]
1176 type = "file"
1177 include = ["/var/log/**/*.log"]
1178 [sources.in3]
1179 type = "file"
1180 include = ["/var/log/**/*.log"]
1181 [transforms.parse3]
1182 type = "test_basic"
1183 inputs = ["in3"]
1184 increase = 0.0
1185 suffix = ""
1186 [sinks.out1]
1187 type = "file"
1188 inputs = ["in1"]
1189 encoding.codec = "text"
1190 path = "/path/to/out1"
1191 [sinks.out2]
1192 type = "file"
1193 inputs = ["in2"]
1194 encoding.codec = "text"
1195 path = "/path/to/out2"
1196 acknowledgements = true
1197 [sinks.out3]
1198 type = "file"
1199 inputs = ["parse3"]
1200 encoding.codec = "text"
1201 path = "/path/to/out3"
1202 acknowledgements.enabled = true
1203 "#},
1204 Format::Toml,
1205 )
1206 .unwrap();
1207
1208 for source in config.sources.values() {
1209 assert!(
1210 !source.sink_acknowledgements,
1211 "Source `sink_acknowledgements` should be `false` before propagation"
1212 );
1213 }
1214
1215 let config = config.build().unwrap();
1216
1217 let get = |key: &str| config.sources.get(&ComponentKey::from(key)).unwrap();
1218 assert!(!get("in1").sink_acknowledgements);
1219 assert!(get("in2").sink_acknowledgements);
1220 assert!(get("in3").sink_acknowledgements);
1221 }
1222}
1223
1224#[cfg(test)]
1225mod resource_tests {
1226 use std::{
1227 collections::{HashMap, HashSet},
1228 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
1229 };
1230
1231 use proptest::prelude::*;
1232
1233 use super::Resource;
1234
1235 fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
1236 Resource::tcp(SocketAddr::new(addr.into(), port))
1237 }
1238
1239 fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
1240 Resource::udp(SocketAddr::new(addr.into(), port))
1241 }
1242
1243 fn unspecified() -> impl Strategy<Value = IpAddr> {
1244 prop_oneof![
1245 Just(Ipv4Addr::UNSPECIFIED.into()),
1246 Just(Ipv6Addr::UNSPECIFIED.into()),
1247 ]
1248 }
1249
1250 fn specaddr() -> impl Strategy<Value = IpAddr> {
1251 any::<IpAddr>().prop_filter("Must be specific address", |addr| !addr.is_unspecified())
1252 }
1253
1254 fn specport() -> impl Strategy<Value = u16> {
1255 any::<u16>().prop_filter("Must be specific port", |&port| port > 0)
1256 }
1257
1258 fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
1259 conflicts
1260 .into_iter()
1261 .map(|(key, values)| (key, values.into_iter().collect()))
1262 .collect()
1263 }
1264
1265 proptest! {
1266 #[test]
1267 fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
1268 prop_assume!(port1 != port2);
1269 let components = vec![
1270 ("sink_0", vec![tcp(addr, 0)]),
1271 ("sink_1", vec![tcp(addr, port1)]),
1272 ("sink_2", vec![tcp(addr, port2)]),
1273 ];
1274 let conflicting = Resource::conflicts(components);
1275 assert_eq!(conflicting, HashMap::new());
1276 }
1277
1278 #[test]
1279 fn conflicting_pair(addr: IpAddr, port in specport()) {
1280 let components = vec![
1281 ("sink_0", vec![tcp(addr, 0)]),
1282 ("sink_1", vec![tcp(addr, port)]),
1283 ("sink_2", vec![tcp(addr, port)]),
1284 ];
1285 let conflicting = Resource::conflicts(components);
1286 assert_eq!(
1287 conflicting,
1288 hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])])
1289 );
1290 }
1291
1292 #[test]
1293 fn conflicting_multi(addr: IpAddr, port in specport()) {
1294 let components = vec![
1295 ("sink_0", vec![tcp(addr, 0)]),
1296 ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
1297 ("sink_2", vec![tcp(addr, port)]),
1298 ];
1299 let conflicting = Resource::conflicts(components);
1300 assert_eq!(
1301 conflicting,
1302 hashmap(vec![
1303 (tcp(addr, 0), vec!["sink_0", "sink_1"]),
1304 (tcp(addr, port), vec!["sink_1", "sink_2"])
1305 ])
1306 );
1307 }
1308
1309 #[test]
1310 fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
1311 prop_assume!(addr1 != addr2);
1312 let components = vec![
1313 ("sink_0", vec![tcp(addr1, port)]),
1314 ("sink_1", vec![tcp(addr2, port)]),
1315 ];
1316 let conflicting = Resource::conflicts(components);
1317 assert_eq!(conflicting, HashMap::new());
1318 }
1319
1320 #[test]
1321 fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
1322 let components = vec![
1323 ("sink_0", vec![tcp(addr, port)]),
1324 ("sink_1", vec![tcp(unspec, port)]),
1325 ];
1326 let conflicting = Resource::conflicts(components);
1327 assert_eq!(conflicting, HashMap::new());
1328 }
1329
1330 #[test]
1331 fn different_protocol(addr: IpAddr) {
1332 let components = vec![
1333 ("sink_0", vec![tcp(addr, 0)]),
1334 ("sink_1", vec![udp(addr, 0)]),
1335 ];
1336 let conflicting = Resource::conflicts(components);
1337 assert_eq!(conflicting, HashMap::new());
1338 }
1339 }
1340
1341 #[test]
1342 fn different_unspecified_ip_version() {
1343 let components = vec![
1344 ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
1345 ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
1346 ];
1347 let conflicting = Resource::conflicts(components);
1348 assert_eq!(conflicting, HashMap::new());
1349 }
1350}
1351
1352#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
1353mod resource_config_tests {
1354 use indoc::indoc;
1355 use vector_lib::configurable::schema::generate_root_schema;
1356
1357 use super::{Format, load_from_str};
1358
1359 #[test]
1360 fn config_conflict_detected() {
1361 assert!(
1362 load_from_str(
1363 indoc! {r#"
1364 [sources.in0]
1365 type = "stdin"
1366
1367 [sources.in1]
1368 type = "stdin"
1369
1370 [sinks.out]
1371 type = "console"
1372 inputs = ["in0","in1"]
1373 encoding.codec = "json"
1374 "#},
1375 Format::Toml,
1376 )
1377 .is_err()
1378 );
1379 }
1380
1381 #[test]
1382 #[ignore]
1383 #[allow(clippy::print_stdout)]
1384 #[allow(clippy::print_stderr)]
1385 fn generate_component_config_schema() {
1386 use indexmap::IndexMap;
1387 use vector_lib::{config::ComponentKey, configurable::configurable_component};
1388
1389 use crate::config::{SinkOuter, SourceOuter, TransformOuter};
1390
1391 #[configurable_component]
1393 #[derive(Clone)]
1394 struct ComponentsOnlyConfig {
1395 #[serde(default)]
1397 pub sources: IndexMap<ComponentKey, SourceOuter>,
1398
1399 #[serde(default)]
1401 pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,
1402
1403 #[serde(default)]
1405 pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
1406 }
1407
1408 match generate_root_schema::<ComponentsOnlyConfig>() {
1409 Ok(schema) => {
1410 let json = serde_json::to_string_pretty(&schema)
1411 .expect("rendering root schema to JSON should not fail");
1412
1413 println!("{json}");
1414 }
1415 Err(e) => eprintln!("error while generating schema: {e:?}"),
1416 }
1417 }
1418}