1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 fmt::{self, Display, Formatter},
5 fs,
6 hash::Hash,
7 net::SocketAddr,
8 path::PathBuf,
9 time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16 config::{
17 AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18 SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19 },
20 configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24 conditions,
25 event::{Metric, Value},
26 secrets::SecretBackends,
27 serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57 COLLECTOR, CONFIG_PATHS, load, load_builder_from_paths, load_from_paths,
58 load_from_paths_with_provider_and_secrets, load_from_str, load_from_str_with_secrets,
59 load_source_from_paths, merge_path_lists, process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66 BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72 config::{
73 ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74 proxy::ProxyConfig, telemetry,
75 },
76 id::Inputs,
77};
78
/// The kind of component a [`ComponentConfig`] entry refers to.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ComponentType {
    /// The component is a transform.
    Transform,
    /// The component is a sink.
    Sink,
    /// The component is an enrichment table.
    EnrichmentTable,
}
86
/// Ties a component (key and type) to a list of configuration paths.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentConfig {
    /// Paths recorded for the component. When built through
    /// [`ComponentConfig::new`], only paths that could be canonicalized are kept.
    pub config_paths: Vec<PathBuf>,
    /// Key of the component the paths belong to.
    pub component_key: ComponentKey,
    /// Kind of the component the paths belong to.
    pub component_type: ComponentType,
}
93
94impl ComponentConfig {
95 pub fn new(
96 config_paths: Vec<PathBuf>,
97 component_key: ComponentKey,
98 component_type: ComponentType,
99 ) -> Self {
100 let canonicalized_paths = config_paths
101 .into_iter()
102 .filter_map(|p| fs::canonicalize(p).ok())
103 .collect();
104
105 Self {
106 config_paths: canonicalized_paths,
107 component_key,
108 component_type,
109 }
110 }
111
112 pub fn contains(
113 &self,
114 config_paths: &HashSet<PathBuf>,
115 ) -> Option<(ComponentKey, ComponentType)> {
116 if config_paths.iter().any(|p| self.config_paths.contains(p)) {
117 return Some((self.component_key.clone(), self.component_type.clone()));
118 }
119 None
120 }
121}
122
/// A location to load configuration from: a single file or a directory.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
    /// A file path together with a format hint.
    File(PathBuf, FormatHint),
    /// A directory path.
    Dir(PathBuf),
}
128
129impl<'a> From<&'a ConfigPath> for &'a PathBuf {
130 fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
131 match config_path {
132 ConfigPath::File(path, _) => path,
133 ConfigPath::Dir(path) => path,
134 }
135 }
136}
137
138impl ConfigPath {
139 pub const fn as_dir(&self) -> Option<&PathBuf> {
140 match self {
141 Self::Dir(path) => Some(path),
142 _ => None,
143 }
144 }
145}
146
/// Configuration state: global options plus the configured components.
///
/// Component maps are private; read them through the accessor methods on
/// `Config`.
#[derive(Debug, Default, Serialize)]
pub struct Config {
    /// API options; the field only exists when the `api` feature is enabled.
    #[cfg(feature = "api")]
    pub api: api::Options,
    /// Schema options.
    pub schema: schema::Options,
    /// Global options. `propagate_acknowledgements` uses
    /// `global.acknowledgements` as the default for each sink.
    pub global: GlobalOptions,
    /// Healthcheck options.
    pub healthchecks: HealthcheckOptions,
    /// Sources, keyed by component key.
    sources: IndexMap<ComponentKey, SourceOuter>,
    /// Sinks, keyed by component key.
    sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
    /// Transforms, keyed by component key.
    transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
    /// Enrichment tables, keyed by component key.
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
    /// Unit test definitions.
    tests: Vec<TestDefinition>,
    /// Secret backends, keyed by component key.
    secret: IndexMap<ComponentKey, SecretBackends>,
    /// Optional graceful shutdown duration.
    // NOTE(review): how `None` is interpreted is decided by the consumer of
    // this field, which is not visible here — confirm before documenting further.
    pub graceful_shutdown_duration: Option<Duration>,
}
162
163impl Config {
164 pub fn builder() -> builder::ConfigBuilder {
165 Default::default()
166 }
167
168 pub fn is_empty(&self) -> bool {
169 self.sources.is_empty()
170 }
171
172 pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
173 self.sources.iter()
174 }
175
176 pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
177 self.sources.get(id)
178 }
179
180 pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
181 self.transforms.iter()
182 }
183
184 pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
185 self.transforms.get(id)
186 }
187
188 pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
189 self.sinks.iter()
190 }
191
192 pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
193 self.sinks.get(id)
194 }
195
196 pub fn enrichment_tables(
197 &self,
198 ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
199 self.enrichment_tables.iter()
200 }
201
202 pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
203 self.enrichment_tables.get(id)
204 }
205
206 pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
207 self.transforms
208 .get(id)
209 .map(|t| &t.inputs[..])
210 .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
211 .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
212 }
213
214 pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
215 let inputs: Vec<_> = self
216 .sinks
217 .iter()
218 .filter(|(_, sink)| {
219 sink.inner
220 .acknowledgements()
221 .merge_default(&self.global.acknowledgements)
222 .enabled()
223 })
224 .flat_map(|(name, sink)| {
225 sink.inputs
226 .iter()
227 .map(|input| (name.clone(), input.clone()))
228 })
229 .collect();
230 self.propagate_acks_rec(inputs);
231 Ok(())
232 }
233
234 fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
235 for (sink, input) in sink_inputs {
236 let component = &input.component;
237 if let Some(source) = self.sources.get_mut(component) {
238 if source.inner.can_acknowledge() {
239 source.sink_acknowledgements = true;
240 } else {
241 warn!(
242 message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
243 source = component.id(),
244 sink = sink.id(),
245 );
246 }
247 } else if let Some(transform) = self.transforms.get(component) {
248 let inputs = transform
249 .inputs
250 .iter()
251 .map(|input| (sink.clone(), input.clone()))
252 .collect();
253 self.propagate_acks_rec(inputs);
254 }
255 }
256 }
257
258 pub fn transform_keys_with_external_files(&self) -> HashSet<ComponentKey> {
259 self.transforms
260 .iter()
261 .filter_map(|(name, transform_outer)| {
262 if !transform_outer.inner.files_to_watch().is_empty() {
263 Some(name.clone())
264 } else {
265 None
266 }
267 })
268 .collect()
269 }
270}
271
/// Healthcheck options.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(default)]
pub struct HealthcheckOptions {
    /// Whether healthchecks are enabled.
    ///
    /// Defaults to `true`.
    pub enabled: bool,

