1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 fmt::{self, Display, Formatter},
5 fs,
6 hash::Hash,
7 net::SocketAddr,
8 path::PathBuf,
9 time::Duration,
10};
11
12use crate::{
13 conditions,
14 event::{Metric, Value},
15 secrets::SecretBackends,
16 serde::OneOrMany,
17};
18
19use indexmap::IndexMap;
20use serde::Serialize;
21
22use vector_config::configurable_component;
23pub use vector_lib::config::{
24 AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
25 SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
26};
27pub use vector_lib::configurable::component::{
28 GenerateConfig, SinkDescription, TransformDescription,
29};
30
31pub mod api;
32mod builder;
33mod cmd;
34mod compiler;
35mod diff;
36pub mod dot_graph;
37mod enrichment_table;
38pub mod format;
39mod graph;
40mod loading;
41pub mod provider;
42pub mod schema;
43mod secret;
44mod sink;
45mod source;
46mod transform;
47pub mod unit_test;
48mod validation;
49mod vars;
50pub mod watcher;
51
52pub use builder::ConfigBuilder;
53pub use cmd::{cmd, Opts};
54pub use diff::ConfigDiff;
55pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
56pub use format::{Format, FormatHint};
57pub use loading::{
58 load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets,
59 load_from_str, load_source_from_paths, merge_path_lists, process_paths, COLLECTOR,
60 CONFIG_PATHS,
61};
62pub use provider::ProviderConfig;
63pub use secret::SecretBackend;
64pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
65pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
66pub use transform::{
67 get_transform_output_ids, BoxedTransform, TransformConfig, TransformContext, TransformOuter,
68};
69pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
70pub use validation::warnings;
71pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX};
72pub use vector_lib::{
73 config::{
74 init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, telemetry, ComponentKey,
75 LogSchema, OutputId,
76 },
77 id::Inputs,
78};
79
80#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
81pub enum ComponentType {
83 Transform,
84 Sink,
85 EnrichmentTable,
86}
87
88#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
89pub struct ComponentConfig {
90 pub config_paths: Vec<PathBuf>,
91 pub component_key: ComponentKey,
92 pub component_type: ComponentType,
93}
94
95impl ComponentConfig {
96 pub fn new(
97 config_paths: Vec<PathBuf>,
98 component_key: ComponentKey,
99 component_type: ComponentType,
100 ) -> Self {
101 let canonicalized_paths = config_paths
102 .into_iter()
103 .filter_map(|p| fs::canonicalize(p).ok())
104 .collect();
105
106 Self {
107 config_paths: canonicalized_paths,
108 component_key,
109 component_type,
110 }
111 }
112
113 pub fn contains(&self, config_paths: &[PathBuf]) -> Option<(ComponentKey, ComponentType)> {
114 if config_paths.iter().any(|p| self.config_paths.contains(p)) {
115 return Some((self.component_key.clone(), self.component_type.clone()));
116 }
117 None
118 }
119}
120
121#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
122pub enum ConfigPath {
123 File(PathBuf, FormatHint),
124 Dir(PathBuf),
125}
126
127impl<'a> From<&'a ConfigPath> for &'a PathBuf {
128 fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
129 match config_path {
130 ConfigPath::File(path, _) => path,
131 ConfigPath::Dir(path) => path,
132 }
133 }
134}
135
136impl ConfigPath {
137 pub const fn as_dir(&self) -> Option<&PathBuf> {
138 match self {
139 Self::Dir(path) => Some(path),
140 _ => None,
141 }
142 }
143}
144
145#[derive(Debug, Default, Serialize)]
146pub struct Config {
147 #[cfg(feature = "api")]
148 pub api: api::Options,
149 pub schema: schema::Options,
150 pub global: GlobalOptions,
151 pub healthchecks: HealthcheckOptions,
152 sources: IndexMap<ComponentKey, SourceOuter>,
153 sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
154 transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
155 pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
156 tests: Vec<TestDefinition>,
157 secret: IndexMap<ComponentKey, SecretBackends>,
158 pub graceful_shutdown_duration: Option<Duration>,
159}
160
161impl Config {
162 pub fn builder() -> builder::ConfigBuilder {
163 Default::default()
164 }
165
166 pub fn is_empty(&self) -> bool {
167 self.sources.is_empty()
168 }
169
170 pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
171 self.sources.iter()
172 }
173
174 pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
175 self.sources.get(id)
176 }
177
178 pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
179 self.transforms.iter()
180 }
181
182 pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
183 self.transforms.get(id)
184 }
185
186 pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
187 self.sinks.iter()
188 }
189
190 pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
191 self.sinks.get(id)
192 }
193
194 pub fn enrichment_tables(
195 &self,
196 ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
197 self.enrichment_tables.iter()
198 }
199
200 pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
201 self.enrichment_tables.get(id)
202 }
203
204 pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
205 self.transforms
206 .get(id)
207 .map(|t| &t.inputs[..])
208 .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
209 .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
210 }
211
212 pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
213 let inputs: Vec<_> = self
214 .sinks
215 .iter()
216 .filter(|(_, sink)| {
217 sink.inner
218 .acknowledgements()
219 .merge_default(&self.global.acknowledgements)
220 .enabled()
221 })
222 .flat_map(|(name, sink)| {
223 sink.inputs
224 .iter()
225 .map(|input| (name.clone(), input.clone()))
226 })
227 .collect();
228 self.propagate_acks_rec(inputs);
229 Ok(())
230 }
231
232 fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
233 for (sink, input) in sink_inputs {
234 let component = &input.component;
235 if let Some(source) = self.sources.get_mut(component) {
236 if source.inner.can_acknowledge() {
237 source.sink_acknowledgements = true;
238 } else {
239 warn!(
240 message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
241 source = component.id(),
242 sink = sink.id(),
243 );
244 }
245 } else if let Some(transform) = self.transforms.get(component) {
246 let inputs = transform
247 .inputs
248 .iter()
249 .map(|input| (sink.clone(), input.clone()))
250 .collect();
251 self.propagate_acks_rec(inputs);
252 }
253 }
254 }
255}
256
257#[configurable_component]
259#[derive(Clone, Copy, Debug)]
260#[serde(default)]
261pub struct HealthcheckOptions {
262 pub enabled: bool,
266
267 pub require_healthy: bool,
273}
274
275impl HealthcheckOptions {
276 pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
277 if let Some(require_healthy) = require_healthy.into() {
278 self.require_healthy = require_healthy;
279 }
280 }
281
282 const fn merge(&mut self, other: Self) {
283 self.enabled &= other.enabled;
284 self.require_healthy |= other.require_healthy;
285 }
286}
287
288impl Default for HealthcheckOptions {
289 fn default() -> Self {
290 Self {
291 enabled: true,
292 require_healthy: false,
293 }
294 }
295}
296
297#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
299pub enum Resource {
300 Port(SocketAddr, Protocol),
301 SystemFdOffset(usize),
302 Fd(u32),
303 DiskBuffer(String),
304}
305
306#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
307pub enum Protocol {
308 Tcp,
309 Udp,
310}
311
312impl Resource {
313 pub const fn tcp(addr: SocketAddr) -> Self {
314 Self::Port(addr, Protocol::Tcp)
315 }
316
317 pub const fn udp(addr: SocketAddr) -> Self {
318 Self::Port(addr, Protocol::Udp)
319 }
320
321 pub fn conflicts<K: Eq + Hash + Clone>(
323 components: impl IntoIterator<Item = (K, Vec<Resource>)>,
324 ) -> HashMap<Resource, HashSet<K>> {
325 let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
326 let mut unspecified = Vec::new();
327
328 for (key, resources) in components {
330 for resource in resources {
331 if let Resource::Port(address, protocol) = &resource {
332 if address.ip().is_unspecified() {
333 unspecified.push((key.clone(), *address, *protocol));
334 }
335 }
336
337 resource_map
338 .entry(resource)
339 .or_default()
340 .insert(key.clone());
341 }
342 }
343
344 for (key, address0, protocol0) in unspecified {
348 for (resource, components) in resource_map.iter_mut() {
349 if let Resource::Port(address, protocol) = resource {
350 if &address0 == address && &protocol0 == protocol {
354 components.insert(key.clone());
355 }
356 }
357 }
358 }
359
360 resource_map.retain(|_, components| components.len() > 1);
361
362 resource_map
363 }
364}
365
366impl Display for Protocol {
367 fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
368 match self {
369 Protocol::Udp => write!(fmt, "udp"),
370 Protocol::Tcp => write!(fmt, "tcp"),
371 }
372 }
373}
374
375impl Display for Resource {
376 fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
377 match self {
378 Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
379 Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
380 Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
381 Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
382 }
383 }
384}
385
386#[configurable_component]
388#[derive(Clone, Debug)]
389#[serde(deny_unknown_fields)]
390pub struct TestDefinition<T: 'static = OutputId> {
391 pub name: String,
393
394 pub input: Option<TestInput>,
396
397 #[serde(default)]
399 pub inputs: Vec<TestInput>,
400
401 #[serde(default)]
403 pub outputs: Vec<TestOutput<T>>,
404
405 #[serde(default)]
407 pub no_outputs_from: Vec<T>,
408}
409
410impl TestDefinition<String> {
411 fn resolve_outputs(
412 self,
413 graph: &graph::Graph,
414 ) -> Result<TestDefinition<OutputId>, Vec<String>> {
415 let TestDefinition {
416 name,
417 input,
418 inputs,
419 outputs,
420 no_outputs_from,
421 } = self;
422 let mut errors = Vec::new();
423
424 let output_map = graph.input_map().expect("ambiguous outputs");
425
426 let outputs = outputs
427 .into_iter()
428 .map(|old| {
429 let TestOutput {
430 extract_from,
431 conditions,
432 } = old;
433
434 (extract_from.to_vec(), conditions)
435 })
436 .filter_map(|(extract_from, conditions)| {
437 let mut outputs = Vec::new();
438 for from in extract_from {
439 if let Some(output_id) = output_map.get(&from) {
440 outputs.push(output_id.clone());
441 } else {
442 errors.push(format!(
443 r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
444 ));
445 }
446 }
447 if outputs.is_empty() {
448 None
449 } else {
450 Some(TestOutput {
451 extract_from: outputs.into(),
452 conditions,
453 })
454 }
455 })
456 .collect();
457
458 let no_outputs_from = no_outputs_from
459 .into_iter()
460 .filter_map(|o| {
461 if let Some(output_id) = output_map.get(&o) {
462 Some(output_id.clone())
463 } else {
464 errors.push(format!(
465 r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
466 ));
467 None
468 }
469 })
470 .collect();
471
472 if errors.is_empty() {
473 Ok(TestDefinition {
474 name,
475 input,
476 inputs,
477 outputs,
478 no_outputs_from,
479 })
480 } else {
481 Err(errors)
482 }
483 }
484}
485
486impl TestDefinition<OutputId> {
487 fn stringify(self) -> TestDefinition<String> {
488 let TestDefinition {
489 name,
490 input,
491 inputs,
492 outputs,
493 no_outputs_from,
494 } = self;
495
496 let outputs = outputs
497 .into_iter()
498 .map(|old| TestOutput {
499 extract_from: old
500 .extract_from
501 .to_vec()
502 .into_iter()
503 .map(|item| item.to_string())
504 .collect::<Vec<_>>()
505 .into(),
506 conditions: old.conditions,
507 })
508 .collect();
509
510 let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
511
512 TestDefinition {
513 name,
514 input,
515 inputs,
516 outputs,
517 no_outputs_from,
518 }
519 }
520}
521
522#[configurable_component]
527#[derive(Clone, Debug)]
528#[serde(deny_unknown_fields)]
529pub struct TestInput {
530 pub insert_at: ComponentKey,
532
533 #[serde(default = "default_test_input_type", rename = "type")]
537 pub type_str: String,
538
539 pub value: Option<String>,
544
545 pub source: Option<String>,
549
550 pub log_fields: Option<IndexMap<String, Value>>,
554
555 pub metric: Option<Metric>,
559}
560
561fn default_test_input_type() -> String {
562 "raw".to_string()
563}
564
565#[configurable_component]
570#[derive(Clone, Debug)]
571#[serde(deny_unknown_fields)]
572pub struct TestOutput<T: 'static = OutputId> {
573 pub extract_from: OneOrMany<T>,
575
576 pub conditions: Option<Vec<conditions::AnyCondition>>,
578}
579
580#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
581mod tests {
582 use std::{collections::HashMap, path::PathBuf};
583
584 use crate::{config, topology};
585 use indoc::indoc;
586
587 use super::{builder::ConfigBuilder, format, load_from_str, ComponentKey, ConfigDiff, Format};
588
589 async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
590 match config::load_from_str(config, format) {
591 Ok(c) => {
592 let diff = ConfigDiff::initial(&c);
593 let c2 = config::load_from_str(config, format).unwrap();
594 match (
595 config::warnings(&c2),
596 topology::TopologyPieces::build(&c, &diff, HashMap::new(), Default::default())
597 .await,
598 ) {
599 (warnings, Ok(_pieces)) => Ok(warnings),
600 (_, Err(errors)) => Err(errors),
601 }
602 }
603 Err(error) => Err(error),
604 }
605 }
606
607 #[tokio::test]
608 async fn bad_inputs() {
609 let err = load(
610 r#"
611 [sources.in]
612 type = "test_basic"
613
614 [transforms.sample]
615 type = "test_basic"
616 inputs = []
617 suffix = "foo"
618 increase = 1.25
619
620 [transforms.sample2]
621 type = "test_basic"
622 inputs = ["qwerty"]
623 suffix = "foo"
624 increase = 1.25
625
626 [sinks.out]
627 type = "test_basic"
628 inputs = ["asdf", "in", "in"]
629 "#,
630 Format::Toml,
631 )
632 .await
633 .unwrap_err();
634
635 assert_eq!(
636 vec![
637 "Sink \"out\" has input \"in\" duplicated 2 times",
638 "Transform \"sample\" has no inputs",
639 "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
640 "Input \"asdf\" for sink \"out\" doesn't match any components.",
641 ],
642 err,
643 );
644 }
645
646 #[tokio::test]
647 async fn duplicate_name() {
648 let err = load(
649 r#"
650 [sources.foo]
651 type = "test_basic"
652
653 [sources.bar]
654 type = "test_basic"
655
656 [transforms.foo]
657 type = "test_basic"
658 inputs = ["bar"]
659 suffix = "foo"
660 increase = 1.25
661
662 [sinks.out]
663 type = "test_basic"
664 inputs = ["foo"]
665 "#,
666 Format::Toml,
667 )
668 .await
669 .unwrap_err();
670
671 assert_eq!(
672 err,
673 vec!["More than one component with name \"foo\" (source, transform).",]
674 );
675 }
676
677 #[tokio::test]
678 #[cfg(unix)]
679 async fn conflicting_stdin_and_fd_resources() {
680 let errors = load(
681 r#"
682 [sources.stdin]
683 type = "stdin"
684
685 [sources.file_descriptor]
686 type = "file_descriptor"
687 fd = 0
688
689 [sinks.out]
690 type = "test_basic"
691 inputs = ["stdin", "file_descriptor"]
692 "#,
693 Format::Toml,
694 )
695 .await
696 .unwrap_err();
697
698 assert_eq!(errors.len(), 1);
699 let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
700 assert!(errors[0].starts_with(expected_prefix));
701 }
702
703 #[tokio::test]
704 #[cfg(unix)]
705 async fn conflicting_fd_resources() {
706 let errors = load(
707 r#"
708 [sources.file_descriptor1]
709 type = "file_descriptor"
710 fd = 10
711 [sources.file_descriptor2]
712 type = "file_descriptor"
713 fd = 10
714 [sinks.out]
715 type = "test_basic"
716 inputs = ["file_descriptor1", "file_descriptor2"]
717 "#,
718 Format::Toml,
719 )
720 .await
721 .unwrap_err();
722
723 assert_eq!(errors.len(), 1);
724 let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
725 assert!(errors[0].starts_with(expected_prefix));
726 }
727
728 #[tokio::test]
729 #[cfg(all(unix, feature = "sources-file_descriptor"))]
730 async fn no_conflict_fd_resources() {
731 use crate::sources::file_descriptors::file_descriptor::null_fd;
732 let fd1 = null_fd().unwrap();
733 let fd2 = null_fd().unwrap();
734 let result = load(
735 &format!(
736 r#"
737 [sources.file_descriptor1]
738 type = "file_descriptor"
739 fd = {fd1}
740
741 [sources.file_descriptor2]
742 type = "file_descriptor"
743 fd = {fd2}
744
745 [sinks.out]
746 type = "test_basic"
747 inputs = ["file_descriptor1", "file_descriptor2"]
748 "#
749 ),
750 Format::Toml,
751 )
752 .await;
753
754 let expected = Ok(vec![]);
755 assert_eq!(result, expected);
756 }
757
758 #[tokio::test]
759 async fn warnings() {
760 let warnings = load(
761 r#"
762 [sources.in1]
763 type = "test_basic"
764
765 [sources.in2]
766 type = "test_basic"
767
768 [transforms.sample1]
769 type = "test_basic"
770 inputs = ["in1"]
771 suffix = "foo"
772 increase = 1.25
773
774 [transforms.sample2]
775 type = "test_basic"
776 inputs = ["in1"]
777 suffix = "foo"
778 increase = 1.25
779
780 [sinks.out]
781 type = "test_basic"
782 inputs = ["sample1"]
783 "#,
784 Format::Toml,
785 )
786 .await
787 .unwrap();
788
789 assert_eq!(
790 warnings,
791 vec![
792 "Transform \"sample2\" has no consumers",
793 "Source \"in2\" has no consumers",
794 ]
795 )
796 }
797
798 #[tokio::test]
799 async fn cycle() {
800 let errors = load(
801 r#"
802 [sources.in]
803 type = "test_basic"
804
805 [transforms.one]
806 type = "test_basic"
807 inputs = ["in"]
808 suffix = "foo"
809 increase = 1.25
810
811 [transforms.two]
812 type = "test_basic"
813 inputs = ["one", "four"]
814 suffix = "foo"
815 increase = 1.25
816
817 [transforms.three]
818 type = "test_basic"
819 inputs = ["two"]
820 suffix = "foo"
821 increase = 1.25
822
823 [transforms.four]
824 type = "test_basic"
825 inputs = ["three"]
826 suffix = "foo"
827 increase = 1.25
828
829 [sinks.out]
830 type = "test_basic"
831 inputs = ["four"]
832 "#,
833 Format::Toml,
834 )
835 .await
836 .unwrap_err();
837
838 assert_eq!(
839 errors,
840 vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
841 )
842 }
843
844 #[test]
845 fn default_data_dir() {
846 let config = load_from_str(
847 indoc! {r#"
848 [sources.in]
849 type = "test_basic"
850
851 [sinks.out]
852 type = "test_basic"
853 inputs = ["in"]
854 "#},
855 Format::Toml,
856 )
857 .unwrap();
858
859 assert_eq!(
860 Some(PathBuf::from("/var/lib/vector")),
861 config.global.data_dir
862 )
863 }
864
865 #[test]
866 fn default_schema() {
867 let config = load_from_str(
868 indoc! {r#"
869 [sources.in]
870 type = "test_basic"
871
872 [sinks.out]
873 type = "test_basic"
874 inputs = ["in"]
875 "#},
876 Format::Toml,
877 )
878 .unwrap();
879
880 assert_eq!(
881 "host",
882 config.global.log_schema.host_key().unwrap().to_string()
883 );
884 assert_eq!(
885 "message",
886 config.global.log_schema.message_key().unwrap().to_string()
887 );
888 assert_eq!(
889 "timestamp",
890 config
891 .global
892 .log_schema
893 .timestamp_key()
894 .unwrap()
895 .to_string()
896 );
897 }
898
899 #[test]
900 fn custom_schema() {
901 let config = load_from_str(
902 indoc! {r#"
903 [log_schema]
904 host_key = "this"
905 message_key = "that"
906 timestamp_key = "then"
907
908 [sources.in]
909 type = "test_basic"
910
911 [sinks.out]
912 type = "test_basic"
913 inputs = ["in"]
914 "#},
915 Format::Toml,
916 )
917 .unwrap();
918
919 assert_eq!(
920 "this",
921 config.global.log_schema.host_key().unwrap().to_string()
922 );
923 assert_eq!(
924 "that",
925 config.global.log_schema.message_key().unwrap().to_string()
926 );
927 assert_eq!(
928 "then",
929 config
930 .global
931 .log_schema
932 .timestamp_key()
933 .unwrap()
934 .to_string()
935 );
936 }
937
938 #[test]
939 fn config_append() {
940 let mut config: ConfigBuilder = format::deserialize(
941 indoc! {r#"
942 [sources.in]
943 type = "test_basic"
944
945 [sinks.out]
946 type = "test_basic"
947 inputs = ["in"]
948 "#},
949 Format::Toml,
950 )
951 .unwrap();
952
953 assert_eq!(
954 config.append(
955 format::deserialize(
956 indoc! {r#"
957 data_dir = "/foobar"
958
959 [proxy]
960 http = "http://proxy.inc:3128"
961
962 [transforms.foo]
963 type = "test_basic"
964 inputs = [ "in" ]
965 suffix = "foo"
966 increase = 1.25
967
968 [[tests]]
969 name = "check_simple_log"
970 [tests.input]
971 insert_at = "foo"
972 type = "raw"
973 value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
974 [[tests.outputs]]
975 extract_from = "foo"
976 [[tests.outputs.conditions]]
977 type = "vrl"
978 source = ".message == \"Sorry, I'm busy this week Cecil\""
979 "#},
980 Format::Toml,
981 )
982 .unwrap()
983 ),
984 Ok(())
985 );
986
987 assert!(config.global.proxy.http.is_some());
988 assert!(config.global.proxy.https.is_none());
989 assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
990 assert!(config.sources.contains_key(&ComponentKey::from("in")));
991 assert!(config.sinks.contains_key(&ComponentKey::from("out")));
992 assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
993 assert_eq!(config.tests.len(), 1);
994 }
995
996 #[test]
997 fn config_append_collisions() {
998 let mut config: ConfigBuilder = format::deserialize(
999 indoc! {r#"
1000 [sources.in]
1001 type = "test_basic"
1002
1003 [sinks.out]
1004 type = "test_basic"
1005 inputs = ["in"]
1006 "#},
1007 Format::Toml,
1008 )
1009 .unwrap();
1010
1011 assert_eq!(
1012 config.append(
1013 format::deserialize(
1014 indoc! {r#"
1015 [sources.in]
1016 type = "test_basic"
1017
1018 [transforms.foo]
1019 type = "test_basic"
1020 inputs = [ "in" ]
1021 suffix = "foo"
1022 increase = 1.25
1023
1024 [sinks.out]
1025 type = "test_basic"
1026 inputs = ["in"]
1027 "#},
1028 Format::Toml,
1029 )
1030 .unwrap()
1031 ),
1032 Err(vec![
1033 "duplicate source id found: in".into(),
1034 "duplicate sink id found: out".into(),
1035 ])
1036 );
1037 }
1038
1039 #[test]
1040 fn with_proxy() {
1041 let config: ConfigBuilder = format::deserialize(
1042 indoc! {r#"
1043 [proxy]
1044 http = "http://server:3128"
1045 https = "http://other:3128"
1046 no_proxy = ["localhost", "127.0.0.1"]
1047
1048 [sources.in]
1049 type = "nginx_metrics"
1050 endpoints = ["http://localhost:8000/basic_status"]
1051 proxy.http = "http://server:3128"
1052 proxy.https = "http://other:3128"
1053 proxy.no_proxy = ["localhost", "127.0.0.1"]
1054
1055 [sinks.out]
1056 type = "console"
1057 inputs = ["in"]
1058 encoding.codec = "json"
1059 "#},
1060 Format::Toml,
1061 )
1062 .unwrap();
1063 assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1064 assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1065 assert!(config.global.proxy.no_proxy.matches("localhost"));
1066 let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1067 assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1068 assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1069 assert!(source.proxy.no_proxy.matches("localhost"));
1070 }
1071
1072 #[test]
1073 fn with_partial_global_proxy() {
1074 let config: ConfigBuilder = format::deserialize(
1075 indoc! {r#"
1076 [proxy]
1077 http = "http://server:3128"
1078
1079 [sources.in]
1080 type = "nginx_metrics"
1081 endpoints = ["http://localhost:8000/basic_status"]
1082
1083 [sources.in.proxy]
1084 http = "http://server:3129"
1085 https = "http://other:3129"
1086 no_proxy = ["localhost", "127.0.0.1"]
1087
1088 [sinks.out]
1089 type = "console"
1090 inputs = ["in"]
1091 encoding.codec = "json"
1092 "#},
1093 Format::Toml,
1094 )
1095 .unwrap();
1096 assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1097 assert_eq!(config.global.proxy.https, None);
1098 let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1099 assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1100 assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1101 assert!(source.proxy.no_proxy.matches("localhost"));
1102 }
1103
1104 #[test]
1105 fn with_partial_source_proxy() {
1106 let config: ConfigBuilder = format::deserialize(
1107 indoc! {r#"
1108 [proxy]
1109 http = "http://server:3128"
1110 https = "http://other:3128"
1111
1112 [sources.in]
1113 type = "nginx_metrics"
1114 endpoints = ["http://localhost:8000/basic_status"]
1115
1116 [sources.in.proxy]
1117 http = "http://server:3129"
1118 no_proxy = ["localhost", "127.0.0.1"]
1119
1120 [sinks.out]
1121 type = "console"
1122 inputs = ["in"]
1123 encoding.codec = "json"
1124 "#},
1125 Format::Toml,
1126 )
1127 .unwrap();
1128 assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1129 assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1130 let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1131 assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1132 assert_eq!(source.proxy.https, None);
1133 assert!(source.proxy.no_proxy.matches("localhost"));
1134 }
1135}
1136
1137#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
1138mod acknowledgements_tests {
1139 use indoc::indoc;
1140
1141 use super::*;
1142
1143 #[test]
1144 fn propagates_settings() {
1145 let config: ConfigBuilder = format::deserialize(
1150 indoc! {r#"
1151 data_dir = "/tmp"
1152 [sources.in1]
1153 type = "file"
1154 include = ["/var/log/**/*.log"]
1155 [sources.in2]
1156 type = "file"
1157 include = ["/var/log/**/*.log"]
1158 [sources.in3]
1159 type = "file"
1160 include = ["/var/log/**/*.log"]
1161 [transforms.parse3]
1162 type = "test_basic"
1163 inputs = ["in3"]
1164 increase = 0.0
1165 suffix = ""
1166 [sinks.out1]
1167 type = "file"
1168 inputs = ["in1"]
1169 encoding.codec = "text"
1170 path = "/path/to/out1"
1171 [sinks.out2]
1172 type = "file"
1173 inputs = ["in2"]
1174 encoding.codec = "text"
1175 path = "/path/to/out2"
1176 acknowledgements = true
1177 [sinks.out3]
1178 type = "file"
1179 inputs = ["parse3"]
1180 encoding.codec = "text"
1181 path = "/path/to/out3"
1182 acknowledgements.enabled = true
1183 "#},
1184 Format::Toml,
1185 )
1186 .unwrap();
1187
1188 for source in config.sources.values() {
1189 assert!(
1190 !source.sink_acknowledgements,
1191 "Source `sink_acknowledgements` should be `false` before propagation"
1192 );
1193 }
1194
1195 let config = config.build().unwrap();
1196
1197 let get = |key: &str| config.sources.get(&ComponentKey::from(key)).unwrap();
1198 assert!(!get("in1").sink_acknowledgements);
1199 assert!(get("in2").sink_acknowledgements);
1200 assert!(get("in3").sink_acknowledgements);
1201 }
1202}
1203
1204#[cfg(test)]
1205mod resource_tests {
1206 use std::{
1207 collections::{HashMap, HashSet},
1208 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
1209 };
1210
1211 use proptest::prelude::*;
1212
1213 use super::Resource;
1214
1215 fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
1216 Resource::tcp(SocketAddr::new(addr.into(), port))
1217 }
1218
1219 fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
1220 Resource::udp(SocketAddr::new(addr.into(), port))
1221 }
1222
1223 fn unspecified() -> impl Strategy<Value = IpAddr> {
1224 prop_oneof![
1225 Just(Ipv4Addr::UNSPECIFIED.into()),
1226 Just(Ipv6Addr::UNSPECIFIED.into()),
1227 ]
1228 }
1229
1230 fn specaddr() -> impl Strategy<Value = IpAddr> {
1231 any::<IpAddr>().prop_filter("Must be specific address", |addr| !addr.is_unspecified())
1232 }
1233
1234 fn specport() -> impl Strategy<Value = u16> {
1235 any::<u16>().prop_filter("Must be specific port", |&port| port > 0)
1236 }
1237
1238 fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
1239 conflicts
1240 .into_iter()
1241 .map(|(key, values)| (key, values.into_iter().collect()))
1242 .collect()
1243 }
1244
1245 proptest! {
1246 #[test]
1247 fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
1248 prop_assume!(port1 != port2);
1249 let components = vec![
1250 ("sink_0", vec![tcp(addr, 0)]),
1251 ("sink_1", vec![tcp(addr, port1)]),
1252 ("sink_2", vec![tcp(addr, port2)]),
1253 ];
1254 let conflicting = Resource::conflicts(components);
1255 assert_eq!(conflicting, HashMap::new());
1256 }
1257
1258 #[test]
1259 fn conflicting_pair(addr: IpAddr, port in specport()) {
1260 let components = vec![
1261 ("sink_0", vec![tcp(addr, 0)]),
1262 ("sink_1", vec![tcp(addr, port)]),
1263 ("sink_2", vec![tcp(addr, port)]),
1264 ];
1265 let conflicting = Resource::conflicts(components);
1266 assert_eq!(
1267 conflicting,
1268 hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])])
1269 );
1270 }
1271
1272 #[test]
1273 fn conflicting_multi(addr: IpAddr, port in specport()) {
1274 let components = vec![
1275 ("sink_0", vec![tcp(addr, 0)]),
1276 ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
1277 ("sink_2", vec![tcp(addr, port)]),
1278 ];
1279 let conflicting = Resource::conflicts(components);
1280 assert_eq!(
1281 conflicting,
1282 hashmap(vec![
1283 (tcp(addr, 0), vec!["sink_0", "sink_1"]),
1284 (tcp(addr, port), vec!["sink_1", "sink_2"])
1285 ])
1286 );
1287 }
1288
1289 #[test]
1290 fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
1291 prop_assume!(addr1 != addr2);
1292 let components = vec![
1293 ("sink_0", vec![tcp(addr1, port)]),
1294 ("sink_1", vec![tcp(addr2, port)]),
1295 ];
1296 let conflicting = Resource::conflicts(components);
1297 assert_eq!(conflicting, HashMap::new());
1298 }
1299
1300 #[test]
1301 fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
1302 let components = vec![
1303 ("sink_0", vec![tcp(addr, port)]),
1304 ("sink_1", vec![tcp(unspec, port)]),
1305 ];
1306 let conflicting = Resource::conflicts(components);
1307 assert_eq!(conflicting, HashMap::new());
1308 }
1309
1310 #[test]
1311 fn different_protocol(addr: IpAddr) {
1312 let components = vec![
1313 ("sink_0", vec![tcp(addr, 0)]),
1314 ("sink_1", vec![udp(addr, 0)]),
1315 ];
1316 let conflicting = Resource::conflicts(components);
1317 assert_eq!(conflicting, HashMap::new());
1318 }
1319 }
1320
1321 #[test]
1322 fn different_unspecified_ip_version() {
1323 let components = vec![
1324 ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
1325 ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
1326 ];
1327 let conflicting = Resource::conflicts(components);
1328 assert_eq!(conflicting, HashMap::new());
1329 }
1330}
1331
1332#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
1333mod resource_config_tests {
1334 use indoc::indoc;
1335 use vector_lib::configurable::schema::generate_root_schema;
1336
1337 use super::{load_from_str, Format};
1338
1339 #[test]
1340 fn config_conflict_detected() {
1341 assert!(load_from_str(
1342 indoc! {r#"
1343 [sources.in0]
1344 type = "stdin"
1345
1346 [sources.in1]
1347 type = "stdin"
1348
1349 [sinks.out]
1350 type = "console"
1351 inputs = ["in0","in1"]
1352 encoding.codec = "json"
1353 "#},
1354 Format::Toml,
1355 )
1356 .is_err());
1357 }
1358
1359 #[test]
1360 #[ignore]
1361 #[allow(clippy::print_stdout)]
1362 #[allow(clippy::print_stderr)]
1363 fn generate_component_config_schema() {
1364 use crate::config::{SinkOuter, SourceOuter, TransformOuter};
1365 use indexmap::IndexMap;
1366 use vector_lib::config::ComponentKey;
1367 use vector_lib::configurable::configurable_component;
1368
1369 #[configurable_component]
1371 #[derive(Clone)]
1372 struct ComponentsOnlyConfig {
1373 #[serde(default)]
1375 pub sources: IndexMap<ComponentKey, SourceOuter>,
1376
1377 #[serde(default)]
1379 pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,
1380
1381 #[serde(default)]
1383 pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
1384 }
1385
1386 match generate_root_schema::<ComponentsOnlyConfig>() {
1387 Ok(schema) => {
1388 let json = serde_json::to_string_pretty(&schema)
1389 .expect("rendering root schema to JSON should not fail");
1390
1391 println!("{json}");
1392 }
1393 Err(e) => eprintln!("error while generating schema: {e:?}"),
1394 }
1395 }
1396}