1use std::{
2 collections::{BTreeMap, HashMap},
3 fs::File,
4 io::{self, Read},
5 path::PathBuf,
6 sync::Mutex,
7};
8
9use snafu::{ResultExt, Snafu};
10use vector_lib::{
11 TimeZone,
12 codecs::MetricTagValues,
13 compile_vrl,
14 config::LogNamespace,
15 configurable::configurable_component,
16 enrichment::TableRegistry,
17 lookup::{PathPrefix, metadata_path, owned_value_path},
18 schema::Definition,
19};
20use vector_vrl_functions::set_semantic_meaning::MeaningList;
21use vrl::{
22 compiler::{
23 CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
24 runtime::{Runtime, Terminate},
25 state::ExternalEnv,
26 },
27 diagnostic::{DiagnosticMessage, Note},
28 path,
29 path::ValuePath,
30 value::{Kind, Value},
31};
32
33use crate::{
34 Result,
35 config::{
36 ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
37 TransformOutput, log_schema,
38 },
39 event::{Event, TargetEvents, VrlTarget},
40 format_vrl_diagnostics,
41 internal_events::{RemapMappingAbort, RemapMappingError},
42 schema,
43 transforms::{SyncTransform, Transform, TransformOutputsBuf},
44};
45
/// Name of the output port that receives dropped events when `reroute_dropped` is enabled.
const DROPPED: &str = "dropped";
/// Key of a cached compilation: the enrichment tables and merged input schema definition the
/// program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// A successful cached compilation: the program, its formatted warnings, and the semantic
/// meanings the program assigned.
type CacheValue = (Program, String, MeaningList);
49
/// Configuration for the `remap` transform.
///
/// Exactly one of `source`, `file`, or `files` must be set.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The VRL program, given inline.
    ///
    /// Mutually exclusive with `file` and `files`.
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// Path to a file containing the VRL program.
    ///
    /// Mutually exclusive with `source` and `files`. The file is included in the files watched
    /// for this transform.
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// Paths to files whose contents, concatenated in order, form the VRL program.
    ///
    /// Each file's contents are followed by a newline. Mutually exclusive with `source` and
    /// `file`. These files are included in the files watched for this transform.
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// Controls how metric tag values are exposed to the VRL program.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The timezone used when running the VRL program.
    ///
    /// When unset, the global `timezone` option is used.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Drop the event if the VRL program fails with an error at runtime.
    ///
    /// When disabled, the original, unmodified event is forwarded instead.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Drop the event if the VRL program aborts.
    ///
    /// When disabled, the original, unmodified event is forwarded instead.
    // NOTE(review): serde defaults this to `true`, but `derivative(Default)` yields `false`, so
    // `RemapConfig::default()` (and anything generated from it) disagrees with deserialization.
    // Confirm which default is intended.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Send events dropped by `drop_on_error` or `drop_on_abort` to the `dropped` output.
    ///
    /// Rerouted events are annotated with metadata describing why they were dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    // VRL runtime used to execute the program; only the AST runtime is handled by `build`.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    // Memoized compilation results keyed by (enrichment tables, merged schema definition).
    // Never serialized, and reset to empty whenever the config is cloned.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