    /// Whether components are required to be healthy.
    ///
    /// Defaults to `false`.
    // NOTE(review): what happens when a required healthcheck fails is decided
    // by code outside this file — confirm before expanding this description.
    pub require_healthy: bool,
}
289
290impl HealthcheckOptions {
291 pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
292 if let Some(require_healthy) = require_healthy.into() {
293 self.require_healthy = require_healthy;
294 }
295 }
296
297 const fn merge(&mut self, other: Self) {
298 self.enabled &= other.enabled;
299 self.require_healthy |= other.require_healthy;
300 }
301}
302
impl Default for HealthcheckOptions {
    /// Healthchecks are enabled by default, and health is not required.
    fn default() -> Self {
        Self {
            enabled: true,
            require_healthy: false,
        }
    }
}
311
/// A resource that a component can claim.
///
/// [`Resource::conflicts`] reports every resource claimed by more than one
/// component.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address together with the protocol used on it.
    Port(SocketAddr, Protocol),
    /// A zero-based systemd socket offset; `Display` renders it one-based.
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer identified by name.
    DiskBuffer(String),
}

/// Transport protocol of a [`Resource::Port`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}

impl Resource {
    /// Creates a TCP port resource for `addr`.
    pub const fn tcp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Tcp)
    }

    /// Creates a UDP port resource for `addr`.
    pub const fn udp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Udp)
    }

    /// Returns every resource that is claimed by more than one component,
    /// mapped to the set of components claiming it.
    ///
    /// `components` yields `(key, resources)` pairs. Two claims collide when
    /// the resources compare equal; for ports that means the same socket
    /// address (IP, port) and the same protocol.
    pub fn conflicts<K: Eq + Hash + Clone>(
        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
    ) -> HashMap<Resource, HashSet<K>> {
        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
        let mut unspecified = Vec::new();

        // First pass: group components by the exact resource they claim and
        // remember every port bound to an unspecified (wildcard) address.
        for (key, resources) in components {
            for resource in resources {
                match &resource {
                    Resource::Port(address, protocol) if address.ip().is_unspecified() => {
                        unspecified.push((key.clone(), *address, *protocol));
                    }
                    _ => {}
                }

                resource_map
                    .entry(resource)
                    .or_default()
                    .insert(key.clone());
            }
        }

        // Second pass: add each wildcard claimant to every port resource with
        // the same socket address and protocol.
        // NOTE(review): each entry of `unspecified` was recorded alongside an
        // insert of the same key under the identical `Resource::Port`, so this
        // pass does not appear able to add a new claimant. Confirm whether
        // wildcard addresses were meant to collide with specific addresses on
        // the same port.
        for (key, address0, protocol0) in unspecified {
            for (resource, claimants) in resource_map.iter_mut() {
                match resource {
                    Resource::Port(address, protocol)
                        if address0 == *address && protocol0 == *protocol =>
                    {
                        claimants.insert(key.clone());
                    }
                    _ => {}
                }
            }
        }

        // A resource with a single claimant is not a conflict.
        resource_map.retain(|_, claimants| claimants.len() > 1);

        resource_map
    }

    /// Returns the English ordinal suffix for `position` ("st", "nd", "rd"
    /// or "th"); 11, 12 and 13 take "th" in every hundred.
    const fn ordinal_suffix(position: usize) -> &'static str {
        match (position % 10, position % 100) {
            (_, 11..=13) => "th",
            (1, _) => "st",
            (2, _) => "nd",
            (3, _) => "rd",
            _ => "th",
        }
    }
}

