1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::{
4 collections::BTreeMap,
5 fs::File,
6 io::{self, Read},
7 path::PathBuf,
8};
9
10use snafu::{ResultExt, Snafu};
11use vector_lib::codecs::MetricTagValues;
12use vector_lib::compile_vrl;
13use vector_lib::config::LogNamespace;
14use vector_lib::configurable::configurable_component;
15use vector_lib::enrichment::TableRegistry;
16use vector_lib::lookup::{metadata_path, owned_value_path, PathPrefix};
17use vector_lib::schema::Definition;
18use vector_lib::TimeZone;
19use vector_vrl_functions::set_semantic_meaning::MeaningList;
20use vrl::compiler::runtime::{Runtime, Terminate};
21use vrl::compiler::state::ExternalEnv;
22use vrl::compiler::{CompileConfig, ExpressionError, Program, TypeState, VrlRuntime};
23use vrl::diagnostic::{DiagnosticMessage, Formatter, Note};
24use vrl::path;
25use vrl::path::ValuePath;
26use vrl::value::{Kind, Value};
27
28use crate::config::OutputId;
29use crate::{
30 config::{
31 log_schema, ComponentKey, DataType, Input, TransformConfig, TransformContext,
32 TransformOutput,
33 },
34 event::{Event, TargetEvents, VrlTarget},
35 internal_events::{RemapMappingAbort, RemapMappingError},
36 schema,
37 transforms::{SyncTransform, Transform, TransformOutputsBuf},
38 Result,
39};
40
41const DROPPED: &str = "dropped";
42type CacheKey = (TableRegistry, schema::Definition);
43type CacheValue = (Program, String, MeaningList);
44
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The Vector Remap Language (VRL) program to execute for each event.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// File path to the Vector Remap Language (VRL) program to execute for each event.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// File paths to Vector Remap Language (VRL) programs to execute for each event.
    ///
    /// The files are read in the order given and concatenated, each followed by a newline, into a
    /// single program. Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// Controls whether metric tags are exposed to the program as single values or with all of
    /// their values.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The time zone used by the program, for example when parsing timestamps that carry no
    /// explicit offset.
    ///
    /// If not set, the global `timezone` option is used.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Drops any event whose program run ends in an error.
    ///
    /// When `false`, such an event is forwarded in its original, unmodified form. When `true`, it
    /// is dropped, or sent to the `dropped` output if `reroute_dropped` is enabled.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Drops any event whose program run ends with an `abort`.
    ///
    /// When `false`, such an event is forwarded in its original, unmodified form. When `true`, it
    /// is dropped, or sent to the `dropped` output if `reroute_dropped` is enabled.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Reroutes dropped events to a separate `dropped` output instead of discarding them.
    ///
    /// The original, unmodified event is annotated with metadata describing why it was dropped
    /// (the reason, an error message, and the component's id, type, and kind) before being sent
    /// to that output.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    /// The VRL runtime used to execute the program.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    /// Memoized compilation results, keyed by enrichment tables and merged input schema.
    ///
    /// Entries are found by a linear search comparing keys with `==`. Cloning the configuration
    /// starts the clone with an empty cache.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
