1use std::{
2 collections::{BTreeMap, HashMap},
3 fs::File,
4 io::{self, Read},
5 path::PathBuf,
6 sync::Mutex,
7};
8
9use snafu::{ResultExt, Snafu};
10use vector_lib::{
11 TimeZone,
12 codecs::MetricTagValues,
13 compile_vrl,
14 config::LogNamespace,
15 configurable::configurable_component,
16 enrichment::TableRegistry,
17 lookup::{PathPrefix, metadata_path, owned_value_path},
18 schema::Definition,
19};
20use vector_vrl_functions::set_semantic_meaning::MeaningList;
21use vector_vrl_metrics::MetricsStorage;
22use vrl::{
23 compiler::{
24 CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
25 runtime::{Runtime, Terminate},
26 state::ExternalEnv,
27 },
28 diagnostic::{DiagnosticMessage, Note},
29 path,
30 path::ValuePath,
31 value::{Kind, Value},
32};
33
34use crate::{
35 Result,
36 config::{
37 ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
38 TransformOutput, log_schema,
39 },
40 event::{Event, TargetEvents, VrlTarget},
41 format_vrl_diagnostics,
42 internal_events::{RemapMappingAbort, RemapMappingError},
43 schema,
44 transforms::{SyncTransform, Transform, TransformOutputsBuf},
45};
46
/// Name of the secondary output port that receives rerouted (dropped) events.
const DROPPED: &str = "dropped";
/// Key of a cached compilation: the enrichment tables and the merged input schema the program
/// was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// Payload of a successful cached compilation: the compiled program, its formatted warnings,
/// and the semantic meanings collected during compilation.
type CacheValue = (Program, String, MeaningList);
50
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The VRL program to execute for each event, given inline.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// Path to a file containing the VRL program to execute for each event.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// Paths to files whose contents are concatenated, in the given order and each followed by
    /// a newline, to form the VRL program.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// Controls how metric tag values are exposed to the program.
    ///
    /// The value is translated into the boolean flag handed to `VrlTarget::new`
    /// (`Single` maps to `false`, `Full` maps to `true`).
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The timezone passed to the program at run time.
    ///
    /// When unset, the timezone from the global options is used instead.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Whether an event is dropped when the program terminates with a runtime error.
    ///
    /// When `false`, the original, unmodified event is forwarded on the default output.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Whether an event is dropped when the program terminates with an abort.
    ///
    /// When `false`, the original, unmodified event is forwarded on the default output.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Whether dropped events are sent to the `dropped` output instead of being discarded.
    ///
    /// Rerouted events are the original events, annotated with the reason they were dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    /// The VRL runtime used to execute the program (hidden from the generated docs).
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    /// Memoized compilation results, keyed by enrichment tables and merged input schema.
    ///
    /// Never (de)serialized, excluded from `Debug` output, and reset to empty by `Clone`.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