impl Display for Protocol {
    /// Writes the lowercase protocol name (`tcp` or `udp`).
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            Protocol::Udp => write!(fmt, "udp"),
            Protocol::Tcp => write!(fmt, "tcp"),
        }
    }
}

impl Display for Resource {
    /// Writes a human-readable description of the resource.
    ///
    /// Systemd socket offsets are shown one-based with a proper ordinal
    /// suffix, e.g. offset `0` is rendered as "systemd 1st socket".
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
            Resource::SystemFdOffset(offset) => {
                let position = offset + 1;
                write!(
                    fmt,
                    "systemd {position}{} socket",
                    Resource::ordinal_suffix(position)
                )
            }
            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
        }
    }
}
400
/// A unit test definition.
///
/// `T` is the type used to refer to component outputs: `String` before the
/// references are resolved and `OutputId` afterwards.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestDefinition<T: 'static = OutputId> {
    /// The name of the unit test.
    pub name: String,

    /// A single input event for the test.
    pub input: Option<TestInput>,

    /// A set of input events for the test.
    #[serde(default)]
    pub inputs: Vec<TestInput>,

    /// A set of expected outputs for the test.
    #[serde(default)]
    pub outputs: Vec<TestOutput<T>>,

    /// A set of component outputs listed as producing no output.
    ///
    /// A target listed here may not also be used as an `extract_from` target
    /// in `outputs`.
    #[serde(default)]
    pub no_outputs_from: Vec<T>,
}
424
425impl TestDefinition<String> {
426 fn resolve_outputs(
427 self,
428 graph: &graph::Graph,
429 ) -> Result<TestDefinition<OutputId>, Vec<String>> {
430 let TestDefinition {
431 name,
432 input,
433 inputs,
434 outputs,
435 no_outputs_from,
436 } = self;
437 let mut errors = Vec::new();
438
439 let output_map = graph.input_map().expect("ambiguous outputs");
440
441 let outputs = outputs
442 .into_iter()
443 .map(|old| {
444 let TestOutput {
445 extract_from,
446 conditions,
447 } = old;
448
449 (extract_from.to_vec(), conditions)
450 })
451 .filter_map(|(extract_from, conditions)| {
452 let mut outputs = Vec::new();
453 for from in extract_from {
454 if no_outputs_from.contains(&from) {
455 errors.push(format!(
456 r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
457 ));
458 } else if let Some(output_id) = output_map.get(&from) {
459 outputs.push(output_id.clone());
460 } else {
461 errors.push(format!(
462 r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
463 ));
464 }
465 }
466 if outputs.is_empty() {
467 None
468 } else {
469 Some(TestOutput {
470 extract_from: outputs.into(),
471 conditions,
472 })
473 }
474 })
475 .collect();
476
477 let no_outputs_from = no_outputs_from
478 .into_iter()
479 .filter_map(|o| {
480 if let Some(output_id) = output_map.get(&o) {
481 Some(output_id.clone())
482 } else {
483 errors.push(format!(
484 r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
485 ));
486 None
487 }
488 })
489 .collect();
490
491 if errors.is_empty() {
492 Ok(TestDefinition {
493 name,
494 input,
495 inputs,
496 outputs,
497 no_outputs_from,
498 })
499 } else {
500 Err(errors)
501 }
502 }
503}
504
505impl TestDefinition<OutputId> {
506 fn stringify(self) -> TestDefinition<String> {
507 let TestDefinition {
508 name,
509 input,
510 inputs,
511 outputs,
512 no_outputs_from,
513 } = self;
514
515 let outputs = outputs
516 .into_iter()
517 .map(|old| TestOutput {
518 extract_from: old
519 .extract_from
520 .to_vec()
521 .into_iter()
522 .map(|item| item.to_string())
523 .collect::<Vec<_>>()
524 .into(),
525 conditions: old.conditions,
526 })
527 .collect();
528
529 let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
530
531 TestDefinition {
532 name,
533 input,
534 inputs,
535 outputs,
536 no_outputs_from,
537 }
538 }
539}
540
/// A unit test input.
///
/// Names the component to insert an event at and describes the event.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// The key of the component to insert the input event at.
    pub insert_at: ComponentKey,

    /// The type of the input event.
    ///
    /// Serialized as `type`; defaults to `raw` when omitted.
    #[serde(default = "default_test_input_type", rename = "type")]
    pub type_str: String,

    /// A string value for the input event.
    // NOTE(review): presumably only meaningful for the `raw` type — confirm
    // against the unit test builder.
    pub value: Option<String>,

    /// A source string for the input event.
    // NOTE(review): presumably a VRL program used to build the event —
    // confirm against the unit test builder.
    pub source: Option<String>,

    /// A set of log fields for the input event.
    pub log_fields: Option<IndexMap<String, Value>>,

    /// A metric to use as the input event.
    pub metric: Option<Metric>,
}
579
/// Serde default for `TestInput::type_str`: the `raw` input type.
fn default_test_input_type() -> String {
    String::from("raw")
}
583
/// A unit test output.
///
/// Names one or more component outputs to extract events from and the
/// optional conditions to check them against.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestOutput<T: 'static = OutputId> {
    /// The component outputs to extract events from; either a single value
    /// or a list.
    pub extract_from: OneOrMany<T>,

    /// The conditions to run against the extracted events.
    pub conditions: Option<Vec<conditions::AnyCondition>>,
}
598
599#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
600mod tests {
601 use std::path::PathBuf;
602
603 use indoc::indoc;
604
605 use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
606 use crate::{config, topology::builder::TopologyPiecesBuilder};
607
608 async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
609 match config::load_from_str(config, format) {
610 Ok(c) => {
611 let diff = ConfigDiff::initial(&c);
612 let c2 = config::load_from_str(config, format).unwrap();
613 match (
614 config::warnings(&c2),
615 TopologyPiecesBuilder::new(&c, &diff).build().await,
616 ) {
617 (warnings, Ok(_pieces)) => Ok(warnings),
618 (_, Err(errors)) => Err(errors),
619 }
620 }
621 Err(error) => Err(error),
622 }
623 }
624
    // A duplicated input, a transform without inputs and inputs that name no
    // component must each be reported, in the order asserted below.
    #[tokio::test]
    async fn bad_inputs() {
        let err = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            vec![
                "Sink \"out\" has input \"in\" duplicated 2 times",
                "Transform \"sample\" has no inputs",
                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
                "Input \"asdf\" for sink \"out\" doesn't match any components.",
            ],
            err,
        );
    }