159
160impl Clone for RemapConfig {
161 fn clone(&self) -> Self {
162 Self {
163 source: self.source.clone(),
164 file: self.file.clone(),
165 files: self.files.clone(),
166 metric_tag_values: self.metric_tag_values,
167 timezone: self.timezone,
168 drop_on_error: self.drop_on_error,
169 drop_on_abort: self.drop_on_abort,
170 reroute_dropped: self.reroute_dropped,
171 runtime: self.runtime,
172 cache: Mutex::new(Default::default()),
173 }
174 }
175}
176
177impl RemapConfig {
178 fn compile_vrl_program(
179 &self,
180 enrichment_tables: TableRegistry,
181 merged_schema_definition: schema::Definition,
182 ) -> Result<(Program, String, MeaningList)> {
183 if let Some((_, res)) = self
184 .cache
185 .lock()
186 .expect("Data poisoned")
187 .iter()
188 .find(|v| v.0 .0 == enrichment_tables && v.0 .1 == merged_schema_definition)
189 {
190 return res.clone().map_err(Into::into);
191 }
192
193 let source = match (&self.source, &self.file, &self.files) {
194 (Some(source), None, None) => source.to_owned(),
195 (None, Some(path), None) => Self::read_file(path)?,
196 (None, None, Some(paths)) => {
197 let mut combined_source = String::new();
198 for path in paths {
199 let content = Self::read_file(path)?;
200 combined_source.push_str(&content);
201 combined_source.push('\n');
202 }
203 combined_source
204 }
205 _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
206 };
207
208 let mut functions = vrl::stdlib::all();
209 functions.append(&mut vector_lib::enrichment::vrl_functions());
210 #[cfg(feature = "sources-dnstap")]
211 functions.append(&mut dnstap_parser::vrl_functions());
212 functions.append(&mut vector_vrl_functions::all());
213
214 let state = TypeState {
215 local: Default::default(),
216 external: ExternalEnv::new_with_kind(
217 merged_schema_definition.event_kind().clone(),
218 merged_schema_definition.metadata_kind().clone(),
219 ),
220 };
221 let mut config = CompileConfig::default();
222
223 config.set_custom(enrichment_tables.clone());
224 config.set_custom(MeaningList::default());
225
226 let res = compile_vrl(&source, &functions, &state, config)
227 .map_err(|diagnostics| Formatter::new(&source, diagnostics).colored().to_string())
228 .map(|result| {
229 (
230 result.program,
231 Formatter::new(&source, result.warnings).to_string(),
232 result.config.get_custom::<MeaningList>().unwrap().clone(),
233 )
234 });
235
236 self.cache
237 .lock()
238 .expect("Data poisoned")
239 .push(((enrichment_tables, merged_schema_definition), res.clone()));
240
241 res.map_err(Into::into)
242 }
243
244 fn read_file(path: &PathBuf) -> Result<String> {
245 let mut buffer = String::new();
246 File::open(path)
247 .with_context(|_| FileOpenFailedSnafu { path })?
248 .read_to_string(&mut buffer)
249 .with_context(|_| FileReadFailedSnafu { path })?;
250 Ok(buffer)
251 }
252}
253
// Generates this component's example configuration from `RemapConfig`'s `Default` impl
// (exercised by the `generate_config` test).
impl_generate_config_from_default!(RemapConfig);
255
256#[async_trait::async_trait]
257#[typetag::serde(name = "remap")]
258impl TransformConfig for RemapConfig {
259 async fn build(&self, context: &TransformContext) -> Result<Transform> {
260 let (transform, warnings) = match self.runtime {
261 VrlRuntime::Ast => {
262 let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
263 (Transform::synchronous(remap), warnings)
264 }
265 };
266
267 if !warnings.is_empty() {
272 warn!(message = "VRL compilation warning.", %warnings);
273 }
274
275 Ok(transform)
276 }
277
278 fn input(&self) -> Input {
279 Input::all()
280 }
281
282 fn outputs(
283 &self,
284 enrichment_tables: vector_lib::enrichment::TableRegistry,
285 input_definitions: &[(OutputId, schema::Definition)],
286 _: LogNamespace,
287 ) -> Vec<TransformOutput> {
288 let merged_definition: Definition = input_definitions
289 .iter()
290 .map(|(_output, definition)| definition.clone())
291 .reduce(Definition::merge)
292 .unwrap_or_else(Definition::any);
293
294 let compiled = self
298 .compile_vrl_program(enrichment_tables, merged_definition)
299 .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
300 .map_err(|_| ());
301
302 let mut dropped_definitions = HashMap::new();
303 let mut default_definitions = HashMap::new();
304
305 for (output_id, input_definition) in input_definitions {
306 let default_definition = compiled
307 .clone()
308 .map(|(state, meaning)| {
309 let mut new_type_def = Definition::new(
310 state.external.target_kind().clone(),
311 state.external.metadata_kind().clone(),
312 input_definition.log_namespaces().clone(),
313 );
314
315 for (id, path) in input_definition.meanings() {
316 let _ = new_type_def.try_with_meaning(path.clone(), id);
320 }
321
322 for (id, path) in meaning {
324 new_type_def = new_type_def.with_meaning(path, &id);
326 }
327 new_type_def
328 })
329 .unwrap_or_else(|_| {
330 Definition::new_with_default_metadata(
331 Kind::never(),
333 input_definition.log_namespaces().clone(),
334 )
335 });
336
337 let dropped_definition = Definition::combine_log_namespaces(
340 input_definition.log_namespaces(),
341 input_definition.clone().with_event_field(
342 log_schema().metadata_key().expect("valid metadata key"),
343 Kind::object(BTreeMap::from([
344 ("reason".into(), Kind::bytes()),
345 ("message".into(), Kind::bytes()),
346 ("component_id".into(), Kind::bytes()),
347 ("component_type".into(), Kind::bytes()),
348 ("component_kind".into(), Kind::bytes()),
349 ])),
350 Some("metadata"),
351 ),
352 input_definition
353 .clone()
354 .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
355 .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
356 .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
357 .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
358 .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
359 );
360
361 default_definitions.insert(
362 output_id.clone(),
363 VrlTarget::modify_schema_definition_for_into_events(default_definition),
364 );
365 dropped_definitions.insert(
366 output_id.clone(),
367 VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
368 );
369 }
370
371 let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
372
373 if self.reroute_dropped {
374 vec![
375 default_output,
376 TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
377 ]
378 } else {
379 vec![default_output]
380 }
381 }
382
383 fn enable_concurrency(&self) -> bool {
384 true
385 }
386
387 fn files_to_watch(&self) -> Vec<&PathBuf> {
388 self.file
389 .iter()
390 .chain(self.files.iter().flatten())
391 .collect()
392 }
393}
394
/// The `remap` transform: a compiled VRL program plus the settings that decide how each event's
/// outcome is routed.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of this component, if known. Reported in dropped-event annotations.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program run against every event.
    program: Program,
    /// Time zone passed to the runner on every program execution.
    timezone: TimeZone,
    /// Whether events whose run ends in an error are dropped instead of forwarded unchanged.
    drop_on_error: bool,
    /// Whether events whose run ends in an `abort` are dropped instead of forwarded unchanged.
    drop_on_abort: bool,
    /// Whether dropped events are annotated and sent to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against each event's target.
    runner: Runner,
    /// Selects whether metric tags are exposed to the program with multiple values.
    metric_tag_values: MetricTagValues,
}
409
/// Strategy for executing a compiled VRL program against a single event target.
pub trait VrlRunner {
    /// Runs `program` against `target` using `timezone`.
    ///
    /// Returns the program's result value, or the `Terminate` reason (an error or an abort) that
    /// ended the run early.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
418
/// `VrlRunner` used for `VrlRuntime::Ast`. It resolves programs with a VRL `Runtime`.
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime reused across events. `run` clears it after every resolution.
    pub runtime: Runtime,
}
423
424impl Clone for AstRunner {
425 fn clone(&self) -> Self {
426 Self {
427 runtime: Runtime::default(),
428 }
429 }
430}
431
432impl VrlRunner for AstRunner {
433 fn run(
434 &mut self,
435 target: &mut VrlTarget,
436 program: &Program,
437 timezone: &TimeZone,
438 ) -> std::result::Result<Value, Terminate> {
439 let result = self.runtime.resolve(target, program, timezone);
440 self.runtime.clear();
441 result
442 }
443}
444
445impl Remap<AstRunner> {
446 pub fn new_ast(
447 config: RemapConfig,
448 context: &TransformContext,
449 ) -> crate::Result<(Self, String)> {
450 let (program, warnings, _) = config.compile_vrl_program(
451 context.enrichment_tables.clone(),
452 context.merged_schema_definition.clone(),
453 )?;
454
455 let runtime = Runtime::default();
456 let runner = AstRunner { runtime };
457
458 Self::new(config, context, program, runner).map(|remap| (remap, warnings))
459 }
460}
461
462impl<Runner> Remap<Runner>
463where
464 Runner: VrlRunner,
465{
466 fn new(
467 config: RemapConfig,
468 context: &TransformContext,
469 program: Program,
470 runner: Runner,
471 ) -> crate::Result<Self> {
472 Ok(Remap {
473 component_key: context.key.clone(),
474 program,
475 timezone: config
476 .timezone
477 .unwrap_or_else(|| context.globals.timezone()),
478 drop_on_error: config.drop_on_error,
479 drop_on_abort: config.drop_on_abort,
480 reroute_dropped: config.reroute_dropped,
481 runner,
482 metric_tag_values: config.metric_tag_values,
483 })
484 }
485
486 #[cfg(test)]
487 const fn runner(&self) -> &Runner {
488 &self.runner
489 }
490
491 fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
492 let message = error
493 .notes()
494 .iter()
495 .filter(|note| matches!(note, Note::UserErrorMessage(_)))
496 .next_back()
497 .map(|note| note.to_string())
498 .unwrap_or_else(|| error.to_string());
499 serde_json::json!({
500 "reason": reason,
501 "message": message,
502 "component_id": self.component_key,
503 "component_type": "remap",
504 "component_kind": "transform",
505 })
506 }
507
508 fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
509 match event {
510 Event::Log(log) => match log.namespace() {
511 LogNamespace::Legacy => {
512 if let Some(metadata_key) = log_schema().metadata_key() {
513 log.insert(
514 (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
515 self.dropped_data(reason, error),
516 );
517 }
518 }
519 LogNamespace::Vector => {
520 log.insert(
521 metadata_path!("vector", "dropped"),
522 self.dropped_data(reason, error),
523 );
524 }
525 },
526 Event::Metric(metric) => {
527 if let Some(metadata_key) = log_schema().metadata_key() {
528 metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
529 metric.replace_tag(
530 format!("{metadata_key}.dropped.component_id"),
531 self.component_key
532 .as_ref()
533 .map(ToString::to_string)
534 .unwrap_or_default(),
535 );
536 metric.replace_tag(
537 format!("{metadata_key}.dropped.component_type"),
538 "remap".into(),
539 );
540 metric.replace_tag(
541 format!("{metadata_key}.dropped.component_kind"),
542 "transform".into(),
543 );
544 }
545 }
546 Event::Trace(trace) => {
547 trace.maybe_insert(log_schema().metadata_key_target_path(), || {
548 self.dropped_data(reason, error).into()
549 });
550 }
551 }
552 }
553
554 fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
555 self.runner.run(target, &self.program, &self.timezone)
556 }
557}
558
559impl<Runner> SyncTransform for Remap<Runner>
560where
561 Runner: VrlRunner + Clone + Send + Sync,
562{
563 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
564 let forward_on_error = !self.drop_on_error || self.reroute_dropped;
575 let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
576 let original_event = if (self.program.info().fallible && forward_on_error)
577 || (self.program.info().abortable && forward_on_abort)
578 {
579 Some(event.clone())
580 } else {
581 None
582 };
583
584 let log_namespace = event
585 .maybe_as_log()
586 .map(|log| log.namespace())
587 .unwrap_or(LogNamespace::Legacy);
588
589 let mut target = VrlTarget::new(
590 event,
591 self.program.info(),
592 match self.metric_tag_values {
593 MetricTagValues::Single => false,
594 MetricTagValues::Full => true,
595 },
596 );
597 let result = self.run_vrl(&mut target);
598
599 match result {
600 Ok(_) => match target.into_events(log_namespace) {
601 TargetEvents::One(event) => push_default(event, output),
602 TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
603 TargetEvents::Traces(events) => {
604 events.for_each(|event| push_default(event, output))
605 }
606 },
607 Err(reason) => {
608 let (reason, error, drop) = match reason {
609 Terminate::Abort(error) => {
610 if !self.reroute_dropped {
611 emit!(RemapMappingAbort {
612 event_dropped: self.drop_on_abort,
613 });
614 }
615 ("abort", error, self.drop_on_abort)
616 }
617 Terminate::Error(error) => {
618 if !self.reroute_dropped {
619 emit!(RemapMappingError {
620 error: error.to_string(),
621 event_dropped: self.drop_on_error,
622 });
623 }
624 ("error", error, self.drop_on_error)
625 }
626 };
627
628 if !drop {
629 let event = original_event.expect("event will be set");
630
631 push_default(event, output);
632 } else if self.reroute_dropped {
633 let mut event = original_event.expect("event will be set");
634
635 self.annotate_dropped(&mut event, reason, error);
636 push_dropped(event, output);
637 }
638 }
639 }
640 }
641}
642
643#[inline]
644fn push_default(event: Event, output: &mut TransformOutputsBuf) {
645 output.push(None, event)
646}
647
648#[inline]
649fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
650 output.push(Some(DROPPED), event);
651}
652
/// Errors raised while locating and loading the `remap` transform's VRL program source.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// Not exactly one of `source`, `file`, or `files` was configured.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    /// A VRL program file could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed {
        /// Path of the program file.
        path: PathBuf,
        /// Underlying I/O error.
        source: io::Error,
    },
    /// A VRL program file was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed {
        /// Path of the program file.
        path: PathBuf,
        /// Underlying I/O error.
        source: io::Error,
    },
}
663
664#[cfg(test)]
665mod tests {
666 use std::collections::{HashMap, HashSet};
667 use std::sync::Arc;
668
669 use indoc::{formatdoc, indoc};
670 use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
671 use vrl::value::kind::Collection;
672 use vrl::{btreemap, event_path};
673
674 use super::*;
675 use crate::metrics::Controller;
676 use crate::{
677 config::{build_unit_tests, ConfigBuilder},
678 event::{
679 metric::{MetricKind, MetricValue},
680 LogEvent, Metric, Value,
681 },
682 schema,
683 test_util::components::{
684 assert_transform_compliance, init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS,
685 },
686 transforms::test::create_topology,
687 transforms::OutputBuffer,
688 };
689 use chrono::DateTime;
690 use tokio::sync::mpsc;
691 use tokio_stream::wrappers::ReceiverStream;
692 use vector_lib::enrichment::TableRegistry;
693
694 fn test_default_schema_definition() -> schema::Definition {
695 schema::Definition::empty_legacy_namespace().with_event_field(
696 &owned_value_path!("a default field"),
697 Kind::integer().or_bytes(),
698 Some("default"),
699 )
700 }
701
702 fn test_dropped_schema_definition() -> schema::Definition {
703 schema::Definition::empty_legacy_namespace().with_event_field(
704 &owned_value_path!("a dropped field"),
705 Kind::boolean().or_null(),
706 Some("dropped"),
707 )
708 }
709
710 fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
711 let schema_definitions = HashMap::from([
712 (
713 None,
714 [("source".into(), test_default_schema_definition())].into(),
715 ),
716 (
717 Some(DROPPED.to_owned()),
718 [("source".into(), test_dropped_schema_definition())].into(),
719 ),
720 ]);
721
722 Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
723 .map(|(remap, _)| remap)
724 }
725
726 #[test]
727 fn generate_config() {
728 crate::test_util::test_generate_config::<RemapConfig>();
729 }
730
731 #[test]
732 fn config_missing_source_and_file() {
733 let config = RemapConfig {
734 source: None,
735 file: None,
736 ..Default::default()
737 };
738
739 let err = remap(config).unwrap_err().to_string();
740 assert_eq!(
741 &err,
742 "must provide exactly one of `source` or `file` or `files` configuration"
743 )
744 }
745
746 #[test]
747 fn config_both_source_and_file() {
748 let config = RemapConfig {
749 source: Some("".to_owned()),
750 file: Some("".into()),
751 ..Default::default()
752 };
753
754 let err = remap(config).unwrap_err().to_string();
755 assert_eq!(
756 &err,
757 "must provide exactly one of `source` or `file` or `files` configuration"
758 )
759 }
760
761 fn get_field_string(event: &Event, field: &str) -> String {
762 event
763 .as_log()
764 .get(field)
765 .unwrap()
766 .to_string_lossy()
767 .into_owned()
768 }
769
770 #[test]
771 fn check_remap_doesnt_share_state_between_events() {
772 let conf = RemapConfig {
773 source: Some(".foo = .sentinel".to_string()),
774 file: None,
775 drop_on_error: true,
776 drop_on_abort: false,
777 ..Default::default()
778 };
779 let mut tform = remap(conf).unwrap();
780 assert!(tform.runner().runtime.is_empty());
781
782 let event1 = {
783 let mut event1 = LogEvent::from("event1");
784 event1.insert("sentinel", "bar");
785 Event::from(event1)
786 };
787 let result1 = transform_one(&mut tform, event1).unwrap();
788 assert_eq!(get_field_string(&result1, "message"), "event1");
789 assert_eq!(get_field_string(&result1, "foo"), "bar");
790 assert!(tform.runner().runtime.is_empty());
791
792 let event2 = {
793 let event2 = LogEvent::from("event2");
794 Event::from(event2)
795 };
796 let result2 = transform_one(&mut tform, event2).unwrap();
797 assert_eq!(get_field_string(&result2, "message"), "event2");
798 assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
799 assert!(tform.runner().runtime.is_empty());
800 }
801
802 #[test]
803 fn remap_return_raw_string_vector_namespace() {
804 let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
805
806 let event = {
807 let mut metadata = EventMetadata::default()
808 .with_schema_definition(&Arc::new(initial_definition.clone()));
809 metadata
811 .value_mut()
812 .insert(&owned_value_path!("vector"), BTreeMap::new());
813
814 let mut event = LogEvent::new_with_metadata(metadata);
815 event.insert("copy_from", "buz");
816 Event::from(event)
817 };
818
819 let conf = RemapConfig {
820 source: Some(r#" . = "root string";"#.to_string()),
821 file: None,
822 drop_on_error: true,
823 drop_on_abort: false,
824 ..Default::default()
825 };
826 let mut tform = remap(conf.clone()).unwrap();
827 let result = transform_one(&mut tform, event).unwrap();
828 assert_eq!(get_field_string(&result, "."), "root string");
829
830 let mut outputs = conf.outputs(
831 TableRegistry::default(),
832 &[(OutputId::dummy(), initial_definition)],
833 LogNamespace::Vector,
834 );
835
836 assert_eq!(outputs.len(), 1);
837 let output = outputs.pop().unwrap();
838 assert_eq!(output.port, None);
839 let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
840 let expected_schema =
841 Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
842 assert_eq!(actual_schema_def, expected_schema);
843 }
844
845 #[test]
846 fn check_remap_adds() {
847 let event = {
848 let mut event = LogEvent::from("augment me");
849 event.insert("copy_from", "buz");
850 Event::from(event)
851 };
852
853 let conf = RemapConfig {
854 source: Some(
855 r#" .foo = "bar"
856 .bar = "baz"
857 .copy = .copy_from
858"#
859 .to_string(),
860 ),
861 file: None,
862 drop_on_error: true,
863 drop_on_abort: false,
864 ..Default::default()
865 };
866 let mut tform = remap(conf).unwrap();
867 let result = transform_one(&mut tform, event).unwrap();
868 assert_eq!(get_field_string(&result, "message"), "augment me");
869 assert_eq!(get_field_string(&result, "copy_from"), "buz");
870 assert_eq!(get_field_string(&result, "foo"), "bar");
871 assert_eq!(get_field_string(&result, "bar"), "baz");
872 assert_eq!(get_field_string(&result, "copy"), "buz");
873 }
874
875 #[test]
876 fn check_remap_emits_multiple() {
877 let event = {
878 let mut event = LogEvent::from("augment me");
879 event.insert(
880 "events",
881 vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
882 );
883 Event::from(event)
884 };
885
886 let conf = RemapConfig {
887 source: Some(
888 indoc! {r"
889 . = .events
890 "}
891 .to_owned(),
892 ),
893 file: None,
894 drop_on_error: true,
895 drop_on_abort: false,
896 ..Default::default()
897 };
898 let mut tform = remap(conf).unwrap();
899
900 let out = collect_outputs(&mut tform, event);
901 assert_eq!(2, out.primary.len());
902 let mut result = out.primary.into_events();
903
904 let r = result.next().unwrap();
905 assert_eq!(get_field_string(&r, "message"), "foo");
906 let r = result.next().unwrap();
907 assert_eq!(get_field_string(&r, "message"), "bar");
908 }
909
910 #[test]
911 fn check_remap_error() {
912 let event = {
913 let mut event = Event::Log(LogEvent::from("augment me"));
914 event.as_mut_log().insert("bar", "is a string");
915 event
916 };
917
918 let conf = RemapConfig {
919 source: Some(formatdoc! {r#"
920 .foo = "foo"
921 .not_an_int = int!(.bar)
922 .baz = 12
923 "#}),
924 file: None,
925 drop_on_error: false,
926 drop_on_abort: false,
927 ..Default::default()
928 };
929 let mut tform = remap(conf).unwrap();
930
931 let event = transform_one(&mut tform, event).unwrap();
932
933 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
934 assert!(event.as_log().get("foo").is_none());
935 assert!(event.as_log().get("baz").is_none());
936 }
937
938 #[test]
939 fn check_remap_error_drop() {
940 let event = {
941 let mut event = Event::Log(LogEvent::from("augment me"));
942 event.as_mut_log().insert("bar", "is a string");
943 event
944 };
945
946 let conf = RemapConfig {
947 source: Some(formatdoc! {r#"
948 .foo = "foo"
949 .not_an_int = int!(.bar)
950 .baz = 12
951 "#}),
952 file: None,
953 drop_on_error: true,
954 drop_on_abort: false,
955 ..Default::default()
956 };
957 let mut tform = remap(conf).unwrap();
958
959 assert!(transform_one(&mut tform, event).is_none())
960 }
961
962 #[test]
963 fn check_remap_error_infallible() {
964 let event = {
965 let mut event = Event::Log(LogEvent::from("augment me"));
966 event.as_mut_log().insert("bar", "is a string");
967 event
968 };
969
970 let conf = RemapConfig {
971 source: Some(formatdoc! {r#"
972 .foo = "foo"
973 .baz = 12
974 "#}),
975 file: None,
976 drop_on_error: false,
977 drop_on_abort: false,
978 ..Default::default()
979 };
980 let mut tform = remap(conf).unwrap();
981
982 let event = transform_one(&mut tform, event).unwrap();
983
984 assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
985 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
986 assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
987 }
988
989 #[test]
990 fn check_remap_abort() {
991 let event = {
992 let mut event = Event::Log(LogEvent::from("augment me"));
993 event.as_mut_log().insert("bar", "is a string");
994 event
995 };
996
997 let conf = RemapConfig {
998 source: Some(formatdoc! {r#"
999 .foo = "foo"
1000 abort
1001 .baz = 12
1002 "#}),
1003 file: None,
1004 drop_on_error: false,
1005 drop_on_abort: false,
1006 ..Default::default()
1007 };
1008 let mut tform = remap(conf).unwrap();
1009
1010 let event = transform_one(&mut tform, event).unwrap();
1011
1012 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1013 assert!(event.as_log().get("foo").is_none());
1014 assert!(event.as_log().get("baz").is_none());
1015 }
1016
1017 #[test]
1018 fn check_remap_abort_drop() {
1019 let event = {
1020 let mut event = Event::Log(LogEvent::from("augment me"));
1021 event.as_mut_log().insert("bar", "is a string");
1022 event
1023 };
1024
1025 let conf = RemapConfig {
1026 source: Some(formatdoc! {r#"
1027 .foo = "foo"
1028 abort
1029 .baz = 12
1030 "#}),
1031 file: None,
1032 drop_on_error: false,
1033 drop_on_abort: true,
1034 ..Default::default()
1035 };
1036 let mut tform = remap(conf).unwrap();
1037
1038 assert!(transform_one(&mut tform, event).is_none())
1039 }
1040
1041 #[test]
1042 fn check_remap_metric() {
1043 let metric = Event::Metric(Metric::new(
1044 "counter",
1045 MetricKind::Absolute,
1046 MetricValue::Counter { value: 1.0 },
1047 ));
1048 let metadata = metric.metadata().clone();
1049
1050 let conf = RemapConfig {
1051 source: Some(
1052 r#".tags.host = "zoobub"
1053 .name = "zork"
1054 .namespace = "zerk"
1055 .kind = "incremental""#
1056 .to_string(),
1057 ),
1058 file: None,
1059 drop_on_error: true,
1060 drop_on_abort: false,
1061 ..Default::default()
1062 };
1063 let mut tform = remap(conf).unwrap();
1064
1065 let result = transform_one(&mut tform, metric).unwrap();
1066 assert_eq!(
1067 result,
1068 Event::Metric(
1069 Metric::new_with_metadata(
1070 "zork",
1071 MetricKind::Incremental,
1072 MetricValue::Counter { value: 1.0 },
1073 metadata
1076 )
1077 .with_namespace(Some("zerk"))
1078 .with_tags(Some(metric_tags! {
1079 "host" => "zoobub",
1080 }))
1081 )
1082 );
1083 }
1084
1085 #[test]
1086 fn remap_timezone_fallback() {
1087 let error = Event::from_json_value(
1088 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1089 LogNamespace::Legacy,
1090 )
1091 .unwrap();
1092 let conf = RemapConfig {
1093 source: Some(formatdoc! {r#"
1094 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1095 "#}),
1096 drop_on_error: true,
1097 drop_on_abort: true,
1098 reroute_dropped: true,
1099 ..Default::default()
1100 };
1101 let context = TransformContext {
1102 key: Some(ComponentKey::from("remapper")),
1103 globals: GlobalOptions {
1104 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1105 ..Default::default()
1106 },
1107 ..Default::default()
1108 };
1109 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1110
1111 let output = transform_one_fallible(&mut tform, error).unwrap();
1112 let log = output.as_log();
1113 assert_eq!(
1114 log["timestamp"],
1115 DateTime::<chrono::Utc>::from(
1116 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1117 )
1118 .into()
1119 );
1120 }
1121
1122 #[test]
1123 fn remap_timezone_override() {
1124 let error = Event::from_json_value(
1125 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1126 LogNamespace::Legacy,
1127 )
1128 .unwrap();
1129 let conf = RemapConfig {
1130 source: Some(formatdoc! {r#"
1131 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1132 "#}),
1133 drop_on_error: true,
1134 drop_on_abort: true,
1135 reroute_dropped: true,
1136 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1137 ..Default::default()
1138 };
1139 let context = TransformContext {
1140 key: Some(ComponentKey::from("remapper")),
1141 globals: GlobalOptions {
1142 timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1143 ..Default::default()
1144 },
1145 ..Default::default()
1146 };
1147 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1148
1149 let output = transform_one_fallible(&mut tform, error).unwrap();
1150 let log = output.as_log();
1151 assert_eq!(
1152 log["timestamp"],
1153 DateTime::<chrono::Utc>::from(
1154 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1155 )
1156 .into()
1157 );
1158 }
1159
1160 #[test]
1161 fn check_remap_branching() {
1162 let happy =
1163 Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1164 .unwrap();
1165 let abort = Event::from_json_value(
1166 serde_json::json!({"hello": "goodbye"}),
1167 LogNamespace::Legacy,
1168 )
1169 .unwrap();
1170 let error =
1171 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1172
1173 let happy_metric = {
1174 let mut metric = Metric::new(
1175 "counter",
1176 MetricKind::Absolute,
1177 MetricValue::Counter { value: 1.0 },
1178 );
1179 metric.replace_tag("hello".into(), "world".into());
1180 Event::Metric(metric)
1181 };
1182
1183 let abort_metric = {
1184 let mut metric = Metric::new(
1185 "counter",
1186 MetricKind::Absolute,
1187 MetricValue::Counter { value: 1.0 },
1188 );
1189 metric.replace_tag("hello".into(), "goodbye".into());
1190 Event::Metric(metric)
1191 };
1192
1193 let error_metric = {
1194 let mut metric = Metric::new(
1195 "counter",
1196 MetricKind::Absolute,
1197 MetricValue::Counter { value: 1.0 },
1198 );
1199 metric.replace_tag("not_hello".into(), "oops".into());
1200 Event::Metric(metric)
1201 };
1202
1203 let conf = RemapConfig {
1204 source: Some(formatdoc! {r#"
1205 if exists(.tags) {{
1206 # metrics
1207 .tags.foo = "bar"
1208 if string!(.tags.hello) == "goodbye" {{
1209 abort
1210 }}
1211 }} else {{
1212 # logs
1213 .foo = "bar"
1214 if string(.hello) == "goodbye" {{
1215 abort
1216 }}
1217 }}
1218 "#}),
1219 drop_on_error: true,
1220 drop_on_abort: true,
1221 reroute_dropped: true,
1222 ..Default::default()
1223 };
1224 let schema_definitions = HashMap::from([
1225 (
1226 None,
1227 [("source".into(), test_default_schema_definition())].into(),
1228 ),
1229 (
1230 Some(DROPPED.to_owned()),
1231 [("source".into(), test_dropped_schema_definition())].into(),
1232 ),
1233 ]);
1234 let context = TransformContext {
1235 key: Some(ComponentKey::from("remapper")),
1236 schema_definitions,
1237 merged_schema_definition: schema::Definition::new_with_default_metadata(
1238 Kind::any_object(),
1239 [LogNamespace::Legacy],
1240 )
1241 .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1242 ..Default::default()
1243 };
1244 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1245
1246 let output = transform_one_fallible(&mut tform, happy).unwrap();
1247 let log = output.as_log();
1248 assert_eq!(log["hello"], "world".into());
1249 assert_eq!(log["foo"], "bar".into());
1250 assert!(!log.contains(event_path!("metadata")));
1251
1252 let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1253 let log = output.as_log();
1254 assert_eq!(log["hello"], "goodbye".into());
1255 assert!(!log.contains(event_path!("foo")));
1256 assert_eq!(
1257 log["metadata"],
1258 serde_json::json!({
1259 "dropped": {
1260 "reason": "abort",
1261 "message": "aborted",
1262 "component_id": "remapper",
1263 "component_type": "remap",
1264 "component_kind": "transform",
1265 }
1266 })
1267 .try_into()
1268 .unwrap()
1269 );
1270
1271 let output = transform_one_fallible(&mut tform, error).unwrap_err();
1272 let log = output.as_log();
1273 assert_eq!(log["hello"], 42.into());
1274 assert!(!log.contains(event_path!("foo")));
1275 assert_eq!(
1276 log["metadata"],
1277 serde_json::json!({
1278 "dropped": {
1279 "reason": "error",
1280 "message": "function call error for \"string\" at (160:174): expected string, got integer",
1281 "component_id": "remapper",
1282 "component_type": "remap",
1283 "component_kind": "transform",
1284 }
1285 })
1286 .try_into()
1287 .unwrap()
1288 );
1289
1290 let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1291 similar_asserts::assert_eq!(
1292 output,
1293 Event::Metric(
1294 Metric::new_with_metadata(
1295 "counter",
1296 MetricKind::Absolute,
1297 MetricValue::Counter { value: 1.0 },
1298 EventMetadata::default()
1301 .with_schema_definition(output.metadata().schema_definition()),
1302 )
1303 .with_tags(Some(metric_tags! {
1304 "hello" => "world",
1305 "foo" => "bar",
1306 }))
1307 )
1308 );
1309
1310 let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1311 similar_asserts::assert_eq!(
1312 output,
1313 Event::Metric(
1314 Metric::new_with_metadata(
1315 "counter",
1316 MetricKind::Absolute,
1317 MetricValue::Counter { value: 1.0 },
1318 EventMetadata::default()
1321 .with_schema_definition(output.metadata().schema_definition()),
1322 )
1323 .with_tags(Some(metric_tags! {
1324 "hello" => "goodbye",
1325 "metadata.dropped.reason" => "abort",
1326 "metadata.dropped.component_id" => "remapper",
1327 "metadata.dropped.component_type" => "remap",
1328 "metadata.dropped.component_kind" => "transform",
1329 }))
1330 )
1331 );
1332
1333 let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1334 similar_asserts::assert_eq!(
1335 output,
1336 Event::Metric(
1337 Metric::new_with_metadata(
1338 "counter",
1339 MetricKind::Absolute,
1340 MetricValue::Counter { value: 1.0 },
1341 EventMetadata::default()
1344 .with_schema_definition(output.metadata().schema_definition()),
1345 )
1346 .with_tags(Some(metric_tags! {
1347 "not_hello" => "oops",
1348 "metadata.dropped.reason" => "error",
1349 "metadata.dropped.component_id" => "remapper",
1350 "metadata.dropped.component_type" => "remap",
1351 "metadata.dropped.component_kind" => "transform",
1352 }))
1353 )
1354 );
1355 }
1356
#[test]
fn check_remap_branching_assert_with_message() {
    // `.hello == 42` fails the first assertion (custom message); `.hello == 0`
    // passes the first and fails the second (default message).
    let custom_message_event =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    let default_message_event =
        Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();

    let program = formatdoc! {r#"
        assert_eq!(.hello, 0, "custom message here")
        assert_eq!(.hello, 1)
    "#};
    let conf = RemapConfig {
        source: Some(program),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    // Expected `metadata` object on an event rerouted because of a runtime error.
    let dropped_metadata = |message: &str| -> Value {
        serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": message,
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    };

    let rerouted = transform_one_fallible(&mut tform, custom_message_event).unwrap_err();
    let log = rerouted.as_log();
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(log["metadata"], dropped_metadata("custom message here"));

    let rerouted = transform_one_fallible(&mut tform, default_message_event).unwrap_err();
    let log = rerouted.as_log();
    assert_eq!(log["hello"], 0.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        dropped_metadata(
            "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1"
        )
    );
}
1419
#[test]
fn check_remap_branching_abort_with_message() {
    // Any event works here: the program aborts unconditionally with a message.
    let event =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

    let mut tform = Remap::new_ast(
        RemapConfig {
            source: Some(formatdoc! {r#"
                abort "custom message here"
            "#}),
            drop_on_error: true,
            drop_on_abort: true,
            reroute_dropped: true,
            ..Default::default()
        },
        &TransformContext {
            key: Some(ComponentKey::from("remapper")),
            ..Default::default()
        },
    )
    .unwrap()
    .0;

    let rerouted = transform_one_fallible(&mut tform, event).unwrap_err();
    let log = rerouted.as_log();
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));

    // The abort message is surfaced verbatim in the dropped metadata.
    let expected: Value = serde_json::json!({
        "dropped": {
            "reason": "abort",
            "message": "custom message here",
            "component_id": "remapper",
            "component_type": "remap",
            "component_kind": "transform",
        }
    })
    .try_into()
    .unwrap();
    assert_eq!(log["metadata"], expected);
}
1458
#[test]
fn check_remap_branching_disabled() {
    // Builds a legacy-namespace event from a JSON literal.
    let to_event =
        |json: serde_json::Value| Event::from_json_value(json, LogNamespace::Legacy).unwrap();
    let happy = to_event(serde_json::json!({"hello": "world"}));
    let abort = to_event(serde_json::json!({"hello": "goodbye"}));
    let error = to_event(serde_json::json!({"hello": 42}));

    // `string!` in the log branch turns a non-string `.hello` into a runtime
    // error, while `"goodbye"` triggers an explicit abort.
    let program = formatdoc! {r#"
        if exists(.tags) {{
            # metrics
            .tags.foo = "bar"
            if string!(.tags.hello) == "goodbye" {{
              abort
            }}
        }} else {{
            # logs
            .foo = "bar"
            if string!(.hello) == "goodbye" {{
              abort
            }}
        }}
    "#};
    let conf = RemapConfig {
        source: Some(program),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: false,
        ..Default::default()
    };

    // With rerouting disabled, exactly one (default) output is declared.
    let any_legacy_object = || {
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
    };
    let expected_definition = any_legacy_object()
        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
    let declared = conf.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("test".into(), any_legacy_object())],
        LogNamespace::Legacy,
    );
    assert_eq!(
        declared,
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("test".into(), expected_definition)].into(),
        )]
    );

    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    let output = transform_one_fallible(&mut tform, happy).unwrap();
    let log = output.as_log();
    assert_eq!(log["hello"], "world".into());
    assert_eq!(log["foo"], "bar".into());
    assert!(!log.contains(event_path!("metadata")));

    // Aborted and errored events are discarded: nothing on either port.
    for discarded in [abort, error] {
        let out = collect_outputs(&mut tform, discarded);
        assert!(out.primary.is_empty());
        assert!(out.named[DROPPED].is_empty());
    }
}
1539
1540 #[tokio::test]
1541 async fn check_remap_branching_metrics_with_output() {
1542 init_test();
1543
1544 let config: ConfigBuilder = toml::from_str(indoc! {r#"
1545 [transforms.foo]
1546 inputs = []
1547 type = "remap"
1548 drop_on_abort = true
1549 reroute_dropped = true
1550 source = "abort"
1551
1552 [[tests]]
1553 name = "metric output"
1554
1555 [tests.input]
1556 insert_at = "foo"
1557 value = "none"
1558
1559 [[tests.outputs]]
1560 extract_from = "foo.dropped"
1561 [[tests.outputs.conditions]]
1562 type = "vrl"
1563 source = "true"
1564 "#})
1565 .unwrap();
1566
1567 let mut tests = build_unit_tests(config).await.unwrap();
1568 assert!(tests.remove(0).run().await.errors.is_empty());
1569 COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1571 }
1572
1573 struct CollectedOuput {
1574 primary: OutputBuffer,
1575 named: HashMap<String, OutputBuffer>,
1576 }
1577
1578 fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1579 let mut outputs = TransformOutputsBuf::new_with_capacity(
1580 vec![
1581 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1582 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1583 ],
1584 1,
1585 );
1586
1587 ft.transform(event, &mut outputs);
1588
1589 CollectedOuput {
1590 primary: outputs.take_primary(),
1591 named: outputs.take_all_named(),
1592 }
1593 }
1594
1595 fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1596 let out = collect_outputs(ft, event);
1597 assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1598 assert!(out.primary.len() <= 1);
1599 out.primary.into_events().next()
1600 }
1601
1602 fn transform_one_fallible(
1603 ft: &mut dyn SyncTransform,
1604 event: Event,
1605 ) -> std::result::Result<Event, Event> {
1606 let mut outputs = TransformOutputsBuf::new_with_capacity(
1607 vec![
1608 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1609 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1610 ],
1611 1,
1612 );
1613
1614 ft.transform(event, &mut outputs);
1615
1616 let mut buf = outputs.drain().collect::<Vec<_>>();
1617 let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1618
1619 assert!(buf.len() < 2);
1620 assert!(err_buf.len() < 2);
1621 match (buf.pop(), err_buf.pop()) {
1622 (Some(good), None) => Ok(good),
1623 (None, Some(bad)) => Err(bad),
1624 (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1625 }
1626 }
1627
1628 #[tokio::test]
1629 async fn emits_internal_events() {
1630 assert_transform_compliance(async move {
1631 let config = RemapConfig {
1632 source: Some("abort".to_owned()),
1633 drop_on_abort: true,
1634 ..Default::default()
1635 };
1636
1637 let (tx, rx) = mpsc::channel(1);
1638 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1639
1640 let log = LogEvent::from("hello world");
1641 tx.send(log.into()).await.unwrap();
1642
1643 drop(tx);
1644 topology.stop().await;
1645 assert_eq!(out.recv().await, None);
1646 })
1647 .await
1648 }
1649
#[test]
fn test_combined_transforms_simple() {
    let sets_thing = RemapConfig {
        source: Some(r#".thing = "potato""#.to_string()),
        ..Default::default()
    };
    let copies_thing = RemapConfig {
        source: Some(".thang = .thing".to_string()),
        ..Default::default()
    };
    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    // Stage one adds `.thing` (bytes) on top of the legacy default schema.
    let thing_schema = Definition::default_legacy_namespace().with_event_field(
        &owned_value_path!("thing"),
        Kind::bytes(),
        None,
    );
    let first = sets_thing.outputs(
        enrichment_tables.clone(),
        &[("in".into(), schema::Definition::default_legacy_namespace())],
        LogNamespace::Legacy,
    );
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), thing_schema.clone())].into(),
        )],
        first
    );

    // Stage two, fed stage one's schema, copies `.thing` into `.thang`.
    let stage_two_input = first[0].schema_definitions(true)[&OutputId::from("in")].clone();
    let second = copies_thing.outputs(
        enrichment_tables,
        &[("in1".into(), stage_two_input)],
        LogNamespace::Legacy,
    );
    let thang_schema =
        thing_schema.with_event_field(&owned_value_path!("thang"), Kind::bytes(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in1".into(), thang_schema)].into(),
        )],
        second
    );
}
1714
#[test]
fn test_combined_transforms_unnest() {
    let unnests_thing = RemapConfig {
        source: Some(
            indoc! {r#"
                .thing = [{"cabbage": 32}, {"parsnips": 45}]
                . = unnest(.thing)
            "#}
            .to_string(),
        ),
        ..Default::default()
    };
    let adds_thang = RemapConfig {
        source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
        ..Default::default()
    };
    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    // Expected kind of `.thing` after `unnest`: an object where each key may be absent.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([
        ("cabbage".into(), Kind::integer().or_undefined()),
        ("parsnips".into(), Kind::integer().or_undefined()),
    ])));

    let first = unnests_thing.outputs(
        enrichment_tables.clone(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );
    let first_expected =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("thing"), thing_kind.clone(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), first_expected)].into(),
        )],
        first
    );

    let stage_two_input = first[0].schema_definitions(true)[&OutputId::from("in")].clone();
    let second = adds_thang.outputs(
        enrichment_tables,
        &[("in1".into(), stage_two_input)],
        LogNamespace::Legacy,
    );
    let second_expected = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), thing_kind, None)
        .with_event_field(&owned_value_path!("thang"), Kind::integer().or_null(), None);
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in1".into(), second_expected)].into(),
        )],
        second
    );
}
1810
#[test]
fn test_transform_abort() {
    let aborts = RemapConfig {
        source: Some("abort".to_string()),
        ..Default::default()
    };
    let input_schema =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);

    let declared = aborts.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_schema.clone())],
        LogNamespace::Legacy,
    );

    // An always-aborting program is expected to pass the input schema through unchanged.
    assert_eq!(
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("in".into(), input_schema)].into(),
        )],
        declared
    );
}
1849
#[test]
fn test_error_outputs() {
    // Only `reroute_dropped` is set explicitly. The transform is still expected
    // to declare the `DROPPED` port alongside the default one.
    let transform1 = RemapConfig {
        source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
        reroute_dropped: true,
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let outputs1 = transform1.outputs(
        enrichment_tables,
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );

    // Use the shared `DROPPED` constant rather than a duplicated literal, so the
    // test tracks the port name used by the rest of the module.
    assert_eq!(
        HashSet::from([None, Some(DROPPED.to_string())]),
        outputs1
            .into_iter()
            .map(|output| output.port)
            .collect::<HashSet<_>>()
    );
}
1885
#[test]
fn test_non_object_events() {
    let assigns_string_root = RemapConfig {
        source: Some(r#". = "fish" "#.to_string()),
        ..Default::default()
    };

    let declared = assigns_string_root.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );

    // After assigning a string to the root, the only expected field is `message` (bytes).
    let expected = schema::Definition::new_with_default_metadata(
        Kind::object(Collection::from_unknown(Kind::undefined())),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);

    let actual = declared[0].schema_definitions(true);
    assert_eq!(HashMap::from([(OutputId::from("in"), expected)]), actual);
}
1919
#[test]
fn test_array_and_non_object_events() {
    let program = indoc! {r#"
        if .lizard == true {
          .thing = [{"cabbage": 42}];
          . = unnest(.thing)
        } else {
          . = "fish"
        }
    "#};
    let branching = RemapConfig {
        source: Some(program.to_string()),
        ..Default::default()
    };

    let declared = branching.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );

    // Expected: `message` of any kind, plus a possibly-undefined `.thing`
    // object coming from the `if` branch.
    let cabbage_object = Kind::object(Collection::from(BTreeMap::from([(
        "cabbage".into(),
        Kind::integer(),
    )])));
    let expected =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("message"), Kind::any(), None)
            .with_event_field(
                &owned_value_path!("thing"),
                cabbage_object.or_undefined(),
                None,
            );

    let actual = declared[0].schema_definitions(true);
    assert_eq!(HashMap::from([(OutputId::from("in"), expected)]), actual);
}
1971
#[test]
fn check_remap_array_vector_namespace() {
    // A log event whose metadata already holds an empty `vector` object.
    let mut log = LogEvent::from("input");
    log.metadata_mut()
        .value_mut()
        .insert("vector", BTreeMap::new());

    let conf = RemapConfig {
        source: Some(". = [null]\n".to_string()),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };

    let mut tform = remap(conf.clone()).unwrap();
    let result = transform_one(&mut tform, Event::from(log)).unwrap();
    assert_eq!(result.as_log().get("."), Some(&Value::Null));

    let declared = conf.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Vector],
            ),
        )],
        LogNamespace::Vector,
    );
    let wanted =
        schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);

    assert_eq!(
        HashMap::from([(OutputId::from("in"), wanted)]),
        declared[0].schema_definitions(true),
    );
}
2022
2023 fn assert_no_metrics(source: String) {
2024 vector_lib::metrics::init_test();
2025
2026 let config = RemapConfig {
2027 source: Some(source),
2028 drop_on_error: true,
2029 drop_on_abort: true,
2030 reroute_dropped: true,
2031 ..Default::default()
2032 };
2033 let mut ast_runner = remap(config).unwrap();
2034 let input_event =
2035 Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2036 let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2037 let dropped_log = dropped_event.as_log();
2038 assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2039
2040 let controller = Controller::get().expect("no controller");
2041 let metrics = controller
2042 .capture_metrics()
2043 .into_iter()
2044 .map(|metric| (metric.name().to_string(), metric))
2045 .collect::<BTreeMap<String, Metric>>();
2046 assert_eq!(metrics.get("component_discarded_events_total"), None);
2047 assert_eq!(metrics.get("component_errors_total"), None);
2048 }
#[test]
fn do_not_emit_metrics_when_dropped() {
    // An unconditional `abort` sends every event to the dropped output.
    assert_no_metrics(String::from("abort"));
}
2053
#[test]
fn do_not_emit_metrics_when_errored() {
    // Expected to fail at runtime, since the probe event has no string `.message`.
    assert_no_metrics(String::from("parse_key_value!(.message)"));
}
2058}