vector/
generate.rs

1#![allow(missing_docs)]
2use std::{
3    fs::{File, create_dir_all},
4    io::Write,
5    path::{Path, PathBuf},
6};
7
8use clap::Parser;
9use colored::*;
10use indexmap::IndexMap;
11use serde::Serialize;
12use toml::{Value, map::Map};
13use vector_lib::{
14    buffers::BufferConfig,
15    config::GlobalOptions,
16    configurable::component::{SinkDescription, SourceDescription, TransformDescription},
17    default_data_dir,
18};
19
20use crate::config::{Format, SinkHealthcheckOptions, format};
21
22#[derive(Parser, Debug)]
23#[command(rename_all = "kebab-case")]
24pub struct Opts {
25    /// Whether to skip the generation of global fields.
26    #[arg(short, long)]
27    pub(crate) fragment: bool,
28
29    /// Generate expression, e.g. 'stdin/remap,filter/console'
30    ///
31    /// Three comma-separated lists of sources, transforms and sinks, divided by
32    /// forward slashes. If subsequent component types are not needed then
33    /// their dividers can be omitted from the expression.
34    ///
35    /// For example:
36    ///
37    /// `/filter` prints a `filter` transform.
38    ///
39    /// `//file,http` prints a `file` and `http` sink.
40    ///
41    /// `stdin//http` prints a `stdin` source and an `http` sink.
42    ///
43    /// Generated components are given incremental names (`source1`, `source2`,
44    /// etc) which should be replaced in order to provide better context. You
45    /// can optionally specify the names of components by prefixing them with
46    /// `<name>:`, e.g.:
47    ///
48    /// `foo:stdin/bar:test_basic/baz:http` prints a `stdin` source called
49    /// `foo`, a `test_basic` transform called `bar`, and an `http` sink
50    /// called `baz`.
51    ///
52    /// Vector makes a best attempt at constructing a sensible topology. The
53    /// first transform generated will consume from all sources and subsequent
54    /// transforms will consume from their predecessor. All sinks will consume
55    /// from the last transform or, if none are specified, from all sources. It
56    /// is then up to you to restructure the `inputs` of each component to build
57    /// the topology you need.
58    pub(crate) expression: String,
59
60    /// Generate config as a file
61    #[arg(long)]
62    pub(crate) file: Option<PathBuf>,
63
64    #[arg(long, default_value = "yaml")]
65    pub(crate) format: Format,
66}
67
/// A sink entry in the generated config.
///
/// Keys are serialized in field declaration order: `inputs` first, then the
/// sink's own example fields (flattened), then `healthcheck` and `buffer`.
#[derive(Serialize)]
pub struct SinkOuter {
    /// IDs of the components this sink consumes from.
    pub inputs: Vec<String>,
    /// The sink's example configuration, flattened into this table. In
    /// `generate_example` it carries the sink's `type` key.
    #[serde(flatten)]
    pub inner: Value,
    /// Healthcheck options; `generate_example` fills in the defaults.
    pub healthcheck: SinkHealthcheckOptions,
    /// Buffer options; `generate_example` fills in the defaults.
    pub buffer: BufferConfig,
}
76
/// A transform entry in the generated config.
///
/// Keys are serialized in field declaration order: `inputs` first, then the
/// transform's own example fields (flattened).
#[derive(Serialize)]
pub struct TransformOuter {
    /// IDs of the components this transform consumes from.
    pub inputs: Vec<String>,
    /// The transform's example configuration, flattened into this table. In
    /// `generate_example` it carries the transform's `type` key.
    #[serde(flatten)]
    pub inner: Value,
}
83
/// The component sections of a generated config.
///
/// Each section is keyed by component name (insertion order is preserved by
/// `IndexMap`) and is omitted from the output entirely when it is `None`.
#[derive(Serialize, Default)]
pub struct Config {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sources: Option<IndexMap<String, Value>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transforms: Option<IndexMap<String, TransformOuter>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sinks: Option<IndexMap<String, SinkOuter>>,
}
93
/// Selects how generated transforms are wired to their inputs.
///
/// This is an internal knob passed to `generate_example`; there is no
/// command-line option for it.
pub(crate) enum TransformInputsStrategy {
    /// The strategy used by the `generate` command.
    ///
    /// Transforms form a chain: the first reads from every source and each
    /// later one reads from the transform generated right before it.
    Auto,
    /// Test-only strategy used to property-test `vector config`.
    ///
    /// Every transform receives the full list of sources as its inputs.
    #[allow(dead_code)]
    #[cfg(test)]
    All,
}
109
/// The complete document produced by `generate_example`.
///
/// Both fields are flattened, so global options (when `Some`) become
/// top-level keys emitted ahead of the `sources`/`transforms`/`sinks`
/// sections; `None` contributes no keys at all.
#[derive(Serialize, Default)]
struct FullConfig {
    #[serde(flatten)]
    global_options: Option<GlobalOptions>,
    #[serde(flatten)]
    config: Config,
}
117
118pub(crate) fn generate_example(
119    opts: &Opts,
120    transform_inputs_strategy: TransformInputsStrategy,
121) -> Result<String, Vec<String>> {
122    let components: Vec<Vec<_>> = opts
123        .expression
124        .split(['|', '/'])
125        .map(|s| {
126            s.split(',')
127                .map(|s| s.trim().to_string())
128                .filter(|s| !s.is_empty())
129                .collect()
130        })
131        .collect();
132
133    let mut config = Config::default();
134
135    let mut errs = Vec::new();
136
137    let mut source_names = Vec::new();
138    if let Some(source_types) = components.first() {
139        let mut sources = IndexMap::new();
140
141        for (i, source_expr) in source_types.iter().enumerate() {
142            let (name, source_type) = if let Some(c_index) = source_expr.find(':') {
143                if c_index == 0 {
144                    errs.push(format!(
145                        "failed to generate source '{source_expr}': empty name is not allowed"
146                    ));
147                    continue;
148                }
149                let mut chopped_expr = source_expr.clone();
150                (
151                    chopped_expr.drain(..c_index).collect(),
152                    chopped_expr.drain(1..).collect(),
153                )
154            } else {
155                (format!("source{i}"), source_expr.clone())
156            };
157            source_names.push(name.clone());
158
159            let mut example = match SourceDescription::example(&source_type) {
160                Ok(example) => example,
161                Err(err) => {
162                    errs.push(format!("failed to generate source '{source_type}': {err}"));
163                    Value::Table(Map::new())
164                }
165            };
166            example
167                .as_table_mut()
168                .expect("examples are always tables")
169                .insert("type".into(), source_type.to_owned().into());
170
171            sources.insert(name, example);
172        }
173
174        if !sources.is_empty() {
175            config.sources = Some(sources);
176        }
177    }
178
179    let mut transform_names = Vec::new();
180    if let Some(transform_types) = components.get(1) {
181        let mut transforms = IndexMap::new();
182
183        for (i, transform_expr) in transform_types.iter().enumerate() {
184            let (name, transform_type) = if let Some(c_index) = transform_expr.find(':') {
185                if c_index == 0 {
186                    errs.push(format!(
187                        "failed to generate transform '{transform_expr}': empty name is not allowed"
188                    ));
189                    continue;
190                }
191                let mut chopped_expr = transform_expr.clone();
192                (
193                    chopped_expr.drain(..c_index).collect(),
194                    chopped_expr.drain(1..).collect(),
195                )
196            } else {
197                (format!("transform{i}"), transform_expr.clone())
198            };
199            transform_names.push(name.clone());
200
201            let targets = match transform_inputs_strategy {
202                TransformInputsStrategy::Auto => {
203                    if i == 0 {
204                        source_names.clone()
205                    } else {
206                        vec![
207                            transform_names
208                                .get(i - 1)
209                                .unwrap_or(&"component-id".to_owned())
210                                .to_owned(),
211                        ]
212                    }
213                }
214                #[cfg(test)]
215                TransformInputsStrategy::All => source_names.clone(),
216            };
217
218            let mut example = match TransformDescription::example(&transform_type) {
219                Ok(example) => example,
220                Err(err) => {
221                    errs.push(format!(
222                        "failed to generate transform '{transform_type}': {err}"
223                    ));
224                    Value::Table(Map::new())
225                }
226            };
227            example
228                .as_table_mut()
229                .expect("examples are always tables")
230                .insert("type".into(), transform_type.to_owned().into());
231
232            transforms.insert(
233                name,
234                TransformOuter {
235                    inputs: targets,
236                    inner: example,
237                },
238            );
239        }
240
241        if !transforms.is_empty() {
242            config.transforms = Some(transforms);
243        }
244    }
245
246    if let Some(sink_types) = components.get(2) {
247        let mut sinks = IndexMap::new();
248
249        for (i, sink_expr) in sink_types.iter().enumerate() {
250            let (name, sink_type) = if let Some(c_index) = sink_expr.find(':') {
251                if c_index == 0 {
252                    errs.push(format!(
253                        "failed to generate sink '{sink_expr}': empty name is not allowed"
254                    ));
255                    continue;
256                }
257                let mut chopped_expr = sink_expr.clone();
258                (
259                    chopped_expr.drain(..c_index).collect(),
260                    chopped_expr.drain(1..).collect(),
261                )
262            } else {
263                (format!("sink{i}"), sink_expr.clone())
264            };
265
266            let mut example = match SinkDescription::example(&sink_type) {
267                Ok(example) => example,
268                Err(err) => {
269                    errs.push(format!("failed to generate sink '{sink_type}': {err}"));
270                    Value::Table(Map::new())
271                }
272            };
273            example
274                .as_table_mut()
275                .expect("examples are always tables")
276                .insert("type".into(), sink_type.to_owned().into());
277
278            sinks.insert(
279                name,
280                SinkOuter {
281                    inputs: transform_names
282                        .last()
283                        .map(|s| vec![s.to_owned()])
284                        .or_else(|| {
285                            if !source_names.is_empty() {
286                                Some(source_names.clone())
287                            } else {
288                                None
289                            }
290                        })
291                        .unwrap_or_else(|| vec!["component-id".to_owned()]),
292                    buffer: BufferConfig::default(),
293                    healthcheck: SinkHealthcheckOptions::default(),
294                    inner: example,
295                },
296            );
297        }
298
299        if !sinks.is_empty() {
300            config.sinks = Some(sinks);
301        }
302    }
303
304    if !errs.is_empty() {
305        return Err(errs);
306    }
307
308    let full_config = FullConfig {
309        global_options: if !opts.fragment {
310            Some(GlobalOptions {
311                data_dir: default_data_dir(),
312                ..Default::default()
313            })
314        } else {
315            None
316        },
317        config,
318    };
319
320    let builder = match format::serialize(&full_config, opts.format) {
321        Ok(v) => v,
322        Err(e) => {
323            errs.push(format!("failed to marshal sources: {e}"));
324            return Err(errs);
325        }
326    };
327
328    let file = opts.file.as_ref();
329    if let Some(path) = file {
330        match write_config(path, &builder) {
331            #[allow(clippy::print_stdout)]
332            Ok(_) => {
333                println!(
334                    "Config file written to {:?}",
335                    &file.as_ref().unwrap().join("\n")
336                )
337            }
338            Err(e) => errs.push(format!("failed to write to file: {e}")),
339        };
340    };
341
342    if !errs.is_empty() {
343        Err(errs)
344    } else {
345        Ok(builder)
346    }
347}
348
349pub fn cmd(opts: &Opts) -> exitcode::ExitCode {
350    match generate_example(opts, TransformInputsStrategy::Auto) {
351        Ok(s) => {
352            #[allow(clippy::print_stdout)]
353            {
354                println!("{s}");
355            }
356            exitcode::OK
357        }
358        Err(errs) => {
359            #[allow(clippy::print_stderr)]
360            {
361                errs.iter().for_each(|e| eprintln!("{}", e.red()));
362            }
363            exitcode::SOFTWARE
364        }
365    }
366}
367
368fn write_config(filepath: &Path, body: &str) -> Result<(), crate::Error> {
369    if filepath.exists() {
370        // If the file exists, we don't want to overwrite, that's just rude.
371        Err(format!("{:?} already exists", &filepath).into())
372    } else {
373        if let Some(directory) = filepath.parent() {
374            create_dir_all(directory)?;
375        }
376        File::create(filepath)
377            .and_then(|mut file| file.write_all(body.as_bytes()))
378            .map_err(Into::into)
379    }
380}
381
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::config::ConfigBuilder;

    /// Generates a config for `expression` and asserts it deserializes back
    /// into a `ConfigBuilder` in the same `format`.
    fn generate_and_deserialize(expression: String, format: Format) {
        let opts = Opts {
            fragment: false,
            expression,
            file: None,
            format,
        };
        let cfg_string = generate_example(&opts, TransformInputsStrategy::Auto).unwrap();
        if let Err(error) = format::deserialize::<ConfigBuilder>(&cfg_string, opts.format) {
            panic!(
                "Failed to generate example for {} with error: {error:?}",
                opts.expression
            );
        }
    }

    #[rstest]
    #[case(Format::Toml)]
    #[case(Format::Json)]
    #[case(Format::Yaml)]
    #[test]
    fn generate_all(#[case] format: Format) {
        for name in SourceDescription::types() {
            generate_and_deserialize(format!("{name}//"), format);
        }

        for name in TransformDescription::types() {
            generate_and_deserialize(format!("/{name}/"), format);
        }

        for name in SinkDescription::types() {
            generate_and_deserialize(format!("//{name}"), format);
        }
    }

    /// `:<type>` entries are rejected in every section, and no example lookup
    /// is needed to reach that error, so this test needs no features.
    #[test]
    fn generate_rejects_empty_component_name() {
        let opts = Opts {
            fragment: true,
            expression: ":stdin/:remap/:console".to_string(),
            file: None,
            format: Format::Toml,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Err(vec![
                "failed to generate source ':stdin': empty name is not allowed".to_string(),
                "failed to generate transform ':remap': empty name is not allowed".to_string(),
                "failed to generate sink ':console': empty name is not allowed".to_string(),
            ])
        );
    }

    #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
    #[test]
    fn generate_named_components() {
        let opts = Opts {
            fragment: true,
            expression: "foo:stdin/bar:test_basic/baz:console".to_string(),
            file: None,
            format: Format::Toml,
        };

        let cfg = generate_example(&opts, TransformInputsStrategy::Auto).unwrap();
        assert!(cfg.contains("[sources.foo]"));
        assert!(cfg.contains("[transforms.bar]\ninputs = [\"foo\"]"));
        assert!(cfg.contains("[sinks.baz]\ninputs = [\"bar\"]"));
    }

    #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
    #[test]
    fn generate_configfile() {
        use std::fs;

        use tempfile::tempdir;

        let tempdir = tempdir().expect("Unable to create tempdir for config");
        let filepath = tempdir.path().join("./config.example.toml");
        let opts = Opts {
            fragment: false,
            expression: "stdin/test_basic/console".to_string(),
            file: Some(filepath.clone()),
            format: Format::Toml,
        };

        let cfg = generate_example(&opts, TransformInputsStrategy::Auto);
        let filecontents = fs::read_to_string(
            fs::canonicalize(&filepath).expect("Could not return canonicalized filepath"),
        )
        .expect("Could not read config file");
        fs::remove_file(filepath).expect("Could not cleanup config file!");
        assert_eq!(cfg.unwrap(), filecontents)
    }

    #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
    #[test]
    fn generate_basic_toml() {
        let mut opts = Opts {
            fragment: false,
            expression: "stdin/test_basic/console".to_string(),
            file: None,
            format: Format::Toml,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sources.source0]
                max_length = 102400
                type = "stdin"

                [sources.source0.decoding]
                codec = "bytes"

                [transforms.transform0]
                inputs = ["source0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [sinks.sink0]
                inputs = ["transform0"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        // `|` is accepted as a section divider, equivalent to `/`.
        opts.expression = "stdin|test_basic|console".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sources.source0]
                max_length = 102400
                type = "stdin"

                [sources.source0.decoding]
                codec = "bytes"

                [transforms.transform0]
                inputs = ["source0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [sinks.sink0]
                inputs = ["transform0"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        // Without transforms, sinks consume directly from the sources.
        opts.expression = "stdin//console".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sources.source0]
                max_length = 102400
                type = "stdin"

                [sources.source0.decoding]
                codec = "bytes"

                [sinks.sink0]
                inputs = ["source0"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        // With nothing upstream, sinks get the `component-id` placeholder.
        opts.expression = "//console".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [sinks.sink0]
                inputs = ["component-id"]
                target = "stdout"
                type = "console"

                [sinks.sink0.encoding]
                codec = "json"

                [sinks.sink0.encoding.json]
                pretty = false

                [sinks.sink0.healthcheck]
                enabled = true

                [sinks.sink0.buffer]
                type = "memory"
                max_events = 500
                when_full = "block"
            "#}
            .to_string())
        );

        // Transforms chain onto their predecessor.
        opts.expression = "/test_basic,test_basic,test_basic".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"

                [transforms.transform0]
                inputs = []
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform1]
                inputs = ["transform0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform2]
                inputs = ["transform1"]
                increase = 0.0
                suffix = ""
                type = "test_basic"
            "#}
            .to_string())
        );

        // `fragment` drops the global options.
        opts.fragment = true;
        opts.expression = "/test_basic,test_basic,test_basic".to_string();
        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto),
            Ok(indoc::indoc! {r#"
                [transforms.transform0]
                inputs = []
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform1]
                inputs = ["transform0"]
                increase = 0.0
                suffix = ""
                type = "test_basic"

                [transforms.transform2]
                inputs = ["transform1"]
                increase = 0.0
                suffix = ""
                type = "test_basic"
            "#}
            .to_string())
        );
    }

    #[cfg(all(
        feature = "sources-demo_logs",
        feature = "transforms-remap",
        feature = "sinks-console"
    ))]
    #[test]
    fn generate_basic_yaml() {
        let opts = Opts {
            fragment: false,
            expression: "demo_logs/remap/console".to_string(),
            file: None,
            format: Format::Yaml,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
            indoc::indoc! {r"
            data_dir: /var/lib/vector/
            sources:
              source0:
                count: 9223372036854775807
                decoding:
                  codec: bytes
                format: json
                framing:
                  method: bytes
                interval: 1.0
                type: demo_logs
            transforms:
              transform0:
                inputs:
                - source0
                drop_on_abort: false
                drop_on_error: false
                metric_tag_values: single
                reroute_dropped: false
                runtime: ast
                type: remap
            sinks:
              sink0:
                inputs:
                - transform0
                encoding:
                  codec: json
                  json:
                    pretty: false
                target: stdout
                type: console
                healthcheck:
                  enabled: true
                  uri: null
                buffer:
                  type: memory
                  max_events: 500
                  when_full: block
            "}
        );
    }

    #[cfg(all(
        feature = "sources-demo_logs",
        feature = "transforms-remap",
        feature = "sinks-console"
    ))]
    #[test]
    fn generate_basic_json() {
        let opts = Opts {
            fragment: false,
            expression: "demo_logs/remap/console".to_string(),
            file: None,
            format: Format::Json,
        };

        assert_eq!(
            generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
            indoc::indoc! {r#"
            {
              "data_dir": "/var/lib/vector/",
              "sources": {
                "source0": {
                  "count": 9223372036854775807,
                  "decoding": {
                    "codec": "bytes"
                  },
                  "format": "json",
                  "framing": {
                    "method": "bytes"
                  },
                  "interval": 1.0,
                  "type": "demo_logs"
                }
              },
              "transforms": {
                "transform0": {
                  "inputs": [
                    "source0"
                  ],
                  "drop_on_abort": false,
                  "drop_on_error": false,
                  "metric_tag_values": "single",
                  "reroute_dropped": false,
                  "runtime": "ast",
                  "type": "remap"
                }
              },
              "sinks": {
                "sink0": {
                  "inputs": [
                    "transform0"
                  ],
                  "encoding": {
                    "codec": "json",
                    "json": {
                      "pretty": false
                    }
                  },
                  "target": "stdout",
                  "type": "console",
                  "healthcheck": {
                    "enabled": true,
                    "uri": null
                  },
                  "buffer": {
                    "type": "memory",
                    "max_events": 500,
                    "when_full": "block"
                  }
                }
              }
            }"#}
        );
    }
}