663
    // A source and a transform sharing the name "foo" must be rejected.
    #[tokio::test]
    async fn duplicate_name() {
        let err = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            err,
            vec!["More than one component with name \"foo\" (source, transform).",]
        );
    }
694
    // A `stdin` source and a `file_descriptor` source with `fd = 0` must be
    // reported as claiming the same file descriptor resource.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let errors = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        // Only the prefix is checked; the rest of the message lists the
        // claiming components.
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
720
    // Two `file_descriptor` sources using the same descriptor must conflict.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let errors = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        // Only the prefix is checked; the rest of the message lists the
        // claiming components.
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
745
    // Two `file_descriptor` sources configured with the two descriptors
    // returned by separate `null_fd()` calls must load with no warnings.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;
        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let result = load(
            &format!(
                r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
            ),
            Format::Toml,
        )
        .await;

        let expected = Ok(vec![]);
        assert_eq!(result, expected);
    }
775
    // Components whose output nobody consumes (`sample2` and `in2`) must
    // produce warnings rather than errors.
    #[tokio::test]
    async fn warnings() {
        let warnings = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap();

        assert_eq!(
            warnings,
            vec![
                "Transform \"sample2\" has no consumers",
                "Source \"in2\" has no consumers",
            ]
        )
    }
815
    // The transforms `two -> three -> four -> two` form a cycle, which must
    // be reported as a single error naming the chain.
    #[tokio::test]
    async fn cycle() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            errors,
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
        )
    }
