vector/
generate.rs

1#![allow(missing_docs)]
2use std::{
3    fs::{create_dir_all, File},
4    io::Write,
5    path::{Path, PathBuf},
6};
7
8use clap::Parser;
9use colored::*;
10use indexmap::IndexMap;
11use serde::Serialize;
12use toml::{map::Map, Value};
13use vector_lib::configurable::component::{
14    SinkDescription, SourceDescription, TransformDescription,
15};
16use vector_lib::{buffers::BufferConfig, config::GlobalOptions, default_data_dir};
17
18use crate::config::{format, Format, SinkHealthcheckOptions};
19
20#[derive(Parser, Debug)]
21#[command(rename_all = "kebab-case")]
22pub struct Opts {
23    /// Whether to skip the generation of global fields.
24    #[arg(short, long)]
25    pub(crate) fragment: bool,
26
27    /// Generate expression, e.g. 'stdin/remap,filter/console'
28    ///
29    /// Three comma-separated lists of sources, transforms and sinks, divided by
30    /// forward slashes. If subsequent component types are not needed then
31    /// their dividers can be omitted from the expression.
32    ///
33    /// For example:
34    ///
35    /// `/filter` prints a `filter` transform.
36    ///
37    /// `//file,http` prints a `file` and `http` sink.
38    ///
39    /// `stdin//http` prints a `stdin` source and an `http` sink.
40    ///
41    /// Generated components are given incremental names (`source1`, `source2`,
42    /// etc) which should be replaced in order to provide better context. You
43    /// can optionally specify the names of components by prefixing them with
44    /// `<name>:`, e.g.:
45    ///
46    /// `foo:stdin/bar:test_basic/baz:http` prints a `stdin` source called
47    /// `foo`, a `test_basic` transform called `bar`, and an `http` sink
48    /// called `baz`.
49    ///
50    /// Vector makes a best attempt at constructing a sensible topology. The
51    /// first transform generated will consume from all sources and subsequent
52    /// transforms will consume from their predecessor. All sinks will consume
53    /// from the last transform or, if none are specified, from all sources. It
54    /// is then up to you to restructure the `inputs` of each component to build
55    /// the topology you need.
56    pub(crate) expression: String,
57
58    /// Generate config as a file
59    #[arg(long)]
60    pub(crate) file: Option<PathBuf>,
61
62    #[arg(long, default_value = "yaml")]
63    pub(crate) format: Format,
64}
65
/// A sink entry in the generated config: the sink's example settings plus the
/// fields every generated sink carries.
///
/// Fields serialize in declaration order, which is the key order seen in the
/// generated output (`inputs`, the flattened settings, `healthcheck`, `buffer`).
#[derive(Serialize)]
pub struct SinkOuter {
    /// Ids of the components this sink consumes from.
    pub inputs: Vec<String>,
    /// The sink's example settings (generation also inserts its `type` key
    /// here), flattened into the same table as the other fields.
    #[serde(flatten)]
    pub inner: Value,
    pub healthcheck: SinkHealthcheckOptions,
    pub buffer: BufferConfig,
}
74
/// A transform entry in the generated config: its inputs followed by the
/// transform's example settings.
#[derive(Serialize)]
pub struct TransformOuter {
    /// Ids of the components this transform consumes from (may be empty when
    /// the expression contains no sources).
    pub inputs: Vec<String>,
    /// The transform's example settings (generation also inserts its `type`
    /// key here), flattened into the same table as `inputs`.
    #[serde(flatten)]
    pub inner: Value,
}
81
/// The component sections of a generated config, keyed by component name.
///
/// `IndexMap` keeps components in the order they appear in the expression;
/// sections left as `None` are omitted from the serialized output entirely.
#[derive(Serialize, Default)]
pub struct Config {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sources: Option<IndexMap<String, Value>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transforms: Option<IndexMap<String, TransformOuter>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sinks: Option<IndexMap<String, SinkOuter>>,
}
91
/// Controls how the resulting transform topology is wired up. This is not
/// user-configurable.
///
/// Only transforms are affected: sinks always consume from the last transform,
/// falling back to all sources (or a `component-id` placeholder).
pub(crate) enum TransformInputsStrategy {
    /// Default.
    ///
    /// The first transform generated will consume from all sources and
    /// subsequent transforms will consume from their predecessor.
    Auto,
    /// Used for property testing `vector config`.
    ///
    /// All transforms use a list of all sources as inputs.
    // Test-only; nothing in this file constructs it, hence the `dead_code`
    // allowance.
    #[cfg(test)]
    #[allow(dead_code)]
    All,
}
107
/// The complete generated document: optional global options flattened
/// alongside the component sections.
#[derive(Serialize, Default)]
struct FullConfig {
    /// `None` (used for `--fragment`) contributes no keys to the output.
    #[serde(flatten)]
    global_options: Option<GlobalOptions>,
    #[serde(flatten)]
    config: Config,
}
115
116pub(crate) fn generate_example(
117    opts: &Opts,
118    transform_inputs_strategy: TransformInputsStrategy,
119) -> Result<String, Vec<String>> {
120    let components: Vec<Vec<_>> = opts
121        .expression
122        .split(['|', '/'])
123        .map(|s| {
124            s.split(',')
125                .map(|s| s.trim().to_string())
126                .filter(|s| !s.is_empty())
127                .collect()
128        })
129        .collect();
130
131    let mut config = Config::default();
132
133    let mut errs = Vec::new();
134
135    let mut source_names = Vec::new();
136    if let Some(source_types) = components.first() {
137        let mut sources = IndexMap::new();
138
139        for (i, source_expr) in source_types.iter().enumerate() {
140            let (name, source_type) = if let Some(c_index) = source_expr.find(':') {
141                if c_index == 0 {
142                    errs.push(format!(
143                        "failed to generate source '{source_expr}': empty name is not allowed"
144                    ));
145                    continue;
146                }
147                let mut chopped_expr = source_expr.clone();
148                (
149                    chopped_expr.drain(..c_index).collect(),
150                    chopped_expr.drain(1..).collect(),
151                )
152            } else {
153                (format!("source{i}"), source_expr.clone())
154            };
155            source_names.push(name.clone());
156
157            let mut example = match SourceDescription::example(&source_type) {
158                Ok(example) => example,
159                Err(err) => {
160                    errs.push(format!("failed to generate source '{source_type}': {err}"));
161                    Value::Table(Map::new())
162                }
163            };
164            example
165                .as_table_mut()
166                .expect("examples are always tables")
167                .insert("type".into(), source_type.to_owned().into());
168
169            sources.insert(name, example);
170        }
171
172        if !sources.is_empty() {
173            config.sources = Some(sources);
174        }
175    }
176
177    let mut transform_names = Vec::new();
178    if let Some(transform_types) = components.get(1) {
179        let mut transforms = IndexMap::new();
180
181        for (i, transform_expr) in transform_types.iter().enumerate() {
182            let (name, transform_type) = if let Some(c_index) = transform_expr.find(':') {
183                if c_index == 0 {
184                    errs.push(format!(
185                        "failed to generate transform '{transform_expr}': empty name is not allowed"
186                    ));
187                    continue;
188                }
189                let mut chopped_expr = transform_expr.clone();
190                (
191                    chopped_expr.drain(..c_index).collect(),
192                    chopped_expr.drain(1..).collect(),
193                )
194            } else {
195                (format!("transform{i}"), transform_expr.clone())
196            };
197            transform_names.push(name.clone());
198
199            let targets = match transform_inputs_strategy {
200                TransformInputsStrategy::Auto => {
201                    if i == 0 {
202                        source_names.clone()
203                    } else {
204                        vec![transform_names
205                            .get(i - 1)
206                            .unwrap_or(&"component-id".to_owned())
207                            .to_owned()]
208                    }
209                }
210                #[cfg(test)]
211                TransformInputsStrategy::All => source_names.clone(),
212            };
213
214            let mut example = match TransformDescription::example(&transform_type) {
215                Ok(example) => example,
216                Err(err) => {
217                    errs.push(format!(
218                        "failed to generate transform '{transform_type}': {err}"
219                    ));
220                    Value::Table(Map::new())
221                }
222            };
223            example
224                .as_table_mut()
225                .expect("examples are always tables")
226                .insert("type".into(), transform_type.to_owned().into());
227
228            transforms.insert(
229                name,
230                TransformOuter {
231                    inputs: targets,
232                    inner: example,
233                },
234            );
235        }
236
237        if !transforms.is_empty() {
238            config.transforms = Some(transforms);
239        }
240    }
241
242    if let Some(sink_types) = components.get(2) {
243        let mut sinks = IndexMap::new();
244
245        for (i, sink_expr) in sink_types.iter().enumerate() {
246            let (name, sink_type) = if let Some(c_index) = sink_expr.find(':') {
247                if c_index == 0 {
248                    errs.push(format!(
249                        "failed to generate sink '{sink_expr}': empty name is not allowed"
250                    ));
251                    continue;
252                }
253                let mut chopped_expr = sink_expr.clone();
254                (
255                    chopped_expr.drain(..c_index).collect(),
256                    chopped_expr.drain(1..).collect(),
257                )
258            } else {
259                (format!("sink{i}"), sink_expr.clone())
260            };
261
262            let mut example = match SinkDescription::example(&sink_type) {
263                Ok(example) => example,
264                Err(err) => {
265                    errs.push(format!("failed to generate sink '{sink_type}': {err}"));
266                    Value::Table(Map::new())
267                }
268            };
269            example
270                .as_table_mut()
271                .expect("examples are always tables")
272                .insert("type".into(), sink_type.to_owned().into());
273
274            sinks.insert(
275                name,
276                SinkOuter {
277                    inputs: transform_names
278                        .last()
279                        .map(|s| vec![s.to_owned()])
280                        .or_else(|| {
281                            if !source_names.is_empty() {
282                                Some(source_names.clone())
283                            } else {
284                                None
285                            }
286                        })
287                        .unwrap_or_else(|| vec!["component-id".to_owned()]),
288                    buffer: BufferConfig::default(),
289                    healthcheck: SinkHealthcheckOptions::default(),
290                    inner: example,
291                },
292            );
293        }
294
295        if !sinks.is_empty() {
296            config.sinks = Some(sinks);
297        }
298    }
299
300    if !errs.is_empty() {
301        return Err(errs);
302    }
303
304    let full_config = FullConfig {
305        global_options: if !opts.fragment {
306            Some(GlobalOptions {
307                data_dir: default_data_dir(),
308                ..Default::default()
309            })
310        } else {
311            None
312        },
313        config,
314    };
315
316    let builder = match format::serialize(&full_config, opts.format) {
317        Ok(v) => v,
318        Err(e) => {
319            errs.push(format!("failed to marshal sources: {e}"));
320            return Err(errs);
321        }
322    };
323
324    let file = opts.file.as_ref();
325    if file.is_some() {
326        #[allow(clippy::print_stdout)]
327        match write_config(file.as_ref().unwrap(), &builder) {
328            Ok(_) => {
329                println!(
330                    "Config file written to {:?}",
331                    &file.as_ref().unwrap().join("\n")
332                )
333            }
334            Err(e) => errs.push(format!("failed to write to file: {e}")),
335        };
336    };
337
338    if !errs.is_empty() {
339        Err(errs)
340    } else {
341        Ok(builder)
342    }
343}
344
345pub fn cmd(opts: &Opts) -> exitcode::ExitCode {
346    match generate_example(opts, TransformInputsStrategy::Auto) {
347        Ok(s) => {
348            #[allow(clippy::print_stdout)]
349            {
350                println!("{s}");
351            }
352            exitcode::OK
353        }
354        Err(errs) => {
355            #[allow(clippy::print_stderr)]
356            {
357                errs.iter().for_each(|e| eprintln!("{}", e.red()));
358            }
359            exitcode::SOFTWARE
360        }
361    }
362}
363
364fn write_config(filepath: &Path, body: &str) -> Result<(), crate::Error> {
365    if filepath.exists() {
366        // If the file exists, we don't want to overwrite, that's just rude.
367        Err(format!("{:?} already exists", &filepath).into())
368    } else {
369        if let Some(directory) = filepath.parent() {
370            create_dir_all(directory)?;
371        }
372        File::create(filepath)
373            .and_then(|mut file| file.write_all(body.as_bytes()))
374            .map_err(Into::into)
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConfigBuilder;
    use rstest::rstest;

    /// Generates a config for `expression` in `format` and checks that it
    /// deserializes back into a `ConfigBuilder`.
    fn generate_and_deserialize(expression: String, format: Format) {
        let opts = Opts {
            fragment: false,
            expression,
            file: None,
            format,
        };
        let cfg_string = generate_example(&opts, TransformInputsStrategy::Auto)
            .unwrap_or_else(|errs| {
                panic!(
                    "Failed to generate example for {} with errors: {errs:?}",
                    opts.expression
                )
            });
        if let Err(error) = format::deserialize::<ConfigBuilder>(&cfg_string, opts.format) {
            panic!(
                "Failed to deserialize example for {} with error: {error:?}",
                opts.expression
            );
        }
    }

    #[rstest]
    #[case(Format::Toml)]
    #[case(Format::Json)]
    #[case(Format::Yaml)]
    #[test]
    fn generate_all(#[case] format: Format) {
        for name in SourceDescription::types() {
            generate_and_deserialize(format!("{name}//"), format);
        }

        for name in TransformDescription::types() {
            generate_and_deserialize(format!("/{name}/"), format);
        }

        for name in SinkDescription::types() {
            generate_and_deserialize(format!("//{name}"), format);
        }
    }

    #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
    #[test]
    fn generate_configfile() {
        use std::fs;

        use tempfile::tempdir;

        let tempdir = tempdir().expect("Unable to create tempdir for config");
        let filepath = tempdir.path().join("./config.example.toml");
        let opts = Opts {
            fragment: false,
            expression: "stdin/test_basic/console".to_string(),
            file: Some(filepath.clone()),
            format: Format::Toml,
        };

        // Unwrap first so a generation failure isn't masked by a missing file.
        let cfg = generate_example(&opts, TransformInputsStrategy::Auto)
            .expect("Could not generate config");
        let filecontents = fs::read_to_string(
            fs::canonicalize(&filepath).expect("Could not return canonicalized filepath"),
        )
        .expect("Could not read config file");
        fs::remove_file(filepath).expect("Could not cleanup config file!");
        assert_eq!(cfg, filecontents)
    }

    #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
    #[test]
    fn generate_basic_toml() {
        let mut opts = Opts {
            fragment: false,
            expression: "stdin/test_basic/console".to_string(),
            file: None,
            format: Format::Toml,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sources.source0]
                max_length = 102400
                type = "stdin"

                [sources.source0.decoding]
                codec = "bytes"

                [transforms.transform0]
                inputs = ["source0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [sinks.sink0]
                inputs = ["transform0"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        opts.expression = "stdin|test_basic|console".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sources.source0]
                max_length = 102400
                type = "stdin"

                [sources.source0.decoding]
                codec = "bytes"

                [transforms.transform0]
                inputs = ["source0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [sinks.sink0]
                inputs = ["transform0"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        opts.expression = "stdin//console".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sources.source0]
                max_length = 102400
                type = "stdin"

                [sources.source0.decoding]
                codec = "bytes"

                [sinks.sink0]
                inputs = ["source0"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        opts.expression = "//console".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sinks.sink0]
                inputs = ["component-id"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        opts.expression = "/test_basic,test_basic,test_basic".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [transforms.transform0]
                inputs = []
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform1]
                inputs = ["transform0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform2]
                inputs = ["transform1"]
                increase = 0.0
                suffix = ""
                type = "test_basic"
            "#}
            .to_string())
        );

        opts.fragment = true;
        opts.expression = "/test_basic,test_basic,test_basic".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"
                [transforms.transform0]
                inputs = []
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform1]
                inputs = ["transform0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform2]
                inputs = ["transform1"]
                increase = 0.0
                suffix = ""
                type = "test_basic"
            "#}
            .to_string())
        );
    }

    #[cfg(all(
        feature = "sources-demo_logs",
        feature = "transforms-remap",
        feature = "sinks-console"
    ))]
    #[test]
    fn generate_basic_yaml() {
        let opts = Opts {
            fragment: false,
            expression: "demo_logs/remap/console".to_string(),
            file: None,
            format: Format::Yaml,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
            indoc::indoc! {r"
            data_dir: /var/lib/vector/
            sources:
              source0:
                count: 9223372036854775807
                decoding:
                  codec: bytes
                format: json
                framing:
                  method: bytes
                interval: 1.0
                type: demo_logs
            transforms:
              transform0:
                inputs:
                - source0
                drop_on_abort: false
                drop_on_error: false
                metric_tag_values: single
                reroute_dropped: false
                runtime: ast
                type: remap
            sinks:
              sink0:
                inputs:
                - transform0
                encoding:
                  codec: json
                  json:
                    pretty: false
                target: stdout
                type: console
                healthcheck:
                  enabled: true
                  uri: null
                buffer:
                  type: memory
                  max_events: 500
                  when_full: block
            "}
        );
    }

    #[cfg(all(
        feature = "sources-demo_logs",
        feature = "transforms-remap",
        feature = "sinks-console"
    ))]
    #[test]
    fn generate_basic_json() {
        let opts = Opts {
            fragment: false,
            expression: "demo_logs/remap/console".to_string(),
            file: None,
            format: Format::Json,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
            indoc::indoc! {r#"
            {
              "data_dir": "/var/lib/vector/",
              "sources": {
                "source0": {
                  "count": 9223372036854775807,
                  "decoding": {
                    "codec": "bytes"
                  },
                  "format": "json",
                  "framing": {
                    "method": "bytes"
                  },
                  "interval": 1.0,
                  "type": "demo_logs"
                }
              },
              "transforms": {
                "transform0": {
                  "inputs": [
                    "source0"
                  ],
                  "drop_on_abort": false,
                  "drop_on_error": false,
                  "metric_tag_values": "single",
                  "reroute_dropped": false,
                  "runtime": "ast",
                  "type": "remap"
                }
              },
              "sinks": {
                "sink0": {
                  "inputs": [
                    "transform0"
                  ],
                  "encoding": {
                    "codec": "json",
                    "json": {
                      "pretty": false
                    }
                  },
                  "target": "stdout",
                  "type": "console",
                  "healthcheck": {
                    "enabled": true,
                    "uri": null
                  },
                  "buffer": {
                    "type": "memory",
                    "max_events": 500,
                    "when_full": "block"
                  }
                }
              }
            }"#}
        );
    }
}