164
165impl Clone for RemapConfig {
166 fn clone(&self) -> Self {
167 Self {
168 source: self.source.clone(),
169 file: self.file.clone(),
170 files: self.files.clone(),
171 metric_tag_values: self.metric_tag_values,
172 timezone: self.timezone,
173 drop_on_error: self.drop_on_error,
174 drop_on_abort: self.drop_on_abort,
175 reroute_dropped: self.reroute_dropped,
176 runtime: self.runtime,
177 cache: Mutex::new(Default::default()),
178 }
179 }
180}
181
182impl RemapConfig {
183 fn compile_vrl_program(
184 &self,
185 enrichment_tables: TableRegistry,
186 merged_schema_definition: schema::Definition,
187 ) -> Result<(Program, String, MeaningList)> {
188 if let Some((_, res)) = self
189 .cache
190 .lock()
191 .expect("Data poisoned")
192 .iter()
193 .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
194 {
195 return res.clone().map_err(Into::into);
196 }
197
198 let source = match (&self.source, &self.file, &self.files) {
199 (Some(source), None, None) => source.to_owned(),
200 (None, Some(path), None) => Self::read_file(path)?,
201 (None, None, Some(paths)) => {
202 let mut combined_source = String::new();
203 for path in paths {
204 let content = Self::read_file(path)?;
205 combined_source.push_str(&content);
206 combined_source.push('\n');
207 }
208 combined_source
209 }
210 _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
211 };
212
213 let mut functions = vrl::stdlib::all();
214 functions.append(&mut vector_lib::enrichment::vrl_functions());
215 #[cfg(feature = "sources-dnstap")]
216 functions.append(&mut dnstap_parser::vrl_functions());
217 functions.append(&mut vector_vrl_functions::all());
218
219 let state = TypeState {
220 local: Default::default(),
221 external: ExternalEnv::new_with_kind(
222 merged_schema_definition.event_kind().clone(),
223 merged_schema_definition.metadata_kind().clone(),
224 ),
225 };
226 let mut config = CompileConfig::default();
227
228 config.set_custom(enrichment_tables.clone());
229 config.set_custom(MeaningList::default());
230
231 let res = compile_vrl(&source, &functions, &state, config)
232 .map_err(|diagnostics| format_vrl_diagnostics(&source, diagnostics))
233 .map(|result| {
234 (
235 result.program,
236 format_vrl_diagnostics(&source, result.warnings),
237 result.config.get_custom::<MeaningList>().unwrap().clone(),
238 )
239 });
240
241 self.cache
242 .lock()
243 .expect("Data poisoned")
244 .push(((enrichment_tables, merged_schema_definition), res.clone()));
245
246 res.map_err(Into::into)
247 }
248
249 fn read_file(path: &PathBuf) -> Result<String> {
250 let mut buffer = String::new();
251 File::open(path)
252 .with_context(|_| FileOpenFailedSnafu { path })?
253 .read_to_string(&mut buffer)
254 .with_context(|_| FileReadFailedSnafu { path })?;
255 Ok(buffer)
256 }
257}
258
// Implements example-config generation for `RemapConfig` from its `Default` value; exercised
// by the `generate_config` test.
impl_generate_config_from_default!(RemapConfig);
260
261#[async_trait::async_trait]
262#[typetag::serde(name = "remap")]
263impl TransformConfig for RemapConfig {
264 async fn build(&self, context: &TransformContext) -> Result<Transform> {
265 let (transform, warnings) = match self.runtime {
266 VrlRuntime::Ast => {
267 let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
268 (Transform::synchronous(remap), warnings)
269 }
270 };
271
272 if !warnings.is_empty() {
277 warn!(message = "VRL compilation warning.", %warnings);
278 }
279
280 Ok(transform)
281 }
282
283 fn input(&self) -> Input {
284 Input::all()
285 }
286
287 fn outputs(
288 &self,
289 enrichment_tables: vector_lib::enrichment::TableRegistry,
290 input_definitions: &[(OutputId, schema::Definition)],
291 _: LogNamespace,
292 ) -> Vec<TransformOutput> {
293 let merged_definition: Definition = input_definitions
294 .iter()
295 .map(|(_output, definition)| definition.clone())
296 .reduce(Definition::merge)
297 .unwrap_or_else(Definition::any);
298
299 let compiled = self
303 .compile_vrl_program(enrichment_tables, merged_definition)
304 .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
305 .map_err(|_| ());
306
307 let mut dropped_definitions = HashMap::new();
308 let mut default_definitions = HashMap::new();
309
310 for (output_id, input_definition) in input_definitions {
311 let default_definition = compiled
312 .clone()
313 .map(|(state, meaning)| {
314 let mut new_type_def = Definition::new(
315 state.external.target_kind().clone(),
316 state.external.metadata_kind().clone(),
317 input_definition.log_namespaces().clone(),
318 );
319
320 for (id, path) in input_definition.meanings() {
321 let _ = new_type_def.try_with_meaning(path.clone(), id);
325 }
326
327 for (id, path) in meaning {
329 new_type_def = new_type_def.with_meaning(path, &id);
331 }
332 new_type_def
333 })
334 .unwrap_or_else(|_| {
335 Definition::new_with_default_metadata(
336 Kind::never(),
338 input_definition.log_namespaces().clone(),
339 )
340 });
341
342 let dropped_definition = Definition::combine_log_namespaces(
345 input_definition.log_namespaces(),
346 input_definition.clone().with_event_field(
347 log_schema().metadata_key().expect("valid metadata key"),
348 Kind::object(BTreeMap::from([
349 ("reason".into(), Kind::bytes()),
350 ("message".into(), Kind::bytes()),
351 ("component_id".into(), Kind::bytes()),
352 ("component_type".into(), Kind::bytes()),
353 ("component_kind".into(), Kind::bytes()),
354 ])),
355 Some("metadata"),
356 ),
357 input_definition
358 .clone()
359 .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
360 .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
361 .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
362 .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
363 .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
364 );
365
366 default_definitions.insert(
367 output_id.clone(),
368 VrlTarget::modify_schema_definition_for_into_events(default_definition),
369 );
370 dropped_definitions.insert(
371 output_id.clone(),
372 VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
373 );
374 }
375
376 let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
377
378 if self.reroute_dropped {
379 vec![
380 default_output,
381 TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
382 ]
383 } else {
384 vec![default_output]
385 }
386 }
387
388 fn enable_concurrency(&self) -> bool {
389 true
390 }
391
392 fn files_to_watch(&self) -> Vec<&PathBuf> {
393 self.file
394 .iter()
395 .chain(self.files.iter().flatten())
396 .collect()
397 }
398}
399
/// The `remap` transform: runs a compiled VRL program over each event.
///
/// `Runner` executes the program; [`AstRunner`] is the implementation used by
/// `Remap::new_ast`.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of this component, recorded in dropped-event annotations.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program.
    program: Program,
    /// Timezone handed to the runner: the configured one, or the global default.
    timezone: TimeZone,
    /// Drop (rather than forward unchanged) events whose program run errors.
    drop_on_error: bool,
    /// Drop (rather than forward unchanged) events whose program run aborts.
    drop_on_abort: bool,
    /// Annotate dropped events and send them to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against each event.
    runner: Runner,
    /// Selects the flag passed to `VrlTarget::new` (`Full` => `true`, `Single` => `false`).
    metric_tag_values: MetricTagValues,
}
414
/// Executes a compiled VRL program against a single event target.
pub trait VrlRunner {
    /// Runs `program` against `target` with `timezone`.
    ///
    /// Returns the program's resulting value, or the [`Terminate`] reason (abort or error) if
    /// the program did not complete.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
423
/// [`VrlRunner`] that evaluates programs with a VRL [`Runtime`]; used for `VrlRuntime::Ast`.
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime used for evaluation; cleared after every run.
    pub runtime: Runtime,
}
428
429impl Clone for AstRunner {
430 fn clone(&self) -> Self {
431 Self {
432 runtime: Runtime::default(),
433 }
434 }
435}
436
437impl VrlRunner for AstRunner {
438 fn run(
439 &mut self,
440 target: &mut VrlTarget,
441 program: &Program,
442 timezone: &TimeZone,
443 ) -> std::result::Result<Value, Terminate> {
444 let result = self.runtime.resolve(target, program, timezone);
445 self.runtime.clear();
446 result
447 }
448}
449
450impl Remap<AstRunner> {
451 pub fn new_ast(
452 config: RemapConfig,
453 context: &TransformContext,
454 ) -> crate::Result<(Self, String)> {
455 let (program, warnings, _) = config.compile_vrl_program(
456 context.enrichment_tables.clone(),
457 context.merged_schema_definition.clone(),
458 )?;
459
460 let runtime = Runtime::default();
461 let runner = AstRunner { runtime };
462
463 Self::new(config, context, program, runner).map(|remap| (remap, warnings))
464 }
465}
466
467impl<Runner> Remap<Runner>
468where
469 Runner: VrlRunner,
470{
471 fn new(
472 config: RemapConfig,
473 context: &TransformContext,
474 program: Program,
475 runner: Runner,
476 ) -> crate::Result<Self> {
477 Ok(Remap {
478 component_key: context.key.clone(),
479 program,
480 timezone: config
481 .timezone
482 .unwrap_or_else(|| context.globals.timezone()),
483 drop_on_error: config.drop_on_error,
484 drop_on_abort: config.drop_on_abort,
485 reroute_dropped: config.reroute_dropped,
486 runner,
487 metric_tag_values: config.metric_tag_values,
488 })
489 }
490
491 #[cfg(test)]
492 const fn runner(&self) -> &Runner {
493 &self.runner
494 }
495
496 fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
497 let message = error
498 .notes()
499 .iter()
500 .filter(|note| matches!(note, Note::UserErrorMessage(_)))
501 .next_back()
502 .map(|note| note.to_string())
503 .unwrap_or_else(|| error.to_string());
504 serde_json::json!({
505 "reason": reason,
506 "message": message,
507 "component_id": self.component_key,
508 "component_type": "remap",
509 "component_kind": "transform",
510 })
511 }
512
513 fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
514 match event {
515 Event::Log(log) => match log.namespace() {
516 LogNamespace::Legacy => {
517 if let Some(metadata_key) = log_schema().metadata_key() {
518 log.insert(
519 (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
520 self.dropped_data(reason, error),
521 );
522 }
523 }
524 LogNamespace::Vector => {
525 log.insert(
526 metadata_path!("vector", "dropped"),
527 self.dropped_data(reason, error),
528 );
529 }
530 },
531 Event::Metric(metric) => {
532 if let Some(metadata_key) = log_schema().metadata_key() {
533 metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
534 metric.replace_tag(
535 format!("{metadata_key}.dropped.component_id"),
536 self.component_key
537 .as_ref()
538 .map(ToString::to_string)
539 .unwrap_or_default(),
540 );
541 metric.replace_tag(
542 format!("{metadata_key}.dropped.component_type"),
543 "remap".into(),
544 );
545 metric.replace_tag(
546 format!("{metadata_key}.dropped.component_kind"),
547 "transform".into(),
548 );
549 }
550 }
551 Event::Trace(trace) => {
552 trace.maybe_insert(log_schema().metadata_key_target_path(), || {
553 self.dropped_data(reason, error).into()
554 });
555 }
556 }
557 }
558
559 fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
560 self.runner.run(target, &self.program, &self.timezone)
561 }
562}
563
564impl<Runner> SyncTransform for Remap<Runner>
565where
566 Runner: VrlRunner + Clone + Send + Sync,
567{
568 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
569 let forward_on_error = !self.drop_on_error || self.reroute_dropped;
580 let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
581 let original_event = if (self.program.info().fallible && forward_on_error)
582 || (self.program.info().abortable && forward_on_abort)
583 {
584 Some(event.clone())
585 } else {
586 None
587 };
588
589 let log_namespace = event
590 .maybe_as_log()
591 .map(|log| log.namespace())
592 .unwrap_or(LogNamespace::Legacy);
593
594 let mut target = VrlTarget::new(
595 event,
596 self.program.info(),
597 match self.metric_tag_values {
598 MetricTagValues::Single => false,
599 MetricTagValues::Full => true,
600 },
601 );
602 let result = self.run_vrl(&mut target);
603
604 match result {
605 Ok(_) => match target.into_events(log_namespace) {
606 TargetEvents::One(event) => push_default(event, output),
607 TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
608 TargetEvents::Traces(events) => {
609 events.for_each(|event| push_default(event, output))
610 }
611 },
612 Err(reason) => {
613 let (reason, error, drop) = match reason {
614 Terminate::Abort(error) => {
615 if !self.reroute_dropped {
616 emit!(RemapMappingAbort {
617 event_dropped: self.drop_on_abort,
618 });
619 }
620 ("abort", error, self.drop_on_abort)
621 }
622 Terminate::Error(error) => {
623 if !self.reroute_dropped {
624 emit!(RemapMappingError {
625 error: error.to_string(),
626 event_dropped: self.drop_on_error,
627 });
628 }
629 ("error", error, self.drop_on_error)
630 }
631 };
632
633 if !drop {
634 let event = original_event.expect("event will be set");
635
636 push_default(event, output);
637 } else if self.reroute_dropped {
638 let mut event = original_event.expect("event will be set");
639
640 self.annotate_dropped(&mut event, reason, error);
641 push_dropped(event, output);
642 }
643 }
644 }
645 }
646}
647
648#[inline]
649fn push_default(event: Event, output: &mut TransformOutputsBuf) {
650 output.push(None, event)
651}
652
653#[inline]
654fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
655 output.push(Some(DROPPED), event);
656}
657
/// Errors raised while loading the VRL program for the `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// Not exactly one of `source`, `file`, or `files` was configured.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    /// A VRL program file could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    /// A VRL program file was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
668
669#[cfg(test)]
670mod tests {
671 use std::{
672 collections::{HashMap, HashSet},
673 sync::Arc,
674 };
675
676 use chrono::DateTime;
677 use indoc::{formatdoc, indoc};
678 use tokio::sync::mpsc;
679 use tokio_stream::wrappers::ReceiverStream;
680 use vector_lib::{
681 config::GlobalOptions, enrichment::TableRegistry, event::EventMetadata, metric_tags,
682 };
683 use vrl::{btreemap, event_path, value::kind::Collection};
684
685 use super::*;
686 use crate::{
687 config::{ConfigBuilder, build_unit_tests},
688 event::{
689 LogEvent, Metric, Value,
690 metric::{MetricKind, MetricValue},
691 },
692 metrics::Controller,
693 schema,
694 test_util::components::{
695 COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
696 },
697 transforms::{OutputBuffer, test::create_topology},
698 };
699
700 fn test_default_schema_definition() -> schema::Definition {
701 schema::Definition::empty_legacy_namespace().with_event_field(
702 &owned_value_path!("a default field"),
703 Kind::integer().or_bytes(),
704 Some("default"),
705 )
706 }
707
708 fn test_dropped_schema_definition() -> schema::Definition {
709 schema::Definition::empty_legacy_namespace().with_event_field(
710 &owned_value_path!("a dropped field"),
711 Kind::boolean().or_null(),
712 Some("dropped"),
713 )
714 }
715
716 fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
717 let schema_definitions = HashMap::from([
718 (
719 None,
720 [("source".into(), test_default_schema_definition())].into(),
721 ),
722 (
723 Some(DROPPED.to_owned()),
724 [("source".into(), test_dropped_schema_definition())].into(),
725 ),
726 ]);
727
728 Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
729 .map(|(remap, _)| remap)
730 }
731
732 #[test]
733 fn generate_config() {
734 crate::test_util::test_generate_config::<RemapConfig>();
735 }
736
737 #[test]
738 fn config_missing_source_and_file() {
739 let config = RemapConfig {
740 source: None,
741 file: None,
742 ..Default::default()
743 };
744
745 let err = remap(config).unwrap_err().to_string();
746 assert_eq!(
747 &err,
748 "must provide exactly one of `source` or `file` or `files` configuration"
749 )
750 }
751
752 #[test]
753 fn config_both_source_and_file() {
754 let config = RemapConfig {
755 source: Some("".to_owned()),
756 file: Some("".into()),
757 ..Default::default()
758 };
759
760 let err = remap(config).unwrap_err().to_string();
761 assert_eq!(
762 &err,
763 "must provide exactly one of `source` or `file` or `files` configuration"
764 )
765 }
766
767 fn get_field_string(event: &Event, field: &str) -> String {
768 event
769 .as_log()
770 .get(field)
771 .unwrap()
772 .to_string_lossy()
773 .into_owned()
774 }
775
776 #[test]
777 fn check_remap_doesnt_share_state_between_events() {
778 let conf = RemapConfig {
779 source: Some(".foo = .sentinel".to_string()),
780 file: None,
781 drop_on_error: true,
782 drop_on_abort: false,
783 ..Default::default()
784 };
785 let mut tform = remap(conf).unwrap();
786 assert!(tform.runner().runtime.is_empty());
787
788 let event1 = {
789 let mut event1 = LogEvent::from("event1");
790 event1.insert("sentinel", "bar");
791 Event::from(event1)
792 };
793 let result1 = transform_one(&mut tform, event1).unwrap();
794 assert_eq!(get_field_string(&result1, "message"), "event1");
795 assert_eq!(get_field_string(&result1, "foo"), "bar");
796 assert!(tform.runner().runtime.is_empty());
797
798 let event2 = {
799 let event2 = LogEvent::from("event2");
800 Event::from(event2)
801 };
802 let result2 = transform_one(&mut tform, event2).unwrap();
803 assert_eq!(get_field_string(&result2, "message"), "event2");
804 assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
805 assert!(tform.runner().runtime.is_empty());
806 }
807
808 #[test]
809 fn remap_return_raw_string_vector_namespace() {
810 let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
811
812 let event = {
813 let mut metadata = EventMetadata::default()
814 .with_schema_definition(&Arc::new(initial_definition.clone()));
815 metadata
817 .value_mut()
818 .insert(&owned_value_path!("vector"), BTreeMap::new());
819
820 let mut event = LogEvent::new_with_metadata(metadata);
821 event.insert("copy_from", "buz");
822 Event::from(event)
823 };
824
825 let conf = RemapConfig {
826 source: Some(r#" . = "root string";"#.to_string()),
827 file: None,
828 drop_on_error: true,
829 drop_on_abort: false,
830 ..Default::default()
831 };
832 let mut tform = remap(conf.clone()).unwrap();
833 let result = transform_one(&mut tform, event).unwrap();
834 assert_eq!(get_field_string(&result, "."), "root string");
835
836 let mut outputs = conf.outputs(
837 TableRegistry::default(),
838 &[(OutputId::dummy(), initial_definition)],
839 LogNamespace::Vector,
840 );
841
842 assert_eq!(outputs.len(), 1);
843 let output = outputs.pop().unwrap();
844 assert_eq!(output.port, None);
845 let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
846 let expected_schema =
847 Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
848 assert_eq!(actual_schema_def, expected_schema);
849 }
850
851 #[test]
852 fn check_remap_adds() {
853 let event = {
854 let mut event = LogEvent::from("augment me");
855 event.insert("copy_from", "buz");
856 Event::from(event)
857 };
858
859 let conf = RemapConfig {
860 source: Some(
861 r#" .foo = "bar"
862 .bar = "baz"
863 .copy = .copy_from
864"#
865 .to_string(),
866 ),
867 file: None,
868 drop_on_error: true,
869 drop_on_abort: false,
870 ..Default::default()
871 };
872 let mut tform = remap(conf).unwrap();
873 let result = transform_one(&mut tform, event).unwrap();
874 assert_eq!(get_field_string(&result, "message"), "augment me");
875 assert_eq!(get_field_string(&result, "copy_from"), "buz");
876 assert_eq!(get_field_string(&result, "foo"), "bar");
877 assert_eq!(get_field_string(&result, "bar"), "baz");
878 assert_eq!(get_field_string(&result, "copy"), "buz");
879 }
880
881 #[test]
882 fn check_remap_emits_multiple() {
883 let event = {
884 let mut event = LogEvent::from("augment me");
885 event.insert(
886 "events",
887 vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
888 );
889 Event::from(event)
890 };
891
892 let conf = RemapConfig {
893 source: Some(
894 indoc! {r"
895 . = .events
896 "}
897 .to_owned(),
898 ),
899 file: None,
900 drop_on_error: true,
901 drop_on_abort: false,
902 ..Default::default()
903 };
904 let mut tform = remap(conf).unwrap();
905
906 let out = collect_outputs(&mut tform, event);
907 assert_eq!(2, out.primary.len());
908 let mut result = out.primary.into_events();
909
910 let r = result.next().unwrap();
911 assert_eq!(get_field_string(&r, "message"), "foo");
912 let r = result.next().unwrap();
913 assert_eq!(get_field_string(&r, "message"), "bar");
914 }
915
916 #[test]
917 fn check_remap_error() {
918 let event = {
919 let mut event = Event::Log(LogEvent::from("augment me"));
920 event.as_mut_log().insert("bar", "is a string");
921 event
922 };
923
924 let conf = RemapConfig {
925 source: Some(formatdoc! {r#"
926 .foo = "foo"
927 .not_an_int = int!(.bar)
928 .baz = 12
929 "#}),
930 file: None,
931 drop_on_error: false,
932 drop_on_abort: false,
933 ..Default::default()
934 };
935 let mut tform = remap(conf).unwrap();
936
937 let event = transform_one(&mut tform, event).unwrap();
938
939 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
940 assert!(event.as_log().get("foo").is_none());
941 assert!(event.as_log().get("baz").is_none());
942 }
943
944 #[test]
945 fn check_remap_error_drop() {
946 let event = {
947 let mut event = Event::Log(LogEvent::from("augment me"));
948 event.as_mut_log().insert("bar", "is a string");
949 event
950 };
951
952 let conf = RemapConfig {
953 source: Some(formatdoc! {r#"
954 .foo = "foo"
955 .not_an_int = int!(.bar)
956 .baz = 12
957 "#}),
958 file: None,
959 drop_on_error: true,
960 drop_on_abort: false,
961 ..Default::default()
962 };
963 let mut tform = remap(conf).unwrap();
964
965 assert!(transform_one(&mut tform, event).is_none())
966 }
967
968 #[test]
969 fn check_remap_error_infallible() {
970 let event = {
971 let mut event = Event::Log(LogEvent::from("augment me"));
972 event.as_mut_log().insert("bar", "is a string");
973 event
974 };
975
976 let conf = RemapConfig {
977 source: Some(formatdoc! {r#"
978 .foo = "foo"
979 .baz = 12
980 "#}),
981 file: None,
982 drop_on_error: false,
983 drop_on_abort: false,
984 ..Default::default()
985 };
986 let mut tform = remap(conf).unwrap();
987
988 let event = transform_one(&mut tform, event).unwrap();
989
990 assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
991 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
992 assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
993 }
994
995 #[test]
996 fn check_remap_abort() {
997 let event = {
998 let mut event = Event::Log(LogEvent::from("augment me"));
999 event.as_mut_log().insert("bar", "is a string");
1000 event
1001 };
1002
1003 let conf = RemapConfig {
1004 source: Some(formatdoc! {r#"
1005 .foo = "foo"
1006 abort
1007 .baz = 12
1008 "#}),
1009 file: None,
1010 drop_on_error: false,
1011 drop_on_abort: false,
1012 ..Default::default()
1013 };
1014 let mut tform = remap(conf).unwrap();
1015
1016 let event = transform_one(&mut tform, event).unwrap();
1017
1018 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1019 assert!(event.as_log().get("foo").is_none());
1020 assert!(event.as_log().get("baz").is_none());
1021 }
1022
1023 #[test]
1024 fn check_remap_abort_drop() {
1025 let event = {
1026 let mut event = Event::Log(LogEvent::from("augment me"));
1027 event.as_mut_log().insert("bar", "is a string");
1028 event
1029 };
1030
1031 let conf = RemapConfig {
1032 source: Some(formatdoc! {r#"
1033 .foo = "foo"
1034 abort
1035 .baz = 12
1036 "#}),
1037 file: None,
1038 drop_on_error: false,
1039 drop_on_abort: true,
1040 ..Default::default()
1041 };
1042 let mut tform = remap(conf).unwrap();
1043
1044 assert!(transform_one(&mut tform, event).is_none())
1045 }
1046
1047 #[test]
1048 fn check_remap_metric() {
1049 let metric = Event::Metric(Metric::new(
1050 "counter",
1051 MetricKind::Absolute,
1052 MetricValue::Counter { value: 1.0 },
1053 ));
1054 let metadata = metric.metadata().clone();
1055
1056 let conf = RemapConfig {
1057 source: Some(
1058 r#".tags.host = "zoobub"
1059 .name = "zork"
1060 .namespace = "zerk"
1061 .kind = "incremental""#
1062 .to_string(),
1063 ),
1064 file: None,
1065 drop_on_error: true,
1066 drop_on_abort: false,
1067 ..Default::default()
1068 };
1069 let mut tform = remap(conf).unwrap();
1070
1071 let result = transform_one(&mut tform, metric).unwrap();
1072 assert_eq!(
1073 result,
1074 Event::Metric(
1075 Metric::new_with_metadata(
1076 "zork",
1077 MetricKind::Incremental,
1078 MetricValue::Counter { value: 1.0 },
1079 metadata
1082 )
1083 .with_namespace(Some("zerk"))
1084 .with_tags(Some(metric_tags! {
1085 "host" => "zoobub",
1086 }))
1087 )
1088 );
1089 }
1090
1091 #[test]
1092 fn remap_timezone_fallback() {
1093 let error = Event::from_json_value(
1094 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1095 LogNamespace::Legacy,
1096 )
1097 .unwrap();
1098 let conf = RemapConfig {
1099 source: Some(formatdoc! {r#"
1100 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1101 "#}),
1102 drop_on_error: true,
1103 drop_on_abort: true,
1104 reroute_dropped: true,
1105 ..Default::default()
1106 };
1107 let context = TransformContext {
1108 key: Some(ComponentKey::from("remapper")),
1109 globals: GlobalOptions {
1110 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1111 ..Default::default()
1112 },
1113 ..Default::default()
1114 };
1115 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1116
1117 let output = transform_one_fallible(&mut tform, error).unwrap();
1118 let log = output.as_log();
1119 assert_eq!(
1120 log["timestamp"],
1121 DateTime::<chrono::Utc>::from(
1122 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1123 )
1124 .into()
1125 );
1126 }
1127
1128 #[test]
1129 fn remap_timezone_override() {
1130 let error = Event::from_json_value(
1131 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1132 LogNamespace::Legacy,
1133 )
1134 .unwrap();
1135 let conf = RemapConfig {
1136 source: Some(formatdoc! {r#"
1137 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1138 "#}),
1139 drop_on_error: true,
1140 drop_on_abort: true,
1141 reroute_dropped: true,
1142 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1143 ..Default::default()
1144 };
1145 let context = TransformContext {
1146 key: Some(ComponentKey::from("remapper")),
1147 globals: GlobalOptions {
1148 timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1149 ..Default::default()
1150 },
1151 ..Default::default()
1152 };
1153 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1154
1155 let output = transform_one_fallible(&mut tform, error).unwrap();
1156 let log = output.as_log();
1157 assert_eq!(
1158 log["timestamp"],
1159 DateTime::<chrono::Utc>::from(
1160 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1161 )
1162 .into()
1163 );
1164 }
1165
1166 #[test]
1167 fn check_remap_branching() {
1168 let happy =
1169 Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1170 .unwrap();
1171 let abort = Event::from_json_value(
1172 serde_json::json!({"hello": "goodbye"}),
1173 LogNamespace::Legacy,
1174 )
1175 .unwrap();
1176 let error =
1177 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1178
1179 let happy_metric = {
1180 let mut metric = Metric::new(
1181 "counter",
1182 MetricKind::Absolute,
1183 MetricValue::Counter { value: 1.0 },
1184 );
1185 metric.replace_tag("hello".into(), "world".into());
1186 Event::Metric(metric)
1187 };
1188
1189 let abort_metric = {
1190 let mut metric = Metric::new(
1191 "counter",
1192 MetricKind::Absolute,
1193 MetricValue::Counter { value: 1.0 },
1194 );
1195 metric.replace_tag("hello".into(), "goodbye".into());
1196 Event::Metric(metric)
1197 };
1198
1199 let error_metric = {
1200 let mut metric = Metric::new(
1201 "counter",
1202 MetricKind::Absolute,
1203 MetricValue::Counter { value: 1.0 },
1204 );
1205 metric.replace_tag("not_hello".into(), "oops".into());
1206 Event::Metric(metric)
1207 };
1208
1209 let conf = RemapConfig {
1210 source: Some(formatdoc! {r#"
1211 if exists(.tags) {{
1212 # metrics
1213 .tags.foo = "bar"
1214 if string!(.tags.hello) == "goodbye" {{
1215 abort
1216 }}
1217 }} else {{
1218 # logs
1219 .foo = "bar"
1220 if string(.hello) == "goodbye" {{
1221 abort
1222 }}
1223 }}
1224 "#}),
1225 drop_on_error: true,
1226 drop_on_abort: true,
1227 reroute_dropped: true,
1228 ..Default::default()
1229 };
1230 let schema_definitions = HashMap::from([
1231 (
1232 None,
1233 [("source".into(), test_default_schema_definition())].into(),
1234 ),
1235 (
1236 Some(DROPPED.to_owned()),
1237 [("source".into(), test_dropped_schema_definition())].into(),
1238 ),
1239 ]);
1240 let context = TransformContext {
1241 key: Some(ComponentKey::from("remapper")),
1242 schema_definitions,
1243 merged_schema_definition: schema::Definition::new_with_default_metadata(
1244 Kind::any_object(),
1245 [LogNamespace::Legacy],
1246 )
1247 .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1248 ..Default::default()
1249 };
1250 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1251
1252 let output = transform_one_fallible(&mut tform, happy).unwrap();
1253 let log = output.as_log();
1254 assert_eq!(log["hello"], "world".into());
1255 assert_eq!(log["foo"], "bar".into());
1256 assert!(!log.contains(event_path!("metadata")));
1257
1258 let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1259 let log = output.as_log();
1260 assert_eq!(log["hello"], "goodbye".into());
1261 assert!(!log.contains(event_path!("foo")));
1262 assert_eq!(
1263 log["metadata"],
1264 serde_json::json!({
1265 "dropped": {
1266 "reason": "abort",
1267 "message": "aborted",
1268 "component_id": "remapper",
1269 "component_type": "remap",
1270 "component_kind": "transform",
1271 }
1272 })
1273 .try_into()
1274 .unwrap()
1275 );
1276
1277 let output = transform_one_fallible(&mut tform, error).unwrap_err();
1278 let log = output.as_log();
1279 assert_eq!(log["hello"], 42.into());
1280 assert!(!log.contains(event_path!("foo")));
1281 assert_eq!(
1282 log["metadata"],
1283 serde_json::json!({
1284 "dropped": {
1285 "reason": "error",
1286 "message": "function call error for \"string\" at (160:174): expected string, got integer",
1287 "component_id": "remapper",
1288 "component_type": "remap",
1289 "component_kind": "transform",
1290 }
1291 })
1292 .try_into()
1293 .unwrap()
1294 );
1295
1296 let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1297 similar_asserts::assert_eq!(
1298 output,
1299 Event::Metric(
1300 Metric::new_with_metadata(
1301 "counter",
1302 MetricKind::Absolute,
1303 MetricValue::Counter { value: 1.0 },
1304 EventMetadata::default()
1307 .with_schema_definition(output.metadata().schema_definition()),
1308 )
1309 .with_tags(Some(metric_tags! {
1310 "hello" => "world",
1311 "foo" => "bar",
1312 }))
1313 )
1314 );
1315
1316 let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1317 similar_asserts::assert_eq!(
1318 output,
1319 Event::Metric(
1320 Metric::new_with_metadata(
1321 "counter",
1322 MetricKind::Absolute,
1323 MetricValue::Counter { value: 1.0 },
1324 EventMetadata::default()
1327 .with_schema_definition(output.metadata().schema_definition()),
1328 )
1329 .with_tags(Some(metric_tags! {
1330 "hello" => "goodbye",
1331 "metadata.dropped.reason" => "abort",
1332 "metadata.dropped.component_id" => "remapper",
1333 "metadata.dropped.component_type" => "remap",
1334 "metadata.dropped.component_kind" => "transform",
1335 }))
1336 )
1337 );
1338
1339 let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1340 similar_asserts::assert_eq!(
1341 output,
1342 Event::Metric(
1343 Metric::new_with_metadata(
1344 "counter",
1345 MetricKind::Absolute,
1346 MetricValue::Counter { value: 1.0 },
1347 EventMetadata::default()
1350 .with_schema_definition(output.metadata().schema_definition()),
1351 )
1352 .with_tags(Some(metric_tags! {
1353 "not_hello" => "oops",
1354 "metadata.dropped.reason" => "error",
1355 "metadata.dropped.component_id" => "remapper",
1356 "metadata.dropped.component_type" => "remap",
1357 "metadata.dropped.component_kind" => "transform",
1358 }))
1359 )
1360 );
1361 }
1362
#[test]
fn check_remap_branching_assert_with_message() {
    // The program has two `assert_eq!` calls: the first carries a custom
    // message, the second relies on VRL's default assertion text. Each input
    // below trips exactly one of them and must be rerouted with that message.
    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            assert_eq!(.hello, 0, "custom message here")
            assert_eq!(.hello, 1)
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut transform = Remap::new_ast(config, &context).unwrap().0;

    // Expected `metadata` object on an event rerouted because of an error.
    let dropped_metadata = |message: &str| -> Value {
        serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": message,
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    };

    let cases = [
        // `.hello` is not 0, so the first assertion fails with its custom message.
        (42, "custom message here"),
        // `.hello` is 0, so the first assertion passes and the second fails
        // with the default message; the span points at the second line.
        (
            0,
            "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
        ),
    ];

    for (hello, message) in cases {
        let input = Event::from_json_value(
            serde_json::json!({ "hello": hello }),
            LogNamespace::Legacy,
        )
        .unwrap();
        let rerouted = transform_one_fallible(&mut transform, input).unwrap_err();
        let log = rerouted.as_log();
        assert_eq!(log["hello"], hello.into());
        assert!(!log.contains(event_path!("foo")));
        assert_eq!(log["metadata"], dropped_metadata(message));
    }
}
1425
#[test]
fn check_remap_branching_abort_with_message() {
    // An `abort` with a custom message must surface that message in the
    // drop metadata of the rerouted event.
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            abort "custom message here"
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let mut transform = Remap::new_ast(config, &context).unwrap().0;

    let input =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    let rerouted = transform_one_fallible(&mut transform, input).unwrap_err();

    let expected_metadata: Value = serde_json::json!({
        "dropped": {
            "reason": "abort",
            "message": "custom message here",
            "component_id": "remapper",
            "component_type": "remap",
            "component_kind": "transform",
        }
    })
    .try_into()
    .unwrap();

    let log = rerouted.as_log();
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(log["metadata"], expected_metadata);
}
1464
#[test]
fn check_remap_branching_disabled() {
    // Builds a legacy-namespace log event from a JSON literal.
    let legacy_event =
        |json: serde_json::Value| Event::from_json_value(json, LogNamespace::Legacy).unwrap();

    let happy = legacy_event(serde_json::json!({"hello": "world"}));
    let abort = legacy_event(serde_json::json!({"hello": "goodbye"}));
    let error = legacy_event(serde_json::json!({"hello": 42}));

    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            if exists(.tags) {{
                # metrics
                .tags.foo = "bar"
                if string!(.tags.hello) == "goodbye" {{
                    abort
                }}
            }} else {{
                # logs
                .foo = "bar"
                if string!(.hello) == "goodbye" {{
                    abort
                }}
            }}
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: false,
        ..Default::default()
    };

    // With rerouting disabled only the primary output is declared; its schema
    // gains `foo` and `tags` fields of any kind.
    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let expected_definition = input_definition
        .clone()
        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);

    let outputs = conf.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("test".into(), input_definition)],
        LogNamespace::Legacy,
    );
    assert_eq!(
        outputs,
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("test".into(), expected_definition)].into()
        )]
    );

    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut transform = Remap::new_ast(conf, &context).unwrap().0;

    let passed = transform_one_fallible(&mut transform, happy).unwrap();
    let log = passed.as_log();
    assert_eq!(log["hello"], "world".into());
    assert_eq!(log["foo"], "bar".into());
    assert!(!log.contains(event_path!("metadata")));

    // Aborted and failed events are discarded: nothing reaches either buffer.
    for discarded in [abort, error] {
        let collected = collect_outputs(&mut transform, discarded);
        assert!(collected.primary.is_empty());
        assert!(collected.named[DROPPED].is_empty());
    }
}
1545
1546 #[tokio::test]
1547 async fn check_remap_branching_metrics_with_output() {
1548 init_test();
1549
1550 let config: ConfigBuilder = toml::from_str(indoc! {r#"
1551 [transforms.foo]
1552 inputs = []
1553 type = "remap"
1554 drop_on_abort = true
1555 reroute_dropped = true
1556 source = "abort"
1557
1558 [[tests]]
1559 name = "metric output"
1560
1561 [tests.input]
1562 insert_at = "foo"
1563 value = "none"
1564
1565 [[tests.outputs]]
1566 extract_from = "foo.dropped"
1567 [[tests.outputs.conditions]]
1568 type = "vrl"
1569 source = "true"
1570 "#})
1571 .unwrap();
1572
1573 let mut tests = build_unit_tests(config).await.unwrap();
1574 assert!(tests.remove(0).run().await.errors.is_empty());
1575 COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1577 }
1578
1579 struct CollectedOuput {
1580 primary: OutputBuffer,
1581 named: HashMap<String, OutputBuffer>,
1582 }
1583
1584 fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1585 let mut outputs = TransformOutputsBuf::new_with_capacity(
1586 vec![
1587 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1588 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1589 ],
1590 1,
1591 );
1592
1593 ft.transform(event, &mut outputs);
1594
1595 CollectedOuput {
1596 primary: outputs.take_primary(),
1597 named: outputs.take_all_named(),
1598 }
1599 }
1600
1601 fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1602 let out = collect_outputs(ft, event);
1603 assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1604 assert!(out.primary.len() <= 1);
1605 out.primary.into_events().next()
1606 }
1607
1608 fn transform_one_fallible(
1609 ft: &mut dyn SyncTransform,
1610 event: Event,
1611 ) -> std::result::Result<Event, Event> {
1612 let mut outputs = TransformOutputsBuf::new_with_capacity(
1613 vec![
1614 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1615 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1616 ],
1617 1,
1618 );
1619
1620 ft.transform(event, &mut outputs);
1621
1622 let mut buf = outputs.drain().collect::<Vec<_>>();
1623 let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1624
1625 assert!(buf.len() < 2);
1626 assert!(err_buf.len() < 2);
1627 match (buf.pop(), err_buf.pop()) {
1628 (Some(good), None) => Ok(good),
1629 (None, Some(bad)) => Err(bad),
1630 (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1631 }
1632 }
1633
1634 #[tokio::test]
1635 async fn emits_internal_events() {
1636 assert_transform_compliance(async move {
1637 let config = RemapConfig {
1638 source: Some("abort".to_owned()),
1639 drop_on_abort: true,
1640 ..Default::default()
1641 };
1642
1643 let (tx, rx) = mpsc::channel(1);
1644 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1645
1646 let log = LogEvent::from("hello world");
1647 tx.send(log.into()).await.unwrap();
1648
1649 drop(tx);
1650 topology.stop().await;
1651 assert_eq!(out.recv().await, None);
1652 })
1653 .await
1654 }
1655
#[test]
fn test_combined_transforms_simple() {
    // Feeding the first transform's output schema into the second must yield
    // a definition carrying both the field the first one set and the field
    // the second one derived from it.
    let first = RemapConfig {
        source: Some(r#".thing = "potato""#.to_string()),
        ..Default::default()
    };
    let second = RemapConfig {
        source: Some(".thang = .thing".to_string()),
        ..Default::default()
    };
    let tables = vector_lib::enrichment::TableRegistry::default();

    let first_outputs = first.outputs(
        tables.clone(),
        &[("in".into(), schema::Definition::default_legacy_namespace())],
        LogNamespace::Legacy,
    );
    let first_expected = Definition::default_legacy_namespace().with_event_field(
        &owned_value_path!("thing"),
        Kind::bytes(),
        None,
    );
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), first_expected)].into()
        )],
        first_outputs
    );

    let chained_input = first_outputs[0].schema_definitions(true)[&"in".into()].clone();
    let second_outputs = second.outputs(
        tables,
        &[("in1".into(), chained_input)],
        LogNamespace::Legacy,
    );
    let second_expected = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in1".into(), second_expected)].into(),
        )],
        second_outputs
    );
}
1720
#[test]
fn test_combined_transforms_unnest() {
    // The first transform unnests an array of objects into the root, so
    // `.thing` becomes an object whose keys may each be missing. The second
    // transform reads one of those keys with a fallback.
    let first = RemapConfig {
        source: Some(
            indoc! {r#"
                .thing = [{"cabbage": 32}, {"parsnips": 45}]
                . = unnest(.thing)
            "#}
            .to_string(),
        ),
        ..Default::default()
    };
    let second = RemapConfig {
        source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
        ..Default::default()
    };
    let tables = vector_lib::enrichment::TableRegistry::default();

    // Expected kind of `.thing` after `unnest`: either key may be undefined.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([
        ("cabbage".into(), Kind::integer().or_undefined()),
        ("parsnips".into(), Kind::integer().or_undefined()),
    ])));

    let first_outputs = first.outputs(
        tables.clone(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );
    let first_expected =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("thing"), thing_kind.clone(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), first_expected)].into(),
        )],
        first_outputs
    );

    let chained_input = first_outputs[0].schema_definitions(true)[&"in".into()].clone();
    let second_outputs = second.outputs(
        tables,
        &[("in1".into(), chained_input)],
        LogNamespace::Legacy,
    );
    let second_expected = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), thing_kind, None)
        .with_event_field(
            &owned_value_path!("thang"),
            Kind::integer().or_null(),
            None,
        );
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in1".into(), second_expected)].into(),
        )],
        second_outputs
    );
}
1816
#[test]
fn test_transform_abort() {
    // A program that only aborts must pass the input schema through unchanged.
    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let config = RemapConfig {
        source: Some("abort".to_string()),
        ..Default::default()
    };

    let outputs = config.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_definition.clone())],
        LogNamespace::Legacy,
    );

    let expected = vec![TransformOutput::new(
        DataType::all_bits(),
        [("in".into(), input_definition)].into(),
    )];
    assert_eq!(expected, outputs);
}
1855
#[test]
fn test_error_outputs() {
    // With `reroute_dropped` enabled, the declared outputs must include both
    // the default port (`None`) and the `DROPPED` port.
    let config = RemapConfig {
        source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
        reroute_dropped: true,
        ..Default::default()
    };

    let outputs = config.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );

    let ports: HashSet<Option<String>> = outputs.into_iter().map(|output| output.port).collect();
    // Use the shared `DROPPED` constant rather than re-spelling the port name.
    assert_eq!(HashSet::from([None, Some(DROPPED.to_owned())]), ports);
}
1891
#[test]
fn test_non_object_events() {
    // Assigning a string to the root is expected to yield a schema that is an
    // object holding only a `message` field of bytes.
    let config = RemapConfig {
        source: Some(r#". = "fish" "#.to_string()),
        ..Default::default()
    };
    let input =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);

    let outputs = config.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input)],
        LogNamespace::Legacy,
    );

    let message_only = schema::Definition::new_with_default_metadata(
        Kind::object(Collection::from_unknown(Kind::undefined())),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);

    let mut expected = HashMap::new();
    expected.insert(OutputId::from("in"), message_only);
    assert_eq!(expected, outputs[0].schema_definitions(true));
}
1925
#[test]
fn test_array_and_non_object_events() {
    // One branch unnests an array into the root; the other replaces the root
    // with a string. The output schema must merge both possibilities.
    let program = indoc! {r#"
        if .lizard == true {
            .thing = [{"cabbage": 42}];
            . = unnest(.thing)
        } else {
            . = "fish"
        }
    "#};
    let config = RemapConfig {
        source: Some(program.to_string()),
        ..Default::default()
    };

    let outputs = config.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );

    // `.thing` only exists on the unnest branch, hence `or_undefined`.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([(
        "cabbage".into(),
        Kind::integer(),
    )])))
    .or_undefined();
    let wanted =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("message"), Kind::any(), None)
            .with_event_field(&owned_value_path!("thing"), thing_kind, None);

    assert_eq!(
        HashMap::from([(OutputId::from("in"), wanted)]),
        outputs[0].schema_definitions(true),
    );
}
1977
#[test]
fn check_remap_array_vector_namespace() {
    // A log whose metadata holds an empty `vector` object (presumably what
    // marks it as a Vector-namespace event — confirm against LogEvent docs).
    let mut log = LogEvent::from("input");
    log.metadata_mut()
        .value_mut()
        .insert("vector", BTreeMap::new());
    let event = Event::from(log);

    // Replacing the root with `[null]` should produce an event whose root is null.
    let config = RemapConfig {
        source: Some(". = [null]\n".to_string()),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config.clone()).unwrap();
    let result = transform_one(&mut transform, event).unwrap();
    assert_eq!(result.as_log().get("."), Some(&Value::Null));

    // The declared schema must agree: a null root in the Vector namespace.
    let outputs = config.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Vector],
            ),
        )],
        LogNamespace::Vector,
    );
    let wanted =
        schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
    assert_eq!(
        HashMap::from([(OutputId::from("in"), wanted)]),
        outputs[0].schema_definitions(true),
    );
}
2028
2029 fn assert_no_metrics(source: String) {
2030 vector_lib::metrics::init_test();
2031
2032 let config = RemapConfig {
2033 source: Some(source),
2034 drop_on_error: true,
2035 drop_on_abort: true,
2036 reroute_dropped: true,
2037 ..Default::default()
2038 };
2039 let mut ast_runner = remap(config).unwrap();
2040 let input_event =
2041 Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2042 let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2043 let dropped_log = dropped_event.as_log();
2044 assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2045
2046 let controller = Controller::get().expect("no controller");
2047 let metrics = controller
2048 .capture_metrics()
2049 .into_iter()
2050 .map(|metric| (metric.name().to_string(), metric))
2051 .collect::<BTreeMap<String, Metric>>();
2052 assert_eq!(metrics.get("component_discarded_events_total"), None);
2053 assert_eq!(metrics.get("component_errors_total"), None);
2054 }
#[test]
fn do_not_emit_metrics_when_dropped() {
    // An explicit `abort` must not be reported as a discard or an error.
    assert_no_metrics(String::from("abort"));
}
2059
#[test]
fn do_not_emit_metrics_when_errored() {
    // A runtime failure (`parse_key_value!` on the event's `.message`) that
    // gets rerouted must not be reported as a discard or an error either.
    assert_no_metrics(String::from("parse_key_value!(.message)"));
}
2064}