861
    // Without an explicit `data_dir`, the loaded configuration must use
    // `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let config = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            config.global.data_dir
        )
    }
882
    // Without a `[log_schema]` table, the host, message and timestamp keys
    // must be `host`, `message` and `timestamp`.
    #[test]
    fn default_schema() {
        let config = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "host",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "message",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "timestamp",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
916
    // Keys given in a `[log_schema]` table must replace the defaults.
    #[test]
    fn custom_schema() {
        let config = load_from_str(
            indoc! {r#"
                [log_schema]
                host_key = "this"
                message_key = "that"
                timestamp_key = "then"

                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "this",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "that",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "then",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
955
    // Appending a second builder with disjoint components must succeed and
    // leave the global options, components and tests of both in the result.
    #[test]
    fn config_append() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        data_dir = "/foobar"

                        [proxy]
                        http = "http://proxy.inc:3128"

                        [transforms.foo]
                        type = "test_basic"
                        inputs = [ "in" ]
                        suffix = "foo"
                        increase = 1.25

                        [[tests]]
                        name = "check_simple_log"
                        [tests.input]
                        insert_at = "foo"
                        type = "raw"
                        value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                        [[tests.outputs]]
                        extract_from = "foo"
                        [[tests.outputs.conditions]]
                        type = "vrl"
                        source = ".message == \"Sorry, I'm busy this week Cecil\""
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Ok(())
        );

        assert!(config.global.proxy.http.is_some());
        assert!(config.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
        assert!(config.sources.contains_key(&ComponentKey::from("in")));
        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(config.tests.len(), 1);
    }
1013
    // Appending a builder that reuses the ids `in` and `out` must fail with
    // one error per duplicated id.
    #[test]
    fn config_append_collisions() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        [sources.in]
                        type = "test_basic"

                        [transforms.foo]
                        type = "test_basic"
                        inputs = [ "in" ]
                        suffix = "foo"
                        increase = 1.25

                        [sinks.out]
                        type = "test_basic"
                        inputs = ["in"]
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Err(vec![
                "duplicate source id found: in".into(),
                "duplicate sink id found: out".into(),
            ])
        );
    }
1056
    // Proxy settings must deserialize both from the global `[proxy]` table
    // and from dotted `proxy.*` keys on a source.
    #[test]
    fn with_proxy() {
        let config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"
                https = "http://other:3128"
                no_proxy = ["localhost", "127.0.0.1"]

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]
                proxy.http = "http://server:3128"
                proxy.https = "http://other:3128"
                proxy.no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();
        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
        assert!(config.global.proxy.no_proxy.matches("localhost"));
        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
        assert!(source.proxy.no_proxy.matches("localhost"));
    }
1089
    // With only `http` set globally, the deserialized builder keeps the
    // global `https` unset while the source keeps its own full proxy table.
    #[test]
    fn with_partial_global_proxy() {
        let config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]

                [sources.in.proxy]
                http = "http://server:3129"
                https = "http://other:3129"
                no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();
        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
        assert_eq!(config.global.proxy.https, None);
        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
        assert!(source.proxy.no_proxy.matches("localhost"));
    }
1121
    // With a full global proxy and a partial source proxy, the deserialized
    // builder keeps the source's `https` unset (no merging at this stage).
    #[test]
    fn with_partial_source_proxy() {
        let config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"
                https = "http://other:3128"

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]

                [sources.in.proxy]
                http = "http://server:3129"
                no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();
        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
        assert_eq!(source.proxy.https, None);
        assert!(source.proxy.no_proxy.matches("localhost"));
    }
