1#![allow(missing_docs)]
2use std::{
3 fs::{create_dir_all, File},
4 io::Write,
5 path::{Path, PathBuf},
6};
7
8use clap::Parser;
9use colored::*;
10use indexmap::IndexMap;
11use serde::Serialize;
12use toml::{map::Map, Value};
13use vector_lib::configurable::component::{
14 SinkDescription, SourceDescription, TransformDescription,
15};
16use vector_lib::{buffers::BufferConfig, config::GlobalOptions, default_data_dir};
17
18use crate::config::{format, Format, SinkHealthcheckOptions};
19
20#[derive(Parser, Debug)]
21#[command(rename_all = "kebab-case")]
22pub struct Opts {
23 #[arg(short, long)]
25 pub(crate) fragment: bool,
26
27 pub(crate) expression: String,
57
58 #[arg(long)]
60 pub(crate) file: Option<PathBuf>,
61
62 #[arg(long, default_value = "yaml")]
63 pub(crate) format: Format,
64}
65
66#[derive(Serialize)]
67pub struct SinkOuter {
68 pub inputs: Vec<String>,
69 #[serde(flatten)]
70 pub inner: Value,
71 pub healthcheck: SinkHealthcheckOptions,
72 pub buffer: BufferConfig,
73}
74
75#[derive(Serialize)]
76pub struct TransformOuter {
77 pub inputs: Vec<String>,
78 #[serde(flatten)]
79 pub inner: Value,
80}
81
82#[derive(Serialize, Default)]
83pub struct Config {
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub sources: Option<IndexMap<String, Value>>,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub transforms: Option<IndexMap<String, TransformOuter>>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub sinks: Option<IndexMap<String, SinkOuter>>,
90}
91
92pub(crate) enum TransformInputsStrategy {
95 Auto,
100 #[cfg(test)]
104 #[allow(dead_code)]
105 All,
106}
107
108#[derive(Serialize, Default)]
109struct FullConfig {
110 #[serde(flatten)]
111 global_options: Option<GlobalOptions>,
112 #[serde(flatten)]
113 config: Config,
114}
115
116pub(crate) fn generate_example(
117 opts: &Opts,
118 transform_inputs_strategy: TransformInputsStrategy,
119) -> Result<String, Vec<String>> {
120 let components: Vec<Vec<_>> = opts
121 .expression
122 .split(['|', '/'])
123 .map(|s| {
124 s.split(',')
125 .map(|s| s.trim().to_string())
126 .filter(|s| !s.is_empty())
127 .collect()
128 })
129 .collect();
130
131 let mut config = Config::default();
132
133 let mut errs = Vec::new();
134
135 let mut source_names = Vec::new();
136 if let Some(source_types) = components.first() {
137 let mut sources = IndexMap::new();
138
139 for (i, source_expr) in source_types.iter().enumerate() {
140 let (name, source_type) = if let Some(c_index) = source_expr.find(':') {
141 if c_index == 0 {
142 errs.push(format!(
143 "failed to generate source '{source_expr}': empty name is not allowed"
144 ));
145 continue;
146 }
147 let mut chopped_expr = source_expr.clone();
148 (
149 chopped_expr.drain(..c_index).collect(),
150 chopped_expr.drain(1..).collect(),
151 )
152 } else {
153 (format!("source{i}"), source_expr.clone())
154 };
155 source_names.push(name.clone());
156
157 let mut example = match SourceDescription::example(&source_type) {
158 Ok(example) => example,
159 Err(err) => {
160 errs.push(format!("failed to generate source '{source_type}': {err}"));
161 Value::Table(Map::new())
162 }
163 };
164 example
165 .as_table_mut()
166 .expect("examples are always tables")
167 .insert("type".into(), source_type.to_owned().into());
168
169 sources.insert(name, example);
170 }
171
172 if !sources.is_empty() {
173 config.sources = Some(sources);
174 }
175 }
176
177 let mut transform_names = Vec::new();
178 if let Some(transform_types) = components.get(1) {
179 let mut transforms = IndexMap::new();
180
181 for (i, transform_expr) in transform_types.iter().enumerate() {
182 let (name, transform_type) = if let Some(c_index) = transform_expr.find(':') {
183 if c_index == 0 {
184 errs.push(format!(
185 "failed to generate transform '{transform_expr}': empty name is not allowed"
186 ));
187 continue;
188 }
189 let mut chopped_expr = transform_expr.clone();
190 (
191 chopped_expr.drain(..c_index).collect(),
192 chopped_expr.drain(1..).collect(),
193 )
194 } else {
195 (format!("transform{i}"), transform_expr.clone())
196 };
197 transform_names.push(name.clone());
198
199 let targets = match transform_inputs_strategy {
200 TransformInputsStrategy::Auto => {
201 if i == 0 {
202 source_names.clone()
203 } else {
204 vec![transform_names
205 .get(i - 1)
206 .unwrap_or(&"component-id".to_owned())
207 .to_owned()]
208 }
209 }
210 #[cfg(test)]
211 TransformInputsStrategy::All => source_names.clone(),
212 };
213
214 let mut example = match TransformDescription::example(&transform_type) {
215 Ok(example) => example,
216 Err(err) => {
217 errs.push(format!(
218 "failed to generate transform '{transform_type}': {err}"
219 ));
220 Value::Table(Map::new())
221 }
222 };
223 example
224 .as_table_mut()
225 .expect("examples are always tables")
226 .insert("type".into(), transform_type.to_owned().into());
227
228 transforms.insert(
229 name,
230 TransformOuter {
231 inputs: targets,
232 inner: example,
233 },
234 );
235 }
236
237 if !transforms.is_empty() {
238 config.transforms = Some(transforms);
239 }
240 }
241
242 if let Some(sink_types) = components.get(2) {
243 let mut sinks = IndexMap::new();
244
245 for (i, sink_expr) in sink_types.iter().enumerate() {
246 let (name, sink_type) = if let Some(c_index) = sink_expr.find(':') {
247 if c_index == 0 {
248 errs.push(format!(
249 "failed to generate sink '{sink_expr}': empty name is not allowed"
250 ));
251 continue;
252 }
253 let mut chopped_expr = sink_expr.clone();
254 (
255 chopped_expr.drain(..c_index).collect(),
256 chopped_expr.drain(1..).collect(),
257 )
258 } else {
259 (format!("sink{i}"), sink_expr.clone())
260 };
261
262 let mut example = match SinkDescription::example(&sink_type) {
263 Ok(example) => example,
264 Err(err) => {
265 errs.push(format!("failed to generate sink '{sink_type}': {err}"));
266 Value::Table(Map::new())
267 }
268 };
269 example
270 .as_table_mut()
271 .expect("examples are always tables")
272 .insert("type".into(), sink_type.to_owned().into());
273
274 sinks.insert(
275 name,
276 SinkOuter {
277 inputs: transform_names
278 .last()
279 .map(|s| vec![s.to_owned()])
280 .or_else(|| {
281 if !source_names.is_empty() {
282 Some(source_names.clone())
283 } else {
284 None
285 }
286 })
287 .unwrap_or_else(|| vec!["component-id".to_owned()]),
288 buffer: BufferConfig::default(),
289 healthcheck: SinkHealthcheckOptions::default(),
290 inner: example,
291 },
292 );
293 }
294
295 if !sinks.is_empty() {
296 config.sinks = Some(sinks);
297 }
298 }
299
300 if !errs.is_empty() {
301 return Err(errs);
302 }
303
304 let full_config = FullConfig {
305 global_options: if !opts.fragment {
306 Some(GlobalOptions {
307 data_dir: default_data_dir(),
308 ..Default::default()
309 })
310 } else {
311 None
312 },
313 config,
314 };
315
316 let builder = match format::serialize(&full_config, opts.format) {
317 Ok(v) => v,
318 Err(e) => {
319 errs.push(format!("failed to marshal sources: {e}"));
320 return Err(errs);
321 }
322 };
323
324 let file = opts.file.as_ref();
325 if file.is_some() {
326 #[allow(clippy::print_stdout)]
327 match write_config(file.as_ref().unwrap(), &builder) {
328 Ok(_) => {
329 println!(
330 "Config file written to {:?}",
331 &file.as_ref().unwrap().join("\n")
332 )
333 }
334 Err(e) => errs.push(format!("failed to write to file: {e}")),
335 };
336 };
337
338 if !errs.is_empty() {
339 Err(errs)
340 } else {
341 Ok(builder)
342 }
343}
344
345pub fn cmd(opts: &Opts) -> exitcode::ExitCode {
346 match generate_example(opts, TransformInputsStrategy::Auto) {
347 Ok(s) => {
348 #[allow(clippy::print_stdout)]
349 {
350 println!("{s}");
351 }
352 exitcode::OK
353 }
354 Err(errs) => {
355 #[allow(clippy::print_stderr)]
356 {
357 errs.iter().for_each(|e| eprintln!("{}", e.red()));
358 }
359 exitcode::SOFTWARE
360 }
361 }
362}
363
364fn write_config(filepath: &Path, body: &str) -> Result<(), crate::Error> {
365 if filepath.exists() {
366 Err(format!("{:?} already exists", &filepath).into())
368 } else {
369 if let Some(directory) = filepath.parent() {
370 create_dir_all(directory)?;
371 }
372 File::create(filepath)
373 .and_then(|mut file| file.write_all(body.as_bytes()))
374 .map_err(Into::into)
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use crate::config::ConfigBuilder;
382 use rstest::rstest;
383
384 fn generate_and_deserialize(expression: String, format: Format) {
385 let opts = Opts {
386 fragment: false,
387 expression,
388 file: None,
389 format,
390 };
391 let cfg_string = generate_example(&opts, TransformInputsStrategy::Auto).unwrap();
392 if let Err(error) = format::deserialize::<ConfigBuilder>(&cfg_string, opts.format) {
393 panic!(
394 "Failed to generate example for {} with error: {error:?})",
395 opts.expression
396 );
397 }
398 }
399
400 #[rstest]
401 #[case(Format::Toml)]
402 #[case(Format::Json)]
403 #[case(Format::Yaml)]
404 #[test]
405 fn generate_all(#[case] format: Format) {
406 for name in SourceDescription::types() {
407 generate_and_deserialize(format!("{name}//"), format);
408 }
409
410 for name in TransformDescription::types() {
411 generate_and_deserialize(format!("/{name}/"), format);
412 }
413
414 for name in SinkDescription::types() {
415 generate_and_deserialize(format!("//{name}"), format);
416 }
417 }
418
419 #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
420 #[test]
421 fn generate_configfile() {
422 use std::fs;
423
424 use tempfile::tempdir;
425
426 let tempdir = tempdir().expect("Unable to create tempdir for config");
427 let filepath = tempdir.path().join("./config.example.toml");
428 let opts = Opts {
429 fragment: false,
430 expression: "stdin/test_basic/console".to_string(),
431 file: Some(filepath.clone()),
432 format: Format::Toml,
433 };
434
435 let cfg = generate_example(&opts, TransformInputsStrategy::Auto);
436 let filecontents = fs::read_to_string(
437 fs::canonicalize(&filepath).expect("Could not return canonicalized filepath"),
438 )
439 .expect("Could not read config file");
440 fs::remove_file(filepath).expect("Could not cleanup config file!");
441 assert_eq!(cfg.unwrap(), filecontents)
442 }
443
444 #[cfg(all(feature = "sources-stdin", feature = "sinks-console"))]
445 #[test]
446 fn generate_basic_toml() {
447 let mut opts = Opts {
448 fragment: false,
449 expression: "stdin/test_basic/console".to_string(),
450 file: None,
451 format: Format::Toml,
452 };
453
454 assert_eq!(
455 generate_example(&opts, TransformInputsStrategy::Auto),
456 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
457
458 [sources.source0]
459 max_length = 102400
460 type = "stdin"
461
462 [sources.source0.decoding]
463 codec = "bytes"
464
465 [transforms.transform0]
466 inputs = ["source0"]
467 increase = 0.0
468 suffix = ""
469 type = "test_basic"
470
471 [sinks.sink0]
472 inputs = ["transform0"]
473 target = "stdout"
474 type = "console"
475
476 [sinks.sink0.encoding]
477 codec = "json"
478
479 [sinks.sink0.encoding.json]
480 pretty = false
481
482 [sinks.sink0.healthcheck]
483 enabled = true
484
485 [sinks.sink0.buffer]
486 type = "memory"
487 max_events = 500
488 when_full = "block"
489 "#}
490 .to_string())
491 );
492
493 opts.expression = "stdin|test_basic|console".to_string();
494 assert_eq!(
495 generate_example(&opts, TransformInputsStrategy::Auto),
496 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
497
498 [sources.source0]
499 max_length = 102400
500 type = "stdin"
501
502 [sources.source0.decoding]
503 codec = "bytes"
504
505 [transforms.transform0]
506 inputs = ["source0"]
507 increase = 0.0
508 suffix = ""
509 type = "test_basic"
510
511 [sinks.sink0]
512 inputs = ["transform0"]
513 target = "stdout"
514 type = "console"
515
516 [sinks.sink0.encoding]
517 codec = "json"
518
519 [sinks.sink0.encoding.json]
520 pretty = false
521
522 [sinks.sink0.healthcheck]
523 enabled = true
524
525 [sinks.sink0.buffer]
526 type = "memory"
527 max_events = 500
528 when_full = "block"
529 "#}
530 .to_string())
531 );
532
533 opts.expression = "stdin//console".to_string();
534 assert_eq!(
535 generate_example(&opts, TransformInputsStrategy::Auto),
536 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
537
538 [sources.source0]
539 max_length = 102400
540 type = "stdin"
541
542 [sources.source0.decoding]
543 codec = "bytes"
544
545 [sinks.sink0]
546 inputs = ["source0"]
547 target = "stdout"
548 type = "console"
549
550 [sinks.sink0.encoding]
551 codec = "json"
552
553 [sinks.sink0.encoding.json]
554 pretty = false
555
556 [sinks.sink0.healthcheck]
557 enabled = true
558
559 [sinks.sink0.buffer]
560 type = "memory"
561 max_events = 500
562 when_full = "block"
563 "#}
564 .to_string())
565 );
566
567 opts.expression = "//console".to_string();
568 assert_eq!(
569 generate_example(&opts, TransformInputsStrategy::Auto),
570 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
571
572 [sinks.sink0]
573 inputs = ["component-id"]
574 target = "stdout"
575 type = "console"
576
577 [sinks.sink0.encoding]
578 codec = "json"
579
580 [sinks.sink0.encoding.json]
581 pretty = false
582
583 [sinks.sink0.healthcheck]
584 enabled = true
585
586 [sinks.sink0.buffer]
587 type = "memory"
588 max_events = 500
589 when_full = "block"
590 "#}
591 .to_string())
592 );
593
594 opts.expression = "/test_basic,test_basic,test_basic".to_string();
595 assert_eq!(
596 generate_example(&opts, TransformInputsStrategy::Auto),
597 Ok(indoc::indoc! {r#"data_dir = "/var/lib/vector/"
598
599 [transforms.transform0]
600 inputs = []
601 increase = 0.0
602 suffix = ""
603 type = "test_basic"
604
605 [transforms.transform1]
606 inputs = ["transform0"]
607 increase = 0.0
608 suffix = ""
609 type = "test_basic"
610
611 [transforms.transform2]
612 inputs = ["transform1"]
613 increase = 0.0
614 suffix = ""
615 type = "test_basic"
616 "#}
617 .to_string())
618 );
619
620 opts.fragment = true;
621 opts.expression = "/test_basic,test_basic,test_basic".to_string();
622 assert_eq!(
623 generate_example(&opts, TransformInputsStrategy::Auto),
624 Ok(indoc::indoc! {r#"
625 [transforms.transform0]
626 inputs = []
627 increase = 0.0
628 suffix = ""
629 type = "test_basic"
630
631 [transforms.transform1]
632 inputs = ["transform0"]
633 increase = 0.0
634 suffix = ""
635 type = "test_basic"
636
637 [transforms.transform2]
638 inputs = ["transform1"]
639 increase = 0.0
640 suffix = ""
641 type = "test_basic"
642 "#}
643 .to_string())
644 );
645 }
646
647 #[cfg(all(
648 feature = "sources-demo_logs",
649 feature = "transforms-remap",
650 feature = "sinks-console"
651 ))]
652 #[test]
653 fn generate_basic_yaml() {
654 let opts = Opts {
655 fragment: false,
656 expression: "demo_logs/remap/console".to_string(),
657 file: None,
658 format: Format::Yaml,
659 };
660
661 assert_eq!(
662 generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
663 indoc::indoc! {r"
664 data_dir: /var/lib/vector/
665 sources:
666 source0:
667 count: 9223372036854775807
668 decoding:
669 codec: bytes
670 format: json
671 framing:
672 method: bytes
673 interval: 1.0
674 type: demo_logs
675 transforms:
676 transform0:
677 inputs:
678 - source0
679 drop_on_abort: false
680 drop_on_error: false
681 metric_tag_values: single
682 reroute_dropped: false
683 runtime: ast
684 type: remap
685 sinks:
686 sink0:
687 inputs:
688 - transform0
689 encoding:
690 codec: json
691 json:
692 pretty: false
693 target: stdout
694 type: console
695 healthcheck:
696 enabled: true
697 uri: null
698 buffer:
699 type: memory
700 max_events: 500
701 when_full: block
702 "}
703 );
704 }
705
706 #[cfg(all(
707 feature = "sources-demo_logs",
708 feature = "transforms-remap",
709 feature = "sinks-console"
710 ))]
711 #[test]
712 fn generate_basic_json() {
713 let opts = Opts {
714 fragment: false,
715 expression: "demo_logs/remap/console".to_string(),
716 file: None,
717 format: Format::Json,
718 };
719
720 assert_eq!(
721 generate_example(&opts, TransformInputsStrategy::Auto).unwrap(),
722 indoc::indoc! {r#"
723 {
724 "data_dir": "/var/lib/vector/",
725 "sources": {
726 "source0": {
727 "count": 9223372036854775807,
728 "decoding": {
729 "codec": "bytes"
730 },
731 "format": "json",
732 "framing": {
733 "method": "bytes"
734 },
735 "interval": 1.0,
736 "type": "demo_logs"
737 }
738 },
739 "transforms": {
740 "transform0": {
741 "inputs": [
742 "source0"
743 ],
744 "drop_on_abort": false,
745 "drop_on_error": false,
746 "metric_tag_values": "single",
747 "reroute_dropped": false,
748 "runtime": "ast",
749 "type": "remap"
750 }
751 },
752 "sinks": {
753 "sink0": {
754 "inputs": [
755 "transform0"
756 ],
757 "encoding": {
758 "codec": "json",
759 "json": {
760 "pretty": false
761 }
762 },
763 "target": "stdout",
764 "type": "console",
765 "healthcheck": {
766 "enabled": true,
767 "uri": null
768 },
769 "buffer": {
770 "type": "memory",
771 "max_events": 500,
772 "when_full": "block"
773 }
774 }
775 }
776 }"#}
777 );
778 }
779}