1#![allow(missing_docs)]
2use std::{
3 fs::{File, create_dir_all},
4 io::Write,
5 path::{Path, PathBuf},
6};
7
8use clap::Parser;
9use colored::*;
10use indexmap::IndexMap;
11use serde::Serialize;
12use toml::{Value, map::Map};
13use vector_lib::{
14 buffers::BufferConfig,
15 config::GlobalOptions,
16 configurable::component::{SinkDescription, SourceDescription, TransformDescription},
17 default_data_dir,
18};
19
20use crate::config::{Format, SinkHealthcheckOptions, format};
21
22#[derive(Parser, Debug)]
23#[command(rename_all = "kebab-case")]
24pub struct Opts {
25 #[arg(short, long)]
27 pub(crate) fragment: bool,
28
29 pub(crate) expression: String,
59
60 #[arg(long)]
62 pub(crate) file: Option<PathBuf>,
63
64 #[arg(long, default_value = "yaml")]
65 pub(crate) format: Format,
66}
67
68#[derive(Serialize)]
69pub struct SinkOuter {
70 pub inputs: Vec<String>,
71 #[serde(flatten)]
72 pub inner: Value,
73 pub healthcheck: SinkHealthcheckOptions,
74 pub buffer: BufferConfig,
75}
76
77#[derive(Serialize)]
78pub struct TransformOuter {
79 pub inputs: Vec<String>,
80 #[serde(flatten)]
81 pub inner: Value,
82}
83
84#[derive(Serialize, Default)]
85pub struct Config {
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub sources: Option<IndexMap<String, Value>>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub transforms: Option<IndexMap<String, TransformOuter>>,
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub sinks: Option<IndexMap<String, SinkOuter>>,
92}
93
94pub(crate) enum TransformInputsStrategy {
97 Auto,
102 #[cfg(test)]
106 #[allow(dead_code)]
107 All,
108}
109
110#[derive(Serialize, Default)]
111struct FullConfig {
112 #[serde(flatten)]
113 global_options: Option<GlobalOptions>,
114 #[serde(flatten)]
115 config: Config,
116}
117
118pub(crate) fn generate_example(
119 opts: &Opts,
120 transform_inputs_strategy: TransformInputsStrategy,
121) -> Result<String, Vec<String>> {
122 let components: Vec<Vec<_>> = opts
123 .expression
124 .split(['|', '/'])
125 .map(|s| {
126 s.split(',')
127 .map(|s| s.trim().to_string())
128 .filter(|s| !s.is_empty())
129 .collect()
130 })
131 .collect();
132
133 let mut config = Config::default();
134
135 let mut errs = Vec::new();
136
137 let mut source_names = Vec::new();
138 if let Some(source_types) = components.first() {
139 let mut sources = IndexMap::new();
140
141 for (i, source_expr) in source_types.iter().enumerate() {
142 let (name, source_type) = if let Some(c_index) = source_expr.find(':') {
143 if c_index == 0 {
144 errs.push(format!(
145 "failed to generate source '{source_expr}': empty name is not allowed"
146 ));
147 continue;
148 }
149 let mut chopped_expr = source_expr.clone();
150 (
151 chopped_expr.drain(..c_index).collect(),
152 chopped_expr.drain(1..).collect(),
153 )
154 } else {
155 (format!("source{i}"), source_expr.clone())
156 };
157 source_names.push(name.clone());
158
159 let mut example = match SourceDescription::example(&source_type) {
160 Ok(example) => example,
161 Err(err) => {
162 errs.push(format!("failed to generate source '{source_type}': {err}"));
163 Value::Table(Map::new())
164 }
165 };
166 example
167 .as_table_mut()
168 .expect("examples are always tables")
169 .insert("type".into(), source_type.to_owned().into());
170
171 sources.insert(name, example);
172 }
173
174 if !sources.is_empty() {
175 config.sources = Some(sources);
176 }
177 }
178
179 let mut transform_names = Vec::new();
180 if let Some(transform_types) = components.get(1) {
181 let mut transforms = IndexMap::new();
182
183 for (i, transform_expr) in transform_types.iter().enumerate() {
184 let (name, transform_type) = if let Some(c_index) = transform_expr.find(':') {
185 if c_index == 0 {
186 errs.push(format!(
187 "failed to generate transform '{transform_expr}': empty name is not allowed"
188 ));
189 continue;
190 }
191 let mut chopped_expr = transform_expr.clone();
192 (
193 chopped_expr.drain(..c_index).collect(),
194 chopped_expr.drain(1..).collect(),
195 )
196 } else {
197 (format!("transform{i}"), transform_expr.clone())
198 };
199 transform_names.push(name.clone());
200
201 let targets = match transform_inputs_strategy {
202 TransformInputsStrategy::Auto => {
203 if i == 0 {
204 source_names.clone()
205 } else {
206 vec![
207 transform_names
208 .get(i - 1)
209 .unwrap_or(&"component-id".to_owned())
210 .to_owned(),
211 ]
212 }
213 }
214 #[cfg(test)]
215 TransformInputsStrategy::All => source_names.clone(),
216 };
217
218 let mut example = match TransformDescription::example(&transform_type) {
219 Ok(example) => example,
220 Err(err) => {
221 errs.push(format!(
222 "failed to generate transform '{transform_type}': {err}"
223 ));
224 Value::Table(Map::new())
225 }
226 };
227 example
228 .as_table_mut()
229 .expect("examples are always tables")
230 .insert("type".into(), transform_type.to_owned().into());
231
232 transforms.insert(
233 name,
234 TransformOuter {
235 inputs: targets,
236 inner: example,
237 },
238 );
239 }
240
241 if !transforms.is_empty() {
242 config.transforms = Some(transforms);
243 }
244 }
245
246 if let Some(sink_types) = components.get(2) {
247 let mut sinks = IndexMap::new();
248
249 for (i, sink_expr) in sink_types.iter().enumerate() {
250 let (name, sink_type) = if let Some(c_index) = sink_expr.find(':') {
251 if c_index == 0 {
252 errs.push(format!(
253 "failed to generate sink '{sink_expr}': empty name is not allowed"
254 ));
255 continue;
256 }
257 let mut chopped_expr = sink_expr.clone();
258 (
259 chopped_expr.drain(..c_index).collect(),
260 chopped_expr.drain(1..).collect(),
261 )
262 } else {
263 (format!("sink{i}"), sink_expr.clone())
264 };
265
266 let mut example = match SinkDescription::example(&sink_type) {
267 Ok(example) => example,
268 Err(err) => {
269 errs.push(format!("failed to generate sink '{sink_type}': {err}"));
270 Value::Table(Map::new())
271 }
272 };
273 example
274 .as_table_mut()
275 .expect("examples are always tables")
276 .insert("type".into(), sink_type.to_owned().into());
277
278 sinks.insert(
279 name,
280 SinkOuter {
281 inputs: transform_names
282 .last()
283 .map(|s| vec![s.to_owned()])
284 .or_else(|| {
285 if !source_names.is_empty() {
286 Some(source_names.clone())
287 } else {
288 None
289 }
290 })
291 .unwrap_or_else(|| vec!["component-id".to_owned()]),
292 buffer: BufferConfig::default(),
293 healthcheck: SinkHealthcheckOptions::default(),
294 inner: example,
295 },
296 );
297 }
298
299 if !sinks.is_empty() {
300 config.sinks = Some(sinks);
301 }
302 }
303
304 if !errs.is_empty() {
305 return Err(errs);
306 }
307
308 let full_config = FullConfig {
309 global_options: if !opts.fragment {
310 Some(GlobalOptions {
311 data_dir: default_data_dir(),
312 ..Default::default()
313 })
314 } else {
315 None
316 },
317 config,
318 };
319
320 let builder = match format::serialize(&full_config, opts.format) {
321 Ok(v) => v,
322 Err(e) => {
323 errs.push(format!("failed to marshal sources: {e}"));
324 return Err(errs);
325 }
326 };
327
328 let file = opts.file.as_ref();
329 if let Some(path) = file {
330 match write_config(path, &builder) {
331 #[allow(clippy::print_stdout)]
332 Ok(_) => {
333 println!(
334 "Config file written to {:?}",
335 &file.as_ref().unwrap().join("\n")
336 )
337 }
338 Err(e) => errs.push(format!("failed to write to file: {e}")),
339 };
340 };
341
342 if !errs.is_empty() {
343 Err(errs)
344 } else {
345 Ok(builder)
346 }
347}
348
349pub fn cmd(opts: &Opts) -> exitcode::ExitCode {
350 match generate_example(opts, TransformInputsStrategy::Auto) {
351 Ok(s) => {
352 #[allow(clippy::print_stdout)]
353 {
354 println!("{s}");
355 }
356 exitcode::OK
357 }
358 Err(errs) => {
359 #[allow(clippy::print_stderr)]
360 {
361 errs.iter().for_each(|e| eprintln!("{}", e.red()));
362 }
363 exitcode::SOFTWARE
364 }
365 }
366}
367
368fn write_config(filepath: &Path, body: &str) -> Result<(), crate::Error> {
369 if filepath.exists() {
370 Err(format!("{:?} already exists", &filepath).into())
372 } else {
373 if let Some(directory) = filepath.parent() {
374 create_dir_all(directory)?;
375 }
376 File::create(filepath)
377 .and_then(|mut file| file.write_all(body.as_bytes()))
378 .map_err(Into::into)
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use rstest::rstest;
385
386 use super::*;
387 use crate::config::ConfigBuilder;
388
389 fn generate_and_deserialize(expression: String, format: Format) {
390 let opts = Opts {
391 fragment: false,
392 expression,
393 file: None,
394 format,
395 };
396 let cfg_string = generate_example(&opts, TransformInputsStrategy::Auto).unwrap();
397 if let Err(error) = format::deserialize::<ConfigBuilder>(&cfg_string, opts.format) {
398 panic!(
399 "Failed to generate example for {} with error: {error:?})",
400 opts.expression
401 );
402 }
403 }
404
405 #[rstest]
406 #[case(Format::Toml)]
407 #[case(Format::Json)]
408 #[case(Format::Yaml)]
409 #[test]
410 fn generate_all(#[case] format: Format) {
411 for name in SourceDescription::types() {
412 generate_and_deserialize(format!("{name}//"), format);
413 }
414
415 for name in TransformDescription::types() {
416 generate_and_deserialize(format!("/{name}/"), format);
417 }
418
419 for name in SinkDescription::types() {
420 generate_and_deserialize(format!("//{name}"), format);
421 }
422 }
423
424 #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
425 #[test]
426 fn generate_configfile() {
427 use std::fs;
428
429 use tempfile::tempdir;
430
431 let tempdir = tempdir().expect("Unable to create tempdir for config");
432 let filepath = tempdir.path().join("./config.example.toml");
433 let opts = Opts {
434 fragment: false,
435 expression: "stdin/test_basic/console".to_string(),
436 file: Some(filepath.clone()),
437 format: Format::Toml,
438 };
439
440 let cfg = generate_example(&opts, TransformInputsStrategy::Auto);
441 let filecontents = fs::read_to_string(
442 fs::canonicalize(&filepath).expect("Could not return canonicalized filepath"),
443 )
444 .expect("Could not read config file");
445 fs::remove_file(filepath).expect("Could not cleanup config file!");
446 assert_eq!(cfg.unwrap(), filecontents)
447 }
448
449 #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
450 #[test]
451 fn generate_basic_toml() {
452 let mut opts = Opts {
453 fragment: false,
454 expression: "stdin/test_basic/console".to_string(),
455 file: None,
456 format: Format::Toml,
457 };
458
459 assert_eq!(
460 generate_example(&opts, TransformInputsStrategy::Auto),
461 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
462
463 [sources.source0]
464 max_length = 102400
465 type = "stdin"
466
467 [sources.source0.decoding]
468 codec = "bytes"
469
470 [transforms.transform0]
471 inputs = ["source0"]
472 increase = 0.0
473 suffix = ""
474 type = "test_basic"
475
476 [sinks.sink0]
477 inputs = ["transform0"]
478 target = "stdout"
479 type = "console"
480
481 [sinks.sink0.encoding]
482 codec = "json"
483
484 [sinks.sink0.encoding.json]
485 pretty = false
486
487 [sinks.sink0.healthcheck]
488 enabled = true
489
490 [sinks.sink0.buffer]
491 type = "memory"
492 max_events = 500
493 when_full = "block"
494 "#}
495 .to_string())
496 );
497
498 opts.expression = "stdin|test_basic|console".to_string();
499 assert_eq!(
500 generate_example(&opts, TransformInputsStrategy::Auto),
501 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
502
503 [sources.source0]
504 max_length = 102400
505 type = "stdin"
506
507 [sources.source0.decoding]
508 codec = "bytes"
509
510 [transforms.transform0]
511 inputs = ["source0"]
512 increase = 0.0
513 suffix = ""
514 type = "test_basic"
515
516 [sinks.sink0]
517 inputs = ["transform0"]
518 target = "stdout"
519 type = "console"
520
521 [sinks.sink0.encoding]
522 codec = "json"
523
524 [sinks.sink0.encoding.json]
525 pretty = false
526
527 [sinks.sink0.healthcheck]
528 enabled = true
529
530 [sinks.sink0.buffer]
531 type = "memory"
532 max_events = 500
533 when_full = "block"
534 "#}
535 .to_string())
536 );
537
538 opts.expression = "stdin//console".to_string();
539 assert_eq!(
540 generate_example(&opts, TransformInputsStrategy::Auto),
541 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
542
543 [sources.source0]
544 max_length = 102400
545 type = "stdin"
546
547 [sources.source0.decoding]
548 codec = "bytes"
549
550 [sinks.sink0]
551 inputs = ["source0"]
552 target = "stdout"
553 type = "console"
554
555 [sinks.sink0.encoding]
556 codec = "json"
557
558 [sinks.sink0.encoding.json]
559 pretty = false
560
561 [sinks.sink0.healthcheck]
562 enabled = true
563
564 [sinks.sink0.buffer]
565 type = "memory"
566 max_events = 500
567 when_full = "block"
568 "#}
569 .to_string())
570 );
571
572 opts.expression = "//console".to_string();
573 assert_eq!(
574 generate_example(&opts, TransformInputsStrategy::Auto),
575 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
576
577 [sinks.sink0]
578 inputs = ["component-id"]
579 target = "stdout"
580 type = "console"
581
582 [sinks.sink0.encoding]
583 codec = "json"
584
585 [sinks.sink0.encoding.json]
586 pretty = false
587
588 [sinks.sink0.healthcheck]
589 enabled = true
590
591 [sinks.sink0.buffer]
592 type = "memory"
593 max_events = 500
594 when_full = "block"
595 "#}
596 .to_string())
597 );
598
599 opts.expression = "/test_basic,test_basic,test_basic".to_string();
600 assert_eq!(
601 generate_example(&opts, TransformInputsStrategy::Auto),
602 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
603
604 [transforms.transform0]
605 inputs = []
606 increase = 0.0
607 suffix = ""
608 type = "test_basic"
609
610 [transforms.transform1]
611 inputs = ["transform0"]
612 increase = 0.0
613 suffix = ""
614 type = "test_basic"
615
616 [transforms.transform2]
617 inputs = ["transform1"]
618 increase = 0.0
619 suffix = ""
620 type = "test_basic"
621 "#}
622 .to_string())
623 );
624
625 opts.fragment = true;
626 opts.expression = "/test_basic,test_basic,test_basic".to_string();
627 assert_eq!(
628 generate_example(&opts, TransformInputsStrategy::Auto),
629 Ok(indoc::indoc! {r#"
630 [transforms.transform0]
631 inputs = []
632 increase = 0.0
633 suffix = ""
634 type = "test_basic"
635
636 [transforms.transform1]
637 inputs = ["transform0"]
638 increase = 0.0
639 suffix = ""
640 type = "test_basic"
641
642 [transforms.transform2]
643 inputs = ["transform1"]
644 increase = 0.0
645 suffix = ""
646 type = "test_basic"
647 "#}
648 .to_string())
649 );
650 }
651
652 #[cfg(all(
653 feature = "sources-demo_logs",
654 feature = "transforms-remap",
655 feature = "sinks-console"
656 ))]
657 #[test]
658 fn generate_basic_yaml() {
659 let opts = Opts {
660 fragment: false,
661 expression: "demo_logs/remap/console".to_string(),
662 file: None,
663 format: Format::Yaml,
664 };
665
666 assert_eq!(
667 generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
668 indoc::indoc! {r"
669 data_dir: /var/lib/vector/
670 sources:
671 source0:
672 count: 9223372036854775807
673 decoding:
674 codec: bytes
675 format: json
676 framing:
677 method: bytes
678 interval: 1.0
679 type: demo_logs
680 transforms:
681 transform0:
682 inputs:
683 - source0
684 drop_on_abort: false
685 drop_on_error: false
686 metric_tag_values: single
687 reroute_dropped: false
688 runtime: ast
689 type: remap
690 sinks:
691 sink0:
692 inputs:
693 - transform0
694 encoding:
695 codec: json
696 json:
697 pretty: false
698 target: stdout
699 type: console
700 healthcheck:
701 enabled: true
702 uri: null
703 buffer:
704 type: memory
705 max_events: 500
706 when_full: block
707 "}
708 );
709 }
710
711 #[cfg(all(
712 feature = "sources-demo_logs",
713 feature = "transforms-remap",
714 feature = "sinks-console"
715 ))]
716 #[test]
717 fn generate_basic_json() {
718 let opts = Opts {
719 fragment: false,
720 expression: "demo_logs/remap/console".to_string(),
721 file: None,
722 format: Format::Json,
723 };
724
725 assert_eq!(
726 generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
727 indoc::indoc! {r#"
728 {
729 "data_dir": "/var/lib/vector/",
730 "sources": {
731 "source0": {
732 "count": 9223372036854775807,
733 "decoding": {
734 "codec": "bytes"
735 },
736 "format": "json",
737 "framing": {
738 "method": "bytes"
739 },
740 "interval": 1.0,
741 "type": "demo_logs"
742 }
743 },
744 "transforms": {
745 "transform0": {
746 "inputs": [
747 "source0"
748 ],
749 "drop_on_abort": false,
750 "drop_on_error": false,
751 "metric_tag_values": "single",
752 "reroute_dropped": false,
753 "runtime": "ast",
754 "type": "remap"
755 }
756 },
757 "sinks": {
758 "sink0": {
759 "inputs": [
760 "transform0"
761 ],
762 "encoding": {
763 "codec": "json",
764 "json": {
765 "pretty": false
766 }
767 },
768 "target": "stdout",
769 "type": "console",
770 "healthcheck": {
771 "enabled": true,
772 "uri": null
773 },
774 "buffer": {
775 "type": "memory",
776 "max_events": 500,
777 "when_full": "block"
778 }
779 }
780 }
781 }"#}
782 );
783 }
784}