1153}
1154
1155#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
1156mod acknowledgements_tests {
1157 use indoc::indoc;
1158
1159 use super::*;
1160
    // Topology under test:
    //   in1 => out1            (no acknowledgements)
    //   in2 => out2            (acknowledgements enabled)
    //   in3 => parse3 => out3  (acknowledgements enabled)
    // After `build()`, only the sources feeding an acknowledging sink,
    // directly or through a transform, must have `sink_acknowledgements` set.
    #[test]
    fn propagates_settings() {
        let config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                data_dir = "/tmp"
                [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
                [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
                [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
                [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
                [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
                [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
                [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
            "#},
            Format::Toml,
        )
        .unwrap();

        for source in config.sources.values() {
            assert!(
                !source.sink_acknowledgements,
                "Source `sink_acknowledgements` should be `false` before propagation"
            );
        }

        let config = config.build().unwrap();

        let get = |key: &str| config.sources.get(&ComponentKey::from(key)).unwrap();
        assert!(!get("in1").sink_acknowledgements);
        assert!(get("in2").sink_acknowledgements);
        assert!(get("in3").sink_acknowledgements);
    }
1220}
1221
1222#[cfg(test)]
1223mod resource_tests {
1224 use std::{
1225 collections::{HashMap, HashSet},
1226 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
1227 };
1228
1229 use proptest::prelude::*;
1230
1231 use super::Resource;
1232
1233 fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
1234 Resource::tcp(SocketAddr::new(addr.into(), port))
1235 }
1236
1237 fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
1238 Resource::udp(SocketAddr::new(addr.into(), port))
1239 }
1240
    // Strategy yielding the IPv4 or the IPv6 unspecified address.
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(Ipv4Addr::UNSPECIFIED.into()),
            Just(Ipv6Addr::UNSPECIFIED.into()),
        ]
    }
1247
    // Strategy yielding any IP address except the unspecified ones.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |addr| !addr.is_unspecified())
    }
1251
1252 fn specport() -> impl Strategy<Value = u16> {
1253 any::<u16>().prop_filter("Must be specific port", |&port| port > 0)
1254 }
1255
1256 fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
1257 conflicts
1258 .into_iter()
1259 .map(|(key, values)| (key, values.into_iter().collect()))
1260 .collect()
1261 }
1262
    proptest! {
        // Distinct ports on one address never conflict.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ];
            let conflicting = Resource::conflicts(components);
            assert_eq!(conflicting, HashMap::new());
        }

        // Two components on the same address and port conflict; the one on
        // port 0 is not involved.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            assert_eq!(
                conflicting,
                hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])])
            );
        }

        // A component claiming two resources can take part in two conflicts.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            assert_eq!(
                conflicting,
                hashmap(vec![
                    (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                    (tcp(addr, port), vec!["sink_1", "sink_2"])
                ])
            );
        }

        // The same port on different addresses does not conflict.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let components = vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            assert_eq!(conflicting, HashMap::new());
        }

        // An unspecified address and a specific address on the same port are
        // expected not to conflict.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let components = vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            assert_eq!(conflicting, HashMap::new());
        }

        // TCP and UDP on the same address and port do not conflict.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ];
            let conflicting = Resource::conflicts(components);
            assert_eq!(conflicting, HashMap::new());
        }
    }
1338
    // The IPv4 and IPv6 unspecified addresses on the same port are distinct
    // resources and must not conflict.
    #[test]
    fn different_unspecified_ip_version() {
        let components = vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ];
        let conflicting = Resource::conflicts(components);
        assert_eq!(conflicting, HashMap::new());
    }
1348}
1349
1350#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
1351mod resource_config_tests {
1352 use indoc::indoc;
1353 use vector_lib::configurable::schema::generate_root_schema;
1354
1355 use super::{Format, load_from_str};
1356
    // Loading a configuration with two `stdin` sources must fail.
    #[test]
    fn config_conflict_detected() {
        assert!(
            load_from_str(
                indoc! {r#"
                    [sources.in0]
                    type = "stdin"

                    [sources.in1]
                    type = "stdin"

                    [sinks.out]
                    type = "console"
                    inputs = ["in0","in1"]
                    encoding.codec = "json"
                "#},
                Format::Toml,
            )
            .is_err()
        );
    }
1378
    // Ignored by default: prints the JSON schema of a components-only
    // configuration to stdout, or the generation error to stderr. It makes
    // no assertions.
    #[test]
    #[ignore]
    #[allow(clippy::print_stdout)]
    #[allow(clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        /// Configuration containing only sources, transforms and sinks.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => {
                let json = serde_json::to_string_pretty(&schema)
                    .expect("rendering root schema to JSON should not fail");

                println!("{json}");
            }
            Err(e) => eprintln!("error while generating schema: {e:?}"),
        }
    }
1416}