165
166impl Clone for RemapConfig {
167 fn clone(&self) -> Self {
168 Self {
169 source: self.source.clone(),
170 file: self.file.clone(),
171 files: self.files.clone(),
172 metric_tag_values: self.metric_tag_values,
173 timezone: self.timezone,
174 drop_on_error: self.drop_on_error,
175 drop_on_abort: self.drop_on_abort,
176 reroute_dropped: self.reroute_dropped,
177 runtime: self.runtime,
178 cache: Mutex::new(Default::default()),
179 }
180 }
181}
182
183impl RemapConfig {
184 fn compile_vrl_program(
185 &self,
186 enrichment_tables: TableRegistry,
187 metrics_storage: MetricsStorage,
188 merged_schema_definition: schema::Definition,
189 ) -> Result<(Program, String, MeaningList)> {
190 if let Some((_, res)) = self
191 .cache
192 .lock()
193 .expect("Data poisoned")
194 .iter()
195 .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
196 {
197 return res.clone().map_err(Into::into);
198 }
199
200 let source = match (&self.source, &self.file, &self.files) {
201 (Some(source), None, None) => source.to_owned(),
202 (None, Some(path), None) => Self::read_file(path)?,
203 (None, None, Some(paths)) => {
204 let mut combined_source = String::new();
205 for path in paths {
206 let content = Self::read_file(path)?;
207 combined_source.push_str(&content);
208 combined_source.push('\n');
209 }
210 combined_source
211 }
212 _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
213 };
214
215 let state = TypeState {
216 local: Default::default(),
217 external: ExternalEnv::new_with_kind(
218 merged_schema_definition.event_kind().clone(),
219 merged_schema_definition.metadata_kind().clone(),
220 ),
221 };
222 let mut config = CompileConfig::default();
223
224 config.set_custom(enrichment_tables.clone());
225 config.set_custom(metrics_storage);
226 config.set_custom(MeaningList::default());
227
228 let res = compile_vrl(&source, &vector_vrl_functions::all(), &state, config)
229 .map_err(|diagnostics| format_vrl_diagnostics(&source, diagnostics))
230 .map(|result| {
231 (
232 result.program,
233 format_vrl_diagnostics(&source, result.warnings),
234 result.config.get_custom::<MeaningList>().unwrap().clone(),
235 )
236 });
237
238 self.cache
239 .lock()
240 .expect("Data poisoned")
241 .push(((enrichment_tables, merged_schema_definition), res.clone()));
242
243 res.map_err(Into::into)
244 }
245
246 fn read_file(path: &PathBuf) -> Result<String> {
247 let mut buffer = String::new();
248 File::open(path)
249 .with_context(|_| FileOpenFailedSnafu { path })?
250 .read_to_string(&mut buffer)
251 .with_context(|_| FileReadFailedSnafu { path })?;
252 Ok(buffer)
253 }
254}
255
// NOTE(review): presumably implements example-config generation for `RemapConfig` based on
// its `Default` value (exercised by the `generate_config` test) — confirm against the macro
// definition.
impl_generate_config_from_default!(RemapConfig);
257
258#[async_trait::async_trait]
259#[typetag::serde(name = "remap")]
260impl TransformConfig for RemapConfig {
261 async fn build(&self, context: &TransformContext) -> Result<Transform> {
262 let (transform, warnings) = match self.runtime {
263 VrlRuntime::Ast => {
264 let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
265 (Transform::synchronous(remap), warnings)
266 }
267 };
268
269 if !warnings.is_empty() {
274 warn!(message = "VRL compilation warning.", %warnings);
275 }
276
277 Ok(transform)
278 }
279
280 fn input(&self) -> Input {
281 Input::all()
282 }
283
284 fn outputs(
285 &self,
286 context: &TransformContext,
287 input_definitions: &[(OutputId, schema::Definition)],
288 ) -> Vec<TransformOutput> {
289 let merged_definition: Definition = input_definitions
290 .iter()
291 .map(|(_output, definition)| definition.clone())
292 .reduce(Definition::merge)
293 .unwrap_or_else(Definition::any);
294
295 let compiled = self
299 .compile_vrl_program(
300 context.enrichment_tables.clone(),
301 context.metrics_storage.clone(),
302 merged_definition,
303 )
304 .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
305 .map_err(|_| ());
306
307 let mut dropped_definitions = HashMap::new();
308 let mut default_definitions = HashMap::new();
309
310 for (output_id, input_definition) in input_definitions {
311 let default_definition = compiled
312 .clone()
313 .map(|(state, meaning)| {
314 let mut new_type_def = Definition::new(
315 state.external.target_kind().clone(),
316 state.external.metadata_kind().clone(),
317 input_definition.log_namespaces().clone(),
318 );
319
320 for (id, path) in input_definition.meanings() {
321 let _ = new_type_def.try_with_meaning(path.clone(), id);
325 }
326
327 for (id, path) in meaning {
329 new_type_def = new_type_def.with_meaning(path, &id);
331 }
332 new_type_def
333 })
334 .unwrap_or_else(|_| {
335 Definition::new_with_default_metadata(
336 Kind::never(),
338 input_definition.log_namespaces().clone(),
339 )
340 });
341
342 let dropped_definition = Definition::combine_log_namespaces(
345 input_definition.log_namespaces(),
346 input_definition.clone().with_event_field(
347 log_schema().metadata_key().expect("valid metadata key"),
348 Kind::object(BTreeMap::from([
349 ("reason".into(), Kind::bytes()),
350 ("message".into(), Kind::bytes()),
351 ("component_id".into(), Kind::bytes()),
352 ("component_type".into(), Kind::bytes()),
353 ("component_kind".into(), Kind::bytes()),
354 ])),
355 Some("metadata"),
356 ),
357 input_definition
358 .clone()
359 .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
360 .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
361 .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
362 .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
363 .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
364 );
365
366 default_definitions.insert(
367 output_id.clone(),
368 VrlTarget::modify_schema_definition_for_into_events(default_definition),
369 );
370 dropped_definitions.insert(
371 output_id.clone(),
372 VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
373 );
374 }
375
376 let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
377
378 if self.reroute_dropped {
379 vec![
380 default_output,
381 TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
382 ]
383 } else {
384 vec![default_output]
385 }
386 }
387
388 fn enable_concurrency(&self) -> bool {
389 true
390 }
391
392 fn files_to_watch(&self) -> Vec<&PathBuf> {
393 self.file
394 .iter()
395 .chain(self.files.iter().flatten())
396 .collect()
397 }
398}
399
/// Runtime state of the `remap` transform: a compiled program, the runner that executes it,
/// and the settings that decide what happens to events the program rejects.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of the owning component, if any; reported as `component_id` on rerouted events.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program executed for every event.
    program: Program,
    /// Timezone handed to the program on every run.
    timezone: TimeZone,
    /// Whether events are dropped when the program fails with a runtime error.
    drop_on_error: bool,
    /// Whether events are dropped when the program aborts.
    drop_on_abort: bool,
    /// Whether dropped events are annotated and sent to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against a target.
    runner: Runner,
    /// Determines the boolean flag passed to `VrlTarget::new` for metric tag handling.
    metric_tag_values: MetricTagValues,
}
414
/// Strategy for executing a compiled VRL program against a single target.
pub trait VrlRunner {
    /// Runs `program` against `target`, using `timezone` for the run.
    ///
    /// Returns the value the program resolved to, or the `Terminate` reason (abort or error)
    /// when the program did not complete.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
423
/// `VrlRunner` backed by a VRL `Runtime`, which is cleared after every run.
#[derive(Debug)]
pub struct AstRunner {
    /// The runtime used to resolve programs.
    pub runtime: Runtime,
}
428
429impl Clone for AstRunner {
430 fn clone(&self) -> Self {
431 Self {
432 runtime: Runtime::default(),
433 }
434 }
435}
436
437impl VrlRunner for AstRunner {
438 fn run(
439 &mut self,
440 target: &mut VrlTarget,
441 program: &Program,
442 timezone: &TimeZone,
443 ) -> std::result::Result<Value, Terminate> {
444 let result = self.runtime.resolve(target, program, timezone);
445 self.runtime.clear();
446 result
447 }
448}
449
450impl Remap<AstRunner> {
451 pub fn new_ast(
452 config: RemapConfig,
453 context: &TransformContext,
454 ) -> crate::Result<(Self, String)> {
455 let (program, warnings, _) = config.compile_vrl_program(
456 context.enrichment_tables.clone(),
457 context.metrics_storage.clone(),
458 context.merged_schema_definition.clone(),
459 )?;
460
461 let runtime = Runtime::default();
462 let runner = AstRunner { runtime };
463
464 Self::new(config, context, program, runner).map(|remap| (remap, warnings))
465 }
466}
467
468impl<Runner> Remap<Runner>
469where
470 Runner: VrlRunner,
471{
472 fn new(
473 config: RemapConfig,
474 context: &TransformContext,
475 program: Program,
476 runner: Runner,
477 ) -> crate::Result<Self> {
478 Ok(Remap {
479 component_key: context.key.clone(),
480 program,
481 timezone: config
482 .timezone
483 .unwrap_or_else(|| context.globals.timezone()),
484 drop_on_error: config.drop_on_error,
485 drop_on_abort: config.drop_on_abort,
486 reroute_dropped: config.reroute_dropped,
487 runner,
488 metric_tag_values: config.metric_tag_values,
489 })
490 }
491
492 #[cfg(test)]
493 const fn runner(&self) -> &Runner {
494 &self.runner
495 }
496
497 fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
498 let message = error
499 .notes()
500 .iter()
501 .rfind(|note| matches!(note, Note::UserErrorMessage(_)))
502 .map(|note| note.to_string())
503 .unwrap_or_else(|| error.to_string());
504 serde_json::json!({
505 "reason": reason,
506 "message": message,
507 "component_id": self.component_key,
508 "component_type": "remap",
509 "component_kind": "transform",
510 })
511 }
512
513 fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
514 match event {
515 Event::Log(log) => match log.namespace() {
516 LogNamespace::Legacy => {
517 if let Some(metadata_key) = log_schema().metadata_key() {
518 log.insert(
519 (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
520 self.dropped_data(reason, error),
521 );
522 }
523 }
524 LogNamespace::Vector => {
525 log.insert(
526 metadata_path!("vector", "dropped"),
527 self.dropped_data(reason, error),
528 );
529 }
530 },
531 Event::Metric(metric) => {
532 if let Some(metadata_key) = log_schema().metadata_key() {
533 metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
534 metric.replace_tag(
535 format!("{metadata_key}.dropped.component_id"),
536 self.component_key
537 .as_ref()
538 .map(ToString::to_string)
539 .unwrap_or_default(),
540 );
541 metric.replace_tag(
542 format!("{metadata_key}.dropped.component_type"),
543 "remap".into(),
544 );
545 metric.replace_tag(
546 format!("{metadata_key}.dropped.component_kind"),
547 "transform".into(),
548 );
549 }
550 }
551 Event::Trace(trace) => {
552 trace.maybe_insert(log_schema().metadata_key_target_path(), || {
553 self.dropped_data(reason, error).into()
554 });
555 }
556 }
557 }
558
559 fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
560 self.runner.run(target, &self.program, &self.timezone)
561 }
562}
563
564impl<Runner> SyncTransform for Remap<Runner>
565where
566 Runner: VrlRunner + Clone + Send + Sync,
567{
568 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
569 let forward_on_error = !self.drop_on_error || self.reroute_dropped;
580 let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
581 let original_event = if (self.program.info().fallible && forward_on_error)
582 || (self.program.info().abortable && forward_on_abort)
583 {
584 Some(event.clone())
585 } else {
586 None
587 };
588
589 let log_namespace = event
590 .maybe_as_log()
591 .map(|log| log.namespace())
592 .unwrap_or(LogNamespace::Legacy);
593
594 let mut target = VrlTarget::new(
595 event,
596 self.program.info(),
597 match self.metric_tag_values {
598 MetricTagValues::Single => false,
599 MetricTagValues::Full => true,
600 },
601 );
602 let result = self.run_vrl(&mut target);
603
604 match result {
605 Ok(_) => match target.into_events(log_namespace) {
606 TargetEvents::One(event) => push_default(event, output),
607 TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
608 TargetEvents::Traces(events) => {
609 events.for_each(|event| push_default(event, output))
610 }
611 },
612 Err(reason) => {
613 let (reason, error, drop) = match reason {
614 Terminate::Abort(error) => {
615 if !self.reroute_dropped {
616 emit!(RemapMappingAbort {
617 event_dropped: self.drop_on_abort,
618 });
619 }
620 ("abort", error, self.drop_on_abort)
621 }
622 Terminate::Error(error) => {
623 if !self.reroute_dropped {
624 emit!(RemapMappingError {
625 error: error.to_string(),
626 event_dropped: self.drop_on_error,
627 });
628 }
629 ("error", error, self.drop_on_error)
630 }
631 };
632
633 if !drop {
634 let event = original_event.expect("event will be set");
635
636 push_default(event, output);
637 } else if self.reroute_dropped {
638 let mut event = original_event.expect("event will be set");
639
640 self.annotate_dropped(&mut event, reason, error);
641 push_dropped(event, output);
642 }
643 }
644 }
645 }
646}
647
648#[inline]
649fn push_default(event: Event, output: &mut TransformOutputsBuf) {
650 output.push(None, event)
651}
652
653#[inline]
654fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
655 output.push(Some(DROPPED), event);
656}
657
/// Errors raised while loading the VRL program of a `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// Not exactly one of `source`, `file`, `files` was configured.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    /// The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    /// The program file at `path` was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
668
669#[cfg(test)]
670mod tests {
671 use std::{
672 collections::{HashMap, HashSet},
673 sync::Arc,
674 };
675
676 use chrono::DateTime;
677 use indoc::{formatdoc, indoc};
678 use tokio::sync::mpsc;
679 use tokio_stream::wrappers::ReceiverStream;
680 use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
681 use vrl::{btreemap, event_path, value::kind::Collection};
682
683 use super::*;
684 use crate::{
685 config::{ConfigBuilder, build_unit_tests},
686 event::{
687 LogEvent, Metric, Value,
688 metric::{MetricKind, MetricValue},
689 },
690 metrics::Controller,
691 schema,
692 test_util::components::{
693 COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
694 },
695 transforms::{OutputBuffer, test::create_topology},
696 };
697
698 fn test_default_schema_definition() -> schema::Definition {
699 schema::Definition::empty_legacy_namespace().with_event_field(
700 &owned_value_path!("a default field"),
701 Kind::integer().or_bytes(),
702 Some("default"),
703 )
704 }
705
706 fn test_dropped_schema_definition() -> schema::Definition {
707 schema::Definition::empty_legacy_namespace().with_event_field(
708 &owned_value_path!("a dropped field"),
709 Kind::boolean().or_null(),
710 Some("dropped"),
711 )
712 }
713
714 fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
715 let schema_definitions = HashMap::from([
716 (
717 None,
718 [("source".into(), test_default_schema_definition())].into(),
719 ),
720 (
721 Some(DROPPED.to_owned()),
722 [("source".into(), test_dropped_schema_definition())].into(),
723 ),
724 ]);
725
726 Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
727 .map(|(remap, _)| remap)
728 }
729
730 #[test]
731 fn generate_config() {
732 crate::test_util::test_generate_config::<RemapConfig>();
733 }
734
735 #[test]
736 fn config_missing_source_and_file() {
737 let config = RemapConfig {
738 source: None,
739 file: None,
740 ..Default::default()
741 };
742
743 let err = remap(config).unwrap_err().to_string();
744 assert_eq!(
745 &err,
746 "must provide exactly one of `source` or `file` or `files` configuration"
747 )
748 }
749
750 #[test]
751 fn config_both_source_and_file() {
752 let config = RemapConfig {
753 source: Some("".to_owned()),
754 file: Some("".into()),
755 ..Default::default()
756 };
757
758 let err = remap(config).unwrap_err().to_string();
759 assert_eq!(
760 &err,
761 "must provide exactly one of `source` or `file` or `files` configuration"
762 )
763 }
764
765 fn get_field_string(event: &Event, field: &str) -> String {
766 event
767 .as_log()
768 .get(field)
769 .unwrap()
770 .to_string_lossy()
771 .into_owned()
772 }
773
774 #[test]
775 fn check_remap_doesnt_share_state_between_events() {
776 let conf = RemapConfig {
777 source: Some(".foo = .sentinel".to_string()),
778 file: None,
779 drop_on_error: true,
780 drop_on_abort: false,
781 ..Default::default()
782 };
783 let mut tform = remap(conf).unwrap();
784 assert!(tform.runner().runtime.is_empty());
785
786 let event1 = {
787 let mut event1 = LogEvent::from("event1");
788 event1.insert("sentinel", "bar");
789 Event::from(event1)
790 };
791 let result1 = transform_one(&mut tform, event1).unwrap();
792 assert_eq!(get_field_string(&result1, "message"), "event1");
793 assert_eq!(get_field_string(&result1, "foo"), "bar");
794 assert!(tform.runner().runtime.is_empty());
795
796 let event2 = {
797 let event2 = LogEvent::from("event2");
798 Event::from(event2)
799 };
800 let result2 = transform_one(&mut tform, event2).unwrap();
801 assert_eq!(get_field_string(&result2, "message"), "event2");
802 assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
803 assert!(tform.runner().runtime.is_empty());
804 }
805
806 #[test]
807 fn remap_return_raw_string_vector_namespace() {
808 let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
809
810 let event = {
811 let mut metadata = EventMetadata::default()
812 .with_schema_definition(&Arc::new(initial_definition.clone()));
813 metadata
815 .value_mut()
816 .insert(&owned_value_path!("vector"), BTreeMap::new());
817
818 let mut event = LogEvent::new_with_metadata(metadata);
819 event.insert("copy_from", "buz");
820 Event::from(event)
821 };
822
823 let conf = RemapConfig {
824 source: Some(r#" . = "root string";"#.to_string()),
825 file: None,
826 drop_on_error: true,
827 drop_on_abort: false,
828 ..Default::default()
829 };
830 let mut tform = remap(conf.clone()).unwrap();
831 let result = transform_one(&mut tform, event).unwrap();
832 assert_eq!(get_field_string(&result, "."), "root string");
833
834 let mut outputs = conf.outputs(
835 &Default::default(),
836 &[(OutputId::dummy(), initial_definition)],
837 );
838
839 assert_eq!(outputs.len(), 1);
840 let output = outputs.pop().unwrap();
841 assert_eq!(output.port, None);
842 let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
843 let expected_schema =
844 Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
845 assert_eq!(actual_schema_def, expected_schema);
846 }
847
848 #[test]
849 fn check_remap_adds() {
850 let event = {
851 let mut event = LogEvent::from("augment me");
852 event.insert("copy_from", "buz");
853 Event::from(event)
854 };
855
856 let conf = RemapConfig {
857 source: Some(
858 r#" .foo = "bar"
859 .bar = "baz"
860 .copy = .copy_from
861"#
862 .to_string(),
863 ),
864 file: None,
865 drop_on_error: true,
866 drop_on_abort: false,
867 ..Default::default()
868 };
869 let mut tform = remap(conf).unwrap();
870 let result = transform_one(&mut tform, event).unwrap();
871 assert_eq!(get_field_string(&result, "message"), "augment me");
872 assert_eq!(get_field_string(&result, "copy_from"), "buz");
873 assert_eq!(get_field_string(&result, "foo"), "bar");
874 assert_eq!(get_field_string(&result, "bar"), "baz");
875 assert_eq!(get_field_string(&result, "copy"), "buz");
876 }
877
878 #[test]
879 fn check_remap_emits_multiple() {
880 let event = {
881 let mut event = LogEvent::from("augment me");
882 event.insert(
883 "events",
884 vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
885 );
886 Event::from(event)
887 };
888
889 let conf = RemapConfig {
890 source: Some(
891 indoc! {r"
892 . = .events
893 "}
894 .to_owned(),
895 ),
896 file: None,
897 drop_on_error: true,
898 drop_on_abort: false,
899 ..Default::default()
900 };
901 let mut tform = remap(conf).unwrap();
902
903 let out = collect_outputs(&mut tform, event);
904 assert_eq!(2, out.primary.len());
905 let mut result = out.primary.into_events();
906
907 let r = result.next().unwrap();
908 assert_eq!(get_field_string(&r, "message"), "foo");
909 let r = result.next().unwrap();
910 assert_eq!(get_field_string(&r, "message"), "bar");
911 }
912
913 #[test]
914 fn check_remap_error() {
915 let event = {
916 let mut event = Event::Log(LogEvent::from("augment me"));
917 event.as_mut_log().insert("bar", "is a string");
918 event
919 };
920
921 let conf = RemapConfig {
922 source: Some(formatdoc! {r#"
923 .foo = "foo"
924 .not_an_int = int!(.bar)
925 .baz = 12
926 "#}),
927 file: None,
928 drop_on_error: false,
929 drop_on_abort: false,
930 ..Default::default()
931 };
932 let mut tform = remap(conf).unwrap();
933
934 let event = transform_one(&mut tform, event).unwrap();
935
936 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
937 assert!(event.as_log().get("foo").is_none());
938 assert!(event.as_log().get("baz").is_none());
939 }
940
941 #[test]
942 fn check_remap_error_drop() {
943 let event = {
944 let mut event = Event::Log(LogEvent::from("augment me"));
945 event.as_mut_log().insert("bar", "is a string");
946 event
947 };
948
949 let conf = RemapConfig {
950 source: Some(formatdoc! {r#"
951 .foo = "foo"
952 .not_an_int = int!(.bar)
953 .baz = 12
954 "#}),
955 file: None,
956 drop_on_error: true,
957 drop_on_abort: false,
958 ..Default::default()
959 };
960 let mut tform = remap(conf).unwrap();
961
962 assert!(transform_one(&mut tform, event).is_none())
963 }
964
965 #[test]
966 fn check_remap_error_infallible() {
967 let event = {
968 let mut event = Event::Log(LogEvent::from("augment me"));
969 event.as_mut_log().insert("bar", "is a string");
970 event
971 };
972
973 let conf = RemapConfig {
974 source: Some(formatdoc! {r#"
975 .foo = "foo"
976 .baz = 12
977 "#}),
978 file: None,
979 drop_on_error: false,
980 drop_on_abort: false,
981 ..Default::default()
982 };
983 let mut tform = remap(conf).unwrap();
984
985 let event = transform_one(&mut tform, event).unwrap();
986
987 assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
988 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
989 assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
990 }
991
992 #[test]
993 fn check_remap_abort() {
994 let event = {
995 let mut event = Event::Log(LogEvent::from("augment me"));
996 event.as_mut_log().insert("bar", "is a string");
997 event
998 };
999
1000 let conf = RemapConfig {
1001 source: Some(formatdoc! {r#"
1002 .foo = "foo"
1003 abort
1004 .baz = 12
1005 "#}),
1006 file: None,
1007 drop_on_error: false,
1008 drop_on_abort: false,
1009 ..Default::default()
1010 };
1011 let mut tform = remap(conf).unwrap();
1012
1013 let event = transform_one(&mut tform, event).unwrap();
1014
1015 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1016 assert!(event.as_log().get("foo").is_none());
1017 assert!(event.as_log().get("baz").is_none());
1018 }
1019
1020 #[test]
1021 fn check_remap_abort_drop() {
1022 let event = {
1023 let mut event = Event::Log(LogEvent::from("augment me"));
1024 event.as_mut_log().insert("bar", "is a string");
1025 event
1026 };
1027
1028 let conf = RemapConfig {
1029 source: Some(formatdoc! {r#"
1030 .foo = "foo"
1031 abort
1032 .baz = 12
1033 "#}),
1034 file: None,
1035 drop_on_error: false,
1036 drop_on_abort: true,
1037 ..Default::default()
1038 };
1039 let mut tform = remap(conf).unwrap();
1040
1041 assert!(transform_one(&mut tform, event).is_none())
1042 }
1043
1044 #[test]
1045 fn check_remap_metric() {
1046 let metric = Event::Metric(Metric::new(
1047 "counter",
1048 MetricKind::Absolute,
1049 MetricValue::Counter { value: 1.0 },
1050 ));
1051 let metadata = metric.metadata().clone();
1052
1053 let conf = RemapConfig {
1054 source: Some(
1055 r#".tags.host = "zoobub"
1056 .name = "zork"
1057 .namespace = "zerk"
1058 .kind = "incremental""#
1059 .to_string(),
1060 ),
1061 file: None,
1062 drop_on_error: true,
1063 drop_on_abort: false,
1064 ..Default::default()
1065 };
1066 let mut tform = remap(conf).unwrap();
1067
1068 let result = transform_one(&mut tform, metric).unwrap();
1069 assert_eq!(
1070 result,
1071 Event::Metric(
1072 Metric::new_with_metadata(
1073 "zork",
1074 MetricKind::Incremental,
1075 MetricValue::Counter { value: 1.0 },
1076 metadata
1079 )
1080 .with_namespace(Some("zerk"))
1081 .with_tags(Some(metric_tags! {
1082 "host" => "zoobub",
1083 }))
1084 )
1085 );
1086 }
1087
1088 #[test]
1089 fn remap_timezone_fallback() {
1090 let error = Event::from_json_value(
1091 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1092 LogNamespace::Legacy,
1093 )
1094 .unwrap();
1095 let conf = RemapConfig {
1096 source: Some(formatdoc! {r#"
1097 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1098 "#}),
1099 drop_on_error: true,
1100 drop_on_abort: true,
1101 reroute_dropped: true,
1102 ..Default::default()
1103 };
1104 let context = TransformContext {
1105 key: Some(ComponentKey::from("remapper")),
1106 globals: GlobalOptions {
1107 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1108 ..Default::default()
1109 },
1110 ..Default::default()
1111 };
1112 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1113
1114 let output = transform_one_fallible(&mut tform, error).unwrap();
1115 let log = output.as_log();
1116 assert_eq!(
1117 log["timestamp"],
1118 DateTime::<chrono::Utc>::from(
1119 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1120 )
1121 .into()
1122 );
1123 }
1124
1125 #[test]
1126 fn remap_timezone_override() {
1127 let error = Event::from_json_value(
1128 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1129 LogNamespace::Legacy,
1130 )
1131 .unwrap();
1132 let conf = RemapConfig {
1133 source: Some(formatdoc! {r#"
1134 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1135 "#}),
1136 drop_on_error: true,
1137 drop_on_abort: true,
1138 reroute_dropped: true,
1139 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1140 ..Default::default()
1141 };
1142 let context = TransformContext {
1143 key: Some(ComponentKey::from("remapper")),
1144 globals: GlobalOptions {
1145 timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1146 ..Default::default()
1147 },
1148 ..Default::default()
1149 };
1150 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1151
1152 let output = transform_one_fallible(&mut tform, error).unwrap();
1153 let log = output.as_log();
1154 assert_eq!(
1155 log["timestamp"],
1156 DateTime::<chrono::Utc>::from(
1157 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1158 )
1159 .into()
1160 );
1161 }
1162
1163 #[test]
1164 fn check_remap_branching() {
1165 let happy =
1166 Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1167 .unwrap();
1168 let abort = Event::from_json_value(
1169 serde_json::json!({"hello": "goodbye"}),
1170 LogNamespace::Legacy,
1171 )
1172 .unwrap();
1173 let error =
1174 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1175
1176 let happy_metric = {
1177 let mut metric = Metric::new(
1178 "counter",
1179 MetricKind::Absolute,
1180 MetricValue::Counter { value: 1.0 },
1181 );
1182 metric.replace_tag("hello".into(), "world".into());
1183 Event::Metric(metric)
1184 };
1185
1186 let abort_metric = {
1187 let mut metric = Metric::new(
1188 "counter",
1189 MetricKind::Absolute,
1190 MetricValue::Counter { value: 1.0 },
1191 );
1192 metric.replace_tag("hello".into(), "goodbye".into());
1193 Event::Metric(metric)
1194 };
1195
1196 let error_metric = {
1197 let mut metric = Metric::new(
1198 "counter",
1199 MetricKind::Absolute,
1200 MetricValue::Counter { value: 1.0 },
1201 );
1202 metric.replace_tag("not_hello".into(), "oops".into());
1203 Event::Metric(metric)
1204 };
1205
1206 let conf = RemapConfig {
1207 source: Some(formatdoc! {r#"
1208 if exists(.tags) {{
1209 # metrics
1210 .tags.foo = "bar"
1211 if string!(.tags.hello) == "goodbye" {{
1212 abort
1213 }}
1214 }} else {{
1215 # logs
1216 .foo = "bar"
1217 if string(.hello) == "goodbye" {{
1218 abort
1219 }}
1220 }}
1221 "#}),
1222 drop_on_error: true,
1223 drop_on_abort: true,
1224 reroute_dropped: true,
1225 ..Default::default()
1226 };
1227 let schema_definitions = HashMap::from([
1228 (
1229 None,
1230 [("source".into(), test_default_schema_definition())].into(),
1231 ),
1232 (
1233 Some(DROPPED.to_owned()),
1234 [("source".into(), test_dropped_schema_definition())].into(),
1235 ),
1236 ]);
1237 let context = TransformContext {
1238 key: Some(ComponentKey::from("remapper")),
1239 schema_definitions,
1240 merged_schema_definition: schema::Definition::new_with_default_metadata(
1241 Kind::any_object(),
1242 [LogNamespace::Legacy],
1243 )
1244 .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1245 ..Default::default()
1246 };
1247 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1248
1249 let output = transform_one_fallible(&mut tform, happy).unwrap();
1250 let log = output.as_log();
1251 assert_eq!(log["hello"], "world".into());
1252 assert_eq!(log["foo"], "bar".into());
1253 assert!(!log.contains(event_path!("metadata")));
1254
1255 let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1256 let log = output.as_log();
1257 assert_eq!(log["hello"], "goodbye".into());
1258 assert!(!log.contains(event_path!("foo")));
1259 assert_eq!(
1260 log["metadata"],
1261 serde_json::json!({
1262 "dropped": {
1263 "reason": "abort",
1264 "message": "aborted",
1265 "component_id": "remapper",
1266 "component_type": "remap",
1267 "component_kind": "transform",
1268 }
1269 })
1270 .try_into()
1271 .unwrap()
1272 );
1273
1274 let output = transform_one_fallible(&mut tform, error).unwrap_err();
1275 let log = output.as_log();
1276 assert_eq!(log["hello"], 42.into());
1277 assert!(!log.contains(event_path!("foo")));
1278 assert_eq!(
1279 log["metadata"],
1280 serde_json::json!({
1281 "dropped": {
1282 "reason": "error",
1283 "message": "function call error for \"string\" at (160:174): expected string, got integer",
1284 "component_id": "remapper",
1285 "component_type": "remap",
1286 "component_kind": "transform",
1287 }
1288 })
1289 .try_into()
1290 .unwrap()
1291 );
1292
1293 let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1294 similar_asserts::assert_eq!(
1295 output,
1296 Event::Metric(
1297 Metric::new_with_metadata(
1298 "counter",
1299 MetricKind::Absolute,
1300 MetricValue::Counter { value: 1.0 },
1301 EventMetadata::default()
1304 .with_schema_definition(output.metadata().schema_definition()),
1305 )
1306 .with_tags(Some(metric_tags! {
1307 "hello" => "world",
1308 "foo" => "bar",
1309 }))
1310 )
1311 );
1312
1313 let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1314 similar_asserts::assert_eq!(
1315 output,
1316 Event::Metric(
1317 Metric::new_with_metadata(
1318 "counter",
1319 MetricKind::Absolute,
1320 MetricValue::Counter { value: 1.0 },
1321 EventMetadata::default()
1324 .with_schema_definition(output.metadata().schema_definition()),
1325 )
1326 .with_tags(Some(metric_tags! {
1327 "hello" => "goodbye",
1328 "metadata.dropped.reason" => "abort",
1329 "metadata.dropped.component_id" => "remapper",
1330 "metadata.dropped.component_type" => "remap",
1331 "metadata.dropped.component_kind" => "transform",
1332 }))
1333 )
1334 );
1335
1336 let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1337 similar_asserts::assert_eq!(
1338 output,
1339 Event::Metric(
1340 Metric::new_with_metadata(
1341 "counter",
1342 MetricKind::Absolute,
1343 MetricValue::Counter { value: 1.0 },
1344 EventMetadata::default()
1347 .with_schema_definition(output.metadata().schema_definition()),
1348 )
1349 .with_tags(Some(metric_tags! {
1350 "not_hello" => "oops",
1351 "metadata.dropped.reason" => "error",
1352 "metadata.dropped.component_id" => "remapper",
1353 "metadata.dropped.component_type" => "remap",
1354 "metadata.dropped.component_kind" => "transform",
1355 }))
1356 )
1357 );
1358 }
1359
/// Events that fail an `assert_eq!` in the program are rerouted to the dropped output. The drop
/// metadata carries the custom assertion message when one was supplied, and the generated
/// "function call error" message otherwise.
#[test]
fn check_remap_branching_assert_with_message() {
    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            assert_eq!(.hello, 0, "custom message here")
            assert_eq!(.hello, 1)
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    // Builds the `metadata` value expected on an event rerouted because of an error.
    let dropped_with = |message: &str| -> Value {
        serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": message,
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    };

    // `.hello` is 42, so the first assertion (the one with a custom message) fails.
    let fails_custom_assert =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    let rerouted = transform_one_fallible(&mut tform, fails_custom_assert).unwrap_err();
    let log = rerouted.as_log();
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(log["metadata"], dropped_with("custom message here"));

    // `.hello` is 0, so the first assertion passes and the second (no custom message) fails.
    let fails_default_assert =
        Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
    let rerouted = transform_one_fallible(&mut tform, fails_default_assert).unwrap_err();
    let log = rerouted.as_log();
    assert_eq!(log["hello"], 0.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        dropped_with(r#"function call error for "assert_eq" at (45:66): assertion failed: 0 == 1"#)
    );
}
1422
/// An `abort` with a custom message reroutes the event to the dropped output and records that
/// message, with reason `abort`, in the drop metadata.
#[test]
fn check_remap_branching_abort_with_message() {
    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            abort "custom message here"
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    let input =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    let rerouted = transform_one_fallible(&mut tform, input).unwrap_err();
    let log = rerouted.as_log();

    // The original payload is untouched; only the drop metadata is added.
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));

    let expected_metadata: Value = serde_json::json!({
        "dropped": {
            "reason": "abort",
            "message": "custom message here",
            "component_id": "remapper",
            "component_type": "remap",
            "component_kind": "transform",
        }
    })
    .try_into()
    .unwrap();
    assert_eq!(log["metadata"], expected_metadata);
}
1461
/// With `reroute_dropped` disabled the config declares a single output, and aborted or errored
/// events show up on neither the primary nor the dropped buffer.
#[test]
fn check_remap_branching_disabled() {
    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            if exists(.tags) {{
            # metrics
            .tags.foo = "bar"
            if string!(.tags.hello) == "goodbye" {{
            abort
            }}
            }} else {{
            # logs
            .foo = "bar"
            if string!(.hello) == "goodbye" {{
            abort
            }}
            }}
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: false,
        ..Default::default()
    };

    // The program may assign `.foo` or `.tags`, so both appear in the expected output schema.
    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let expected_definition = input_definition
        .clone()
        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);

    assert_eq!(
        conf.outputs(&Default::default(), &[("test".into(), input_definition)]),
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("test".into(), expected_definition)].into()
        )]
    );

    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    // Happy path: the event passes through with `.foo` set and no drop metadata.
    let happy =
        Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
            .unwrap();
    let passed = transform_one_fallible(&mut tform, happy).unwrap();
    let log = passed.as_log();
    assert_eq!(log["hello"], "world".into());
    assert_eq!(log["foo"], "bar".into());
    assert!(!log.contains(event_path!("metadata")));

    // Abort path: nothing is emitted on either buffer.
    let abort = Event::from_json_value(
        serde_json::json!({"hello": "goodbye"}),
        LogNamespace::Legacy,
    )
    .unwrap();
    let collected = collect_outputs(&mut tform, abort);
    assert!(collected.primary.is_empty());
    assert!(collected.named[DROPPED].is_empty());

    // Error path (`string!` on an integer): nothing is emitted on either buffer.
    let error =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    let collected = collect_outputs(&mut tform, error);
    assert!(collected.primary.is_empty());
    assert!(collected.named[DROPPED].is_empty());
}
1541
1542 #[tokio::test]
1543 async fn check_remap_branching_metrics_with_output() {
1544 init_test();
1545
1546 let config: ConfigBuilder = toml::from_str(indoc! {r#"
1547 [transforms.foo]
1548 inputs = []
1549 type = "remap"
1550 drop_on_abort = true
1551 reroute_dropped = true
1552 source = "abort"
1553
1554 [[tests]]
1555 name = "metric output"
1556
1557 [tests.input]
1558 insert_at = "foo"
1559 value = "none"
1560
1561 [[tests.outputs]]
1562 extract_from = "foo.dropped"
1563 [[tests.outputs.conditions]]
1564 type = "vrl"
1565 source = "true"
1566 "#})
1567 .unwrap();
1568
1569 let mut tests = build_unit_tests(config).await.unwrap();
1570 assert!(tests.remove(0).run().await.errors.is_empty());
1571 COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1573 }
1574
/// Events captured from one `SyncTransform::transform` call, split by output.
struct CollectedOuput {
    /// Buffer taken from the primary (unnamed) output.
    primary: OutputBuffer,
    /// Buffers taken from the named outputs, keyed by output name (e.g. `DROPPED`).
    named: HashMap<String, OutputBuffer>,
}
1579
1580 fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1581 let mut outputs = TransformOutputsBuf::new_with_capacity(
1582 vec![
1583 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1584 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1585 ],
1586 1,
1587 );
1588
1589 ft.transform(event, &mut outputs);
1590
1591 CollectedOuput {
1592 primary: outputs.take_primary(),
1593 named: outputs.take_all_named(),
1594 }
1595 }
1596
1597 fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1598 let out = collect_outputs(ft, event);
1599 assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1600 assert!(out.primary.len() <= 1);
1601 out.primary.into_events().next()
1602 }
1603
1604 fn transform_one_fallible(
1605 ft: &mut dyn SyncTransform,
1606 event: Event,
1607 ) -> std::result::Result<Event, Event> {
1608 let mut outputs = TransformOutputsBuf::new_with_capacity(
1609 vec![
1610 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1611 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1612 ],
1613 1,
1614 );
1615
1616 ft.transform(event, &mut outputs);
1617
1618 let mut buf = outputs.drain().collect::<Vec<_>>();
1619 let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1620
1621 assert!(buf.len() < 2);
1622 assert!(err_buf.len() < 2);
1623 match (buf.pop(), err_buf.pop()) {
1624 (Some(good), None) => Ok(good),
1625 (None, Some(bad)) => Err(bad),
1626 (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1627 }
1628 }
1629
1630 #[tokio::test]
1631 async fn emits_internal_events() {
1632 assert_transform_compliance(async move {
1633 let config = RemapConfig {
1634 source: Some("abort".to_owned()),
1635 drop_on_abort: true,
1636 ..Default::default()
1637 };
1638
1639 let (tx, rx) = mpsc::channel(1);
1640 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1641
1642 let log = LogEvent::from("hello world");
1643 tx.send(log.into()).await.unwrap();
1644
1645 drop(tx);
1646 topology.stop().await;
1647 assert_eq!(out.recv().await, None);
1648 })
1649 .await
1650 }
1651
/// Chains two remap configs and checks that the schema definition produced by the first one is
/// carried into the output definition of the second.
#[test]
fn test_combined_transforms_simple() {
    let first = RemapConfig {
        source: Some(r#".thing = "potato""#.to_string()),
        ..Default::default()
    };
    let second = RemapConfig {
        source: Some(".thang = .thing".to_string()),
        ..Default::default()
    };

    // First transform: `.thing` becomes a bytes field.
    let first_outputs = first.outputs(
        &Default::default(),
        &[("in".into(), schema::Definition::default_legacy_namespace())],
    );
    let first_definition = Definition::default_legacy_namespace().with_event_field(
        &owned_value_path!("thing"),
        Kind::bytes(),
        None,
    );
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), first_definition)].into()
        )],
        first_outputs
    );

    // Second transform: fed the first definition, `.thang` takes on the kind of `.thing`.
    let upstream_definition = first_outputs[0].schema_definitions(true)[&"in".into()].clone();
    let second_outputs =
        second.outputs(&Default::default(), &[("in1".into(), upstream_definition)]);
    let second_definition = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in1".into(), second_definition)].into(),
        )],
        second_outputs
    );
}
1712
/// Chains a remap config that unnests an array of objects with a second config that reads a
/// field of the unnested object, and checks the schema definition after each step.
#[test]
fn test_combined_transforms_unnest() {
    let first = RemapConfig {
        source: Some(
            indoc! {
                r#"
                .thing = [{"cabbage": 32}, {"parsnips": 45}]
                . = unnest(.thing)
                "#
            }
            .to_string(),
        ),
        ..Default::default()
    };
    let second = RemapConfig {
        source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
        ..Default::default()
    };

    // Expected kind of `.thing` after unnesting: an object in which either key may be absent.
    let thing_kind = || {
        Kind::object(Collection::from(BTreeMap::from([
            ("cabbage".into(), Kind::integer().or_undefined()),
            ("parsnips".into(), Kind::integer().or_undefined()),
        ])))
    };

    let first_outputs = first.outputs(
        &Default::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
    );
    let first_definition =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("thing"), thing_kind(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), first_definition)].into(),
        )],
        first_outputs
    );

    let upstream_definition = first_outputs[0].schema_definitions(true)[&"in".into()].clone();
    let second_outputs =
        second.outputs(&Default::default(), &[("in1".into(), upstream_definition)]);
    let second_definition = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), thing_kind(), None)
        .with_event_field(
            &owned_value_path!("thang"),
            Kind::integer().or_null(),
            None,
        );
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in1".into(), second_definition)].into(),
        )],
        second_outputs
    );
}
1804
/// A program consisting only of `abort` leaves the output schema definition identical to the
/// input definition.
#[test]
fn test_transform_abort() {
    let config = RemapConfig {
        source: Some("abort".to_string()),
        ..Default::default()
    };

    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let outputs = config.outputs(&Default::default(), &[("in".into(), input_definition)]);

    let expected_definition =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), expected_definition)].into(),
        )],
        outputs
    );
}
1840
/// With `reroute_dropped` enabled, the config must declare both the primary (unnamed) output
/// and the `DROPPED` output.
#[test]
fn test_error_outputs() {
    let config = RemapConfig {
        source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
        reroute_dropped: true,
        ..Default::default()
    };

    let outputs = config.outputs(
        &Default::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
    );

    // Compare against the shared `DROPPED` constant rather than a separate string literal, so
    // this test cannot drift from the port name the transform actually uses.
    let ports: HashSet<_> = outputs.into_iter().map(|output| output.port).collect();
    assert_eq!(HashSet::from([None, Some(DROPPED.to_owned())]), ports);
}
1873
/// Assigning a string to the event root yields a schema definition whose only known field is a
/// bytes `message`, with all other fields undefined.
#[test]
fn test_non_object_events() {
    let config = RemapConfig {
        source: Some(r#". = "fish" "#.to_string()),
        ..Default::default()
    };

    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let outputs = config.outputs(&Default::default(), &[("in".into(), input_definition)]);

    let expected = schema::Definition::new_with_default_metadata(
        Kind::object(Collection::from_unknown(Kind::undefined())),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);

    assert_eq!(
        HashMap::from([(OutputId::from("in"), expected)]),
        outputs[0].schema_definitions(true),
    );
}
1904
/// When one branch unnests an array of objects and the other assigns a string to the root, the
/// resulting definition has an `any` `message` field and an optional `thing` object.
#[test]
fn test_array_and_non_object_events() {
    let program = indoc! {r#"
        if .lizard == true {
        .thing = [{"cabbage": 42}];
        . = unnest(.thing)
        } else {
        . = "fish"
        }
    "#};
    let config = RemapConfig {
        source: Some(program.to_string()),
        ..Default::default()
    };

    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let outputs = config.outputs(&Default::default(), &[("in".into(), input_definition)]);

    // `.thing` only exists on the unnest branch, hence `or_undefined`.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([(
        "cabbage".into(),
        Kind::integer(),
    )])))
    .or_undefined();
    let expected =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("message"), Kind::any(), None)
            .with_event_field(&owned_value_path!("thing"), thing_kind, None);

    assert_eq!(
        HashMap::from([(OutputId::from("in"), expected)]),
        outputs[0].schema_definitions(true),
    );
}
1953
/// Assigning `[null]` to the event root produces an event whose root value is `null`, and the
/// declared schema definition for a Vector-namespace input is `Kind::null()`.
#[test]
fn check_remap_array_vector_namespace() {
    // NOTE(review): inserting a `vector` key into the metadata presumably marks the event as
    // using the Vector log namespace — confirm against the namespace detection logic.
    let mut log = LogEvent::from("input");
    log.metadata_mut()
        .value_mut()
        .insert("vector", BTreeMap::new());
    let event = Event::from(log);

    let conf = RemapConfig {
        source: Some(String::from(". = [null]\n")),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };

    let mut tform = remap(conf.clone()).unwrap();
    let transformed = transform_one(&mut tform, event).unwrap();
    assert_eq!(transformed.as_log().get("."), Some(&Value::Null));

    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Vector]);
    let outputs = conf.outputs(&Default::default(), &[("in".into(), input_definition)]);

    let expected =
        schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
    assert_eq!(
        HashMap::from([(OutputId::from("in"), expected)]),
        outputs[0].schema_definitions(true),
    );
}
2002
2003 fn assert_no_metrics(source: String) {
2004 vector_lib::metrics::init_test();
2005
2006 let config = RemapConfig {
2007 source: Some(source),
2008 drop_on_error: true,
2009 drop_on_abort: true,
2010 reroute_dropped: true,
2011 ..Default::default()
2012 };
2013 let mut ast_runner = remap(config).unwrap();
2014 let input_event =
2015 Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2016 let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2017 let dropped_log = dropped_event.as_log();
2018 assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2019
2020 let controller = Controller::get().expect("no controller");
2021 let metrics = controller
2022 .capture_metrics()
2023 .into_iter()
2024 .map(|metric| (metric.name().to_string(), metric))
2025 .collect::<BTreeMap<String, Metric>>();
2026 assert_eq!(metrics.get("component_discarded_events_total"), None);
2027 assert_eq!(metrics.get("component_errors_total"), None);
2028 }
/// An aborted event that is rerouted must not produce discard or error metrics.
#[test]
fn do_not_emit_metrics_when_dropped() {
    let program = String::from("abort");
    assert_no_metrics(program);
}
2033
/// An event rerouted because the program errored must not produce discard or error metrics.
#[test]
fn do_not_emit_metrics_when_errored() {
    let program = String::from("parse_key_value!(.message)");
    assert_no_metrics(program);
}
2038}