1use std::{
2 collections::{BTreeMap, HashMap},
3 fs::File,
4 io::{self, Read},
5 path::PathBuf,
6 sync::Mutex,
7};
8
9use snafu::{ResultExt, Snafu};
10use vector_lib::{
11 TimeZone,
12 codecs::MetricTagValues,
13 compile_vrl,
14 config::LogNamespace,
15 configurable::configurable_component,
16 enrichment::TableRegistry,
17 lookup::{PathPrefix, metadata_path, owned_value_path},
18 schema::Definition,
19};
20use vector_vrl_functions::set_semantic_meaning::MeaningList;
21use vrl::{
22 compiler::{
23 CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
24 runtime::{Runtime, Terminate},
25 state::ExternalEnv,
26 },
27 diagnostic::{DiagnosticMessage, Formatter, Note},
28 path,
29 path::ValuePath,
30 value::{Kind, Value},
31};
32
33use crate::{
34 Result,
35 config::{
36 ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
37 TransformOutput, log_schema,
38 },
39 event::{Event, TargetEvents, VrlTarget},
40 internal_events::{RemapMappingAbort, RemapMappingError},
41 schema,
42 transforms::{SyncTransform, Transform, TransformOutputsBuf},
43};
44
/// Name of the named output port that rerouted (dropped) events are pushed to.
const DROPPED: &str = "dropped";
/// Key of one compilation-cache entry: the enrichment tables and the merged schema
/// definition the program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// Successful compilation result: the compiled program, its rendered warnings, and the
/// meaning list read back from the compile configuration.
type CacheValue = (Program, String, MeaningList);
48
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The VRL program to execute for each event, given inline.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// Path of a file containing the VRL program to execute for each event.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// Paths of files whose contents are concatenated, in order and each followed by a
    /// newline, to form the VRL program to execute for each event.
    ///
    /// Exactly one of `source`, `file`, or `files` must be set.
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// Selects how metric tag values are exposed to the program (`Single` or `Full`).
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The timezone handed to the program at runtime.
    ///
    /// When unset, the global timezone option is used instead.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Whether an event is dropped when the program terminates with a runtime error.
    ///
    /// When `false`, the original, unmodified event is forwarded to the default output.
    /// When `true`, the event is dropped, or sent to the `dropped` output if
    /// `reroute_dropped` is enabled.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Whether an event is dropped when the program terminates through `abort`.
    ///
    /// When `false`, the original, unmodified event is forwarded to the default output.
    /// When `true`, the event is dropped, or sent to the `dropped` output if
    /// `reroute_dropped` is enabled.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Whether dropped events are rerouted to the named `dropped` output instead of being
    /// discarded.
    ///
    /// Rerouted events are the original events, annotated with the reason they were dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    // Hidden option selecting the VRL runtime; only `VrlRuntime::Ast` is handled in `build`.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    // Memoized compilation results (successes and failures). Kept as a `Vec` that is
    // scanned linearly, comparing keys with `==`; never (de)serialized and omitted from
    // the `Debug` output.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
163
164impl Clone for RemapConfig {
165 fn clone(&self) -> Self {
166 Self {
167 source: self.source.clone(),
168 file: self.file.clone(),
169 files: self.files.clone(),
170 metric_tag_values: self.metric_tag_values,
171 timezone: self.timezone,
172 drop_on_error: self.drop_on_error,
173 drop_on_abort: self.drop_on_abort,
174 reroute_dropped: self.reroute_dropped,
175 runtime: self.runtime,
176 cache: Mutex::new(Default::default()),
177 }
178 }
179}
180
181impl RemapConfig {
182 fn compile_vrl_program(
183 &self,
184 enrichment_tables: TableRegistry,
185 merged_schema_definition: schema::Definition,
186 ) -> Result<(Program, String, MeaningList)> {
187 if let Some((_, res)) = self
188 .cache
189 .lock()
190 .expect("Data poisoned")
191 .iter()
192 .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
193 {
194 return res.clone().map_err(Into::into);
195 }
196
197 let source = match (&self.source, &self.file, &self.files) {
198 (Some(source), None, None) => source.to_owned(),
199 (None, Some(path), None) => Self::read_file(path)?,
200 (None, None, Some(paths)) => {
201 let mut combined_source = String::new();
202 for path in paths {
203 let content = Self::read_file(path)?;
204 combined_source.push_str(&content);
205 combined_source.push('\n');
206 }
207 combined_source
208 }
209 _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
210 };
211
212 let mut functions = vrl::stdlib::all();
213 functions.append(&mut vector_lib::enrichment::vrl_functions());
214 #[cfg(feature = "sources-dnstap")]
215 functions.append(&mut dnstap_parser::vrl_functions());
216 functions.append(&mut vector_vrl_functions::all());
217
218 let state = TypeState {
219 local: Default::default(),
220 external: ExternalEnv::new_with_kind(
221 merged_schema_definition.event_kind().clone(),
222 merged_schema_definition.metadata_kind().clone(),
223 ),
224 };
225 let mut config = CompileConfig::default();
226
227 config.set_custom(enrichment_tables.clone());
228 config.set_custom(MeaningList::default());
229
230 let res = compile_vrl(&source, &functions, &state, config)
231 .map_err(|diagnostics| Formatter::new(&source, diagnostics).colored().to_string())
232 .map(|result| {
233 (
234 result.program,
235 Formatter::new(&source, result.warnings).to_string(),
236 result.config.get_custom::<MeaningList>().unwrap().clone(),
237 )
238 });
239
240 self.cache
241 .lock()
242 .expect("Data poisoned")
243 .push(((enrichment_tables, merged_schema_definition), res.clone()));
244
245 res.map_err(Into::into)
246 }
247
248 fn read_file(path: &PathBuf) -> Result<String> {
249 let mut buffer = String::new();
250 File::open(path)
251 .with_context(|_| FileOpenFailedSnafu { path })?
252 .read_to_string(&mut buffer)
253 .with_context(|_| FileReadFailedSnafu { path })?;
254 Ok(buffer)
255 }
256}
257
// NOTE(review): judging by the macro name, this derives the generated example
// configuration for `RemapConfig` from its `Default` value; the macro is defined
// elsewhere, so confirm there.
impl_generate_config_from_default!(RemapConfig);
259
260#[async_trait::async_trait]
261#[typetag::serde(name = "remap")]
262impl TransformConfig for RemapConfig {
263 async fn build(&self, context: &TransformContext) -> Result<Transform> {
264 let (transform, warnings) = match self.runtime {
265 VrlRuntime::Ast => {
266 let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
267 (Transform::synchronous(remap), warnings)
268 }
269 };
270
271 if !warnings.is_empty() {
276 warn!(message = "VRL compilation warning.", %warnings);
277 }
278
279 Ok(transform)
280 }
281
282 fn input(&self) -> Input {
283 Input::all()
284 }
285
286 fn outputs(
287 &self,
288 enrichment_tables: vector_lib::enrichment::TableRegistry,
289 input_definitions: &[(OutputId, schema::Definition)],
290 _: LogNamespace,
291 ) -> Vec<TransformOutput> {
292 let merged_definition: Definition = input_definitions
293 .iter()
294 .map(|(_output, definition)| definition.clone())
295 .reduce(Definition::merge)
296 .unwrap_or_else(Definition::any);
297
298 let compiled = self
302 .compile_vrl_program(enrichment_tables, merged_definition)
303 .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
304 .map_err(|_| ());
305
306 let mut dropped_definitions = HashMap::new();
307 let mut default_definitions = HashMap::new();
308
309 for (output_id, input_definition) in input_definitions {
310 let default_definition = compiled
311 .clone()
312 .map(|(state, meaning)| {
313 let mut new_type_def = Definition::new(
314 state.external.target_kind().clone(),
315 state.external.metadata_kind().clone(),
316 input_definition.log_namespaces().clone(),
317 );
318
319 for (id, path) in input_definition.meanings() {
320 let _ = new_type_def.try_with_meaning(path.clone(), id);
324 }
325
326 for (id, path) in meaning {
328 new_type_def = new_type_def.with_meaning(path, &id);
330 }
331 new_type_def
332 })
333 .unwrap_or_else(|_| {
334 Definition::new_with_default_metadata(
335 Kind::never(),
337 input_definition.log_namespaces().clone(),
338 )
339 });
340
341 let dropped_definition = Definition::combine_log_namespaces(
344 input_definition.log_namespaces(),
345 input_definition.clone().with_event_field(
346 log_schema().metadata_key().expect("valid metadata key"),
347 Kind::object(BTreeMap::from([
348 ("reason".into(), Kind::bytes()),
349 ("message".into(), Kind::bytes()),
350 ("component_id".into(), Kind::bytes()),
351 ("component_type".into(), Kind::bytes()),
352 ("component_kind".into(), Kind::bytes()),
353 ])),
354 Some("metadata"),
355 ),
356 input_definition
357 .clone()
358 .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
359 .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
360 .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
361 .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
362 .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
363 );
364
365 default_definitions.insert(
366 output_id.clone(),
367 VrlTarget::modify_schema_definition_for_into_events(default_definition),
368 );
369 dropped_definitions.insert(
370 output_id.clone(),
371 VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
372 );
373 }
374
375 let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
376
377 if self.reroute_dropped {
378 vec![
379 default_output,
380 TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
381 ]
382 } else {
383 vec![default_output]
384 }
385 }
386
387 fn enable_concurrency(&self) -> bool {
388 true
389 }
390
391 fn files_to_watch(&self) -> Vec<&PathBuf> {
392 self.file
393 .iter()
394 .chain(self.files.iter().flatten())
395 .collect()
396 }
397}
398
399#[derive(Debug, Clone)]
400pub struct Remap<Runner>
401where
402 Runner: VrlRunner,
403{
404 component_key: Option<ComponentKey>,
405 program: Program,
406 timezone: TimeZone,
407 drop_on_error: bool,
408 drop_on_abort: bool,
409 reroute_dropped: bool,
410 runner: Runner,
411 metric_tag_values: MetricTagValues,
412}
413
/// Strategy for executing a compiled VRL program against a target.
pub trait VrlRunner {
    /// Runs `program` against `target` using `timezone`.
    ///
    /// Returns the value the program resolved to, or the `Terminate` describing why the
    /// run ended early.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
422
/// `VrlRunner` backed by a VRL `Runtime`.
#[derive(Debug)]
pub struct AstRunner {
    /// The runtime used to resolve programs; it is cleared after every run.
    pub runtime: Runtime,
}
427
428impl Clone for AstRunner {
429 fn clone(&self) -> Self {
430 Self {
431 runtime: Runtime::default(),
432 }
433 }
434}
435
436impl VrlRunner for AstRunner {
437 fn run(
438 &mut self,
439 target: &mut VrlTarget,
440 program: &Program,
441 timezone: &TimeZone,
442 ) -> std::result::Result<Value, Terminate> {
443 let result = self.runtime.resolve(target, program, timezone);
444 self.runtime.clear();
445 result
446 }
447}
448
449impl Remap<AstRunner> {
450 pub fn new_ast(
451 config: RemapConfig,
452 context: &TransformContext,
453 ) -> crate::Result<(Self, String)> {
454 let (program, warnings, _) = config.compile_vrl_program(
455 context.enrichment_tables.clone(),
456 context.merged_schema_definition.clone(),
457 )?;
458
459 let runtime = Runtime::default();
460 let runner = AstRunner { runtime };
461
462 Self::new(config, context, program, runner).map(|remap| (remap, warnings))
463 }
464}
465
466impl<Runner> Remap<Runner>
467where
468 Runner: VrlRunner,
469{
470 fn new(
471 config: RemapConfig,
472 context: &TransformContext,
473 program: Program,
474 runner: Runner,
475 ) -> crate::Result<Self> {
476 Ok(Remap {
477 component_key: context.key.clone(),
478 program,
479 timezone: config
480 .timezone
481 .unwrap_or_else(|| context.globals.timezone()),
482 drop_on_error: config.drop_on_error,
483 drop_on_abort: config.drop_on_abort,
484 reroute_dropped: config.reroute_dropped,
485 runner,
486 metric_tag_values: config.metric_tag_values,
487 })
488 }
489
490 #[cfg(test)]
491 const fn runner(&self) -> &Runner {
492 &self.runner
493 }
494
495 fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
496 let message = error
497 .notes()
498 .iter()
499 .filter(|note| matches!(note, Note::UserErrorMessage(_)))
500 .next_back()
501 .map(|note| note.to_string())
502 .unwrap_or_else(|| error.to_string());
503 serde_json::json!({
504 "reason": reason,
505 "message": message,
506 "component_id": self.component_key,
507 "component_type": "remap",
508 "component_kind": "transform",
509 })
510 }
511
512 fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
513 match event {
514 Event::Log(log) => match log.namespace() {
515 LogNamespace::Legacy => {
516 if let Some(metadata_key) = log_schema().metadata_key() {
517 log.insert(
518 (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
519 self.dropped_data(reason, error),
520 );
521 }
522 }
523 LogNamespace::Vector => {
524 log.insert(
525 metadata_path!("vector", "dropped"),
526 self.dropped_data(reason, error),
527 );
528 }
529 },
530 Event::Metric(metric) => {
531 if let Some(metadata_key) = log_schema().metadata_key() {
532 metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
533 metric.replace_tag(
534 format!("{metadata_key}.dropped.component_id"),
535 self.component_key
536 .as_ref()
537 .map(ToString::to_string)
538 .unwrap_or_default(),
539 );
540 metric.replace_tag(
541 format!("{metadata_key}.dropped.component_type"),
542 "remap".into(),
543 );
544 metric.replace_tag(
545 format!("{metadata_key}.dropped.component_kind"),
546 "transform".into(),
547 );
548 }
549 }
550 Event::Trace(trace) => {
551 trace.maybe_insert(log_schema().metadata_key_target_path(), || {
552 self.dropped_data(reason, error).into()
553 });
554 }
555 }
556 }
557
558 fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
559 self.runner.run(target, &self.program, &self.timezone)
560 }
561}
562
563impl<Runner> SyncTransform for Remap<Runner>
564where
565 Runner: VrlRunner + Clone + Send + Sync,
566{
567 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
568 let forward_on_error = !self.drop_on_error || self.reroute_dropped;
579 let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
580 let original_event = if (self.program.info().fallible && forward_on_error)
581 || (self.program.info().abortable && forward_on_abort)
582 {
583 Some(event.clone())
584 } else {
585 None
586 };
587
588 let log_namespace = event
589 .maybe_as_log()
590 .map(|log| log.namespace())
591 .unwrap_or(LogNamespace::Legacy);
592
593 let mut target = VrlTarget::new(
594 event,
595 self.program.info(),
596 match self.metric_tag_values {
597 MetricTagValues::Single => false,
598 MetricTagValues::Full => true,
599 },
600 );
601 let result = self.run_vrl(&mut target);
602
603 match result {
604 Ok(_) => match target.into_events(log_namespace) {
605 TargetEvents::One(event) => push_default(event, output),
606 TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
607 TargetEvents::Traces(events) => {
608 events.for_each(|event| push_default(event, output))
609 }
610 },
611 Err(reason) => {
612 let (reason, error, drop) = match reason {
613 Terminate::Abort(error) => {
614 if !self.reroute_dropped {
615 emit!(RemapMappingAbort {
616 event_dropped: self.drop_on_abort,
617 });
618 }
619 ("abort", error, self.drop_on_abort)
620 }
621 Terminate::Error(error) => {
622 if !self.reroute_dropped {
623 emit!(RemapMappingError {
624 error: error.to_string(),
625 event_dropped: self.drop_on_error,
626 });
627 }
628 ("error", error, self.drop_on_error)
629 }
630 };
631
632 if !drop {
633 let event = original_event.expect("event will be set");
634
635 push_default(event, output);
636 } else if self.reroute_dropped {
637 let mut event = original_event.expect("event will be set");
638
639 self.annotate_dropped(&mut event, reason, error);
640 push_dropped(event, output);
641 }
642 }
643 }
644 }
645}
646
647#[inline]
648fn push_default(event: Event, output: &mut TransformOutputsBuf) {
649 output.push(None, event)
650}
651
652#[inline]
653fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
654 output.push(Some(DROPPED), event);
655}
656
/// Errors raised while loading the VRL program for the `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// The combination of `source`, `file`, and `files` is not "exactly one set".
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    /// The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    /// The program file at `path` was opened but could not be read as a string.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
667
668#[cfg(test)]
669mod tests {
670 use std::{
671 collections::{HashMap, HashSet},
672 sync::Arc,
673 };
674
675 use chrono::DateTime;
676 use indoc::{formatdoc, indoc};
677 use tokio::sync::mpsc;
678 use tokio_stream::wrappers::ReceiverStream;
679 use vector_lib::{
680 config::GlobalOptions, enrichment::TableRegistry, event::EventMetadata, metric_tags,
681 };
682 use vrl::{btreemap, event_path, value::kind::Collection};
683
684 use super::*;
685 use crate::{
686 config::{ConfigBuilder, build_unit_tests},
687 event::{
688 LogEvent, Metric, Value,
689 metric::{MetricKind, MetricValue},
690 },
691 metrics::Controller,
692 schema,
693 test_util::components::{
694 COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
695 },
696 transforms::{OutputBuffer, test::create_topology},
697 };
698
699 fn test_default_schema_definition() -> schema::Definition {
700 schema::Definition::empty_legacy_namespace().with_event_field(
701 &owned_value_path!("a default field"),
702 Kind::integer().or_bytes(),
703 Some("default"),
704 )
705 }
706
707 fn test_dropped_schema_definition() -> schema::Definition {
708 schema::Definition::empty_legacy_namespace().with_event_field(
709 &owned_value_path!("a dropped field"),
710 Kind::boolean().or_null(),
711 Some("dropped"),
712 )
713 }
714
715 fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
716 let schema_definitions = HashMap::from([
717 (
718 None,
719 [("source".into(), test_default_schema_definition())].into(),
720 ),
721 (
722 Some(DROPPED.to_owned()),
723 [("source".into(), test_dropped_schema_definition())].into(),
724 ),
725 ]);
726
727 Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
728 .map(|(remap, _)| remap)
729 }
730
731 #[test]
732 fn generate_config() {
733 crate::test_util::test_generate_config::<RemapConfig>();
734 }
735
736 #[test]
737 fn config_missing_source_and_file() {
738 let config = RemapConfig {
739 source: None,
740 file: None,
741 ..Default::default()
742 };
743
744 let err = remap(config).unwrap_err().to_string();
745 assert_eq!(
746 &err,
747 "must provide exactly one of `source` or `file` or `files` configuration"
748 )
749 }
750
751 #[test]
752 fn config_both_source_and_file() {
753 let config = RemapConfig {
754 source: Some("".to_owned()),
755 file: Some("".into()),
756 ..Default::default()
757 };
758
759 let err = remap(config).unwrap_err().to_string();
760 assert_eq!(
761 &err,
762 "must provide exactly one of `source` or `file` or `files` configuration"
763 )
764 }
765
766 fn get_field_string(event: &Event, field: &str) -> String {
767 event
768 .as_log()
769 .get(field)
770 .unwrap()
771 .to_string_lossy()
772 .into_owned()
773 }
774
775 #[test]
776 fn check_remap_doesnt_share_state_between_events() {
777 let conf = RemapConfig {
778 source: Some(".foo = .sentinel".to_string()),
779 file: None,
780 drop_on_error: true,
781 drop_on_abort: false,
782 ..Default::default()
783 };
784 let mut tform = remap(conf).unwrap();
785 assert!(tform.runner().runtime.is_empty());
786
787 let event1 = {
788 let mut event1 = LogEvent::from("event1");
789 event1.insert("sentinel", "bar");
790 Event::from(event1)
791 };
792 let result1 = transform_one(&mut tform, event1).unwrap();
793 assert_eq!(get_field_string(&result1, "message"), "event1");
794 assert_eq!(get_field_string(&result1, "foo"), "bar");
795 assert!(tform.runner().runtime.is_empty());
796
797 let event2 = {
798 let event2 = LogEvent::from("event2");
799 Event::from(event2)
800 };
801 let result2 = transform_one(&mut tform, event2).unwrap();
802 assert_eq!(get_field_string(&result2, "message"), "event2");
803 assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
804 assert!(tform.runner().runtime.is_empty());
805 }
806
807 #[test]
808 fn remap_return_raw_string_vector_namespace() {
809 let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
810
811 let event = {
812 let mut metadata = EventMetadata::default()
813 .with_schema_definition(&Arc::new(initial_definition.clone()));
814 metadata
816 .value_mut()
817 .insert(&owned_value_path!("vector"), BTreeMap::new());
818
819 let mut event = LogEvent::new_with_metadata(metadata);
820 event.insert("copy_from", "buz");
821 Event::from(event)
822 };
823
824 let conf = RemapConfig {
825 source: Some(r#" . = "root string";"#.to_string()),
826 file: None,
827 drop_on_error: true,
828 drop_on_abort: false,
829 ..Default::default()
830 };
831 let mut tform = remap(conf.clone()).unwrap();
832 let result = transform_one(&mut tform, event).unwrap();
833 assert_eq!(get_field_string(&result, "."), "root string");
834
835 let mut outputs = conf.outputs(
836 TableRegistry::default(),
837 &[(OutputId::dummy(), initial_definition)],
838 LogNamespace::Vector,
839 );
840
841 assert_eq!(outputs.len(), 1);
842 let output = outputs.pop().unwrap();
843 assert_eq!(output.port, None);
844 let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
845 let expected_schema =
846 Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
847 assert_eq!(actual_schema_def, expected_schema);
848 }
849
850 #[test]
851 fn check_remap_adds() {
852 let event = {
853 let mut event = LogEvent::from("augment me");
854 event.insert("copy_from", "buz");
855 Event::from(event)
856 };
857
858 let conf = RemapConfig {
859 source: Some(
860 r#" .foo = "bar"
861 .bar = "baz"
862 .copy = .copy_from
863"#
864 .to_string(),
865 ),
866 file: None,
867 drop_on_error: true,
868 drop_on_abort: false,
869 ..Default::default()
870 };
871 let mut tform = remap(conf).unwrap();
872 let result = transform_one(&mut tform, event).unwrap();
873 assert_eq!(get_field_string(&result, "message"), "augment me");
874 assert_eq!(get_field_string(&result, "copy_from"), "buz");
875 assert_eq!(get_field_string(&result, "foo"), "bar");
876 assert_eq!(get_field_string(&result, "bar"), "baz");
877 assert_eq!(get_field_string(&result, "copy"), "buz");
878 }
879
880 #[test]
881 fn check_remap_emits_multiple() {
882 let event = {
883 let mut event = LogEvent::from("augment me");
884 event.insert(
885 "events",
886 vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
887 );
888 Event::from(event)
889 };
890
891 let conf = RemapConfig {
892 source: Some(
893 indoc! {r"
894 . = .events
895 "}
896 .to_owned(),
897 ),
898 file: None,
899 drop_on_error: true,
900 drop_on_abort: false,
901 ..Default::default()
902 };
903 let mut tform = remap(conf).unwrap();
904
905 let out = collect_outputs(&mut tform, event);
906 assert_eq!(2, out.primary.len());
907 let mut result = out.primary.into_events();
908
909 let r = result.next().unwrap();
910 assert_eq!(get_field_string(&r, "message"), "foo");
911 let r = result.next().unwrap();
912 assert_eq!(get_field_string(&r, "message"), "bar");
913 }
914
915 #[test]
916 fn check_remap_error() {
917 let event = {
918 let mut event = Event::Log(LogEvent::from("augment me"));
919 event.as_mut_log().insert("bar", "is a string");
920 event
921 };
922
923 let conf = RemapConfig {
924 source: Some(formatdoc! {r#"
925 .foo = "foo"
926 .not_an_int = int!(.bar)
927 .baz = 12
928 "#}),
929 file: None,
930 drop_on_error: false,
931 drop_on_abort: false,
932 ..Default::default()
933 };
934 let mut tform = remap(conf).unwrap();
935
936 let event = transform_one(&mut tform, event).unwrap();
937
938 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
939 assert!(event.as_log().get("foo").is_none());
940 assert!(event.as_log().get("baz").is_none());
941 }
942
943 #[test]
944 fn check_remap_error_drop() {
945 let event = {
946 let mut event = Event::Log(LogEvent::from("augment me"));
947 event.as_mut_log().insert("bar", "is a string");
948 event
949 };
950
951 let conf = RemapConfig {
952 source: Some(formatdoc! {r#"
953 .foo = "foo"
954 .not_an_int = int!(.bar)
955 .baz = 12
956 "#}),
957 file: None,
958 drop_on_error: true,
959 drop_on_abort: false,
960 ..Default::default()
961 };
962 let mut tform = remap(conf).unwrap();
963
964 assert!(transform_one(&mut tform, event).is_none())
965 }
966
967 #[test]
968 fn check_remap_error_infallible() {
969 let event = {
970 let mut event = Event::Log(LogEvent::from("augment me"));
971 event.as_mut_log().insert("bar", "is a string");
972 event
973 };
974
975 let conf = RemapConfig {
976 source: Some(formatdoc! {r#"
977 .foo = "foo"
978 .baz = 12
979 "#}),
980 file: None,
981 drop_on_error: false,
982 drop_on_abort: false,
983 ..Default::default()
984 };
985 let mut tform = remap(conf).unwrap();
986
987 let event = transform_one(&mut tform, event).unwrap();
988
989 assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
990 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
991 assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
992 }
993
994 #[test]
995 fn check_remap_abort() {
996 let event = {
997 let mut event = Event::Log(LogEvent::from("augment me"));
998 event.as_mut_log().insert("bar", "is a string");
999 event
1000 };
1001
1002 let conf = RemapConfig {
1003 source: Some(formatdoc! {r#"
1004 .foo = "foo"
1005 abort
1006 .baz = 12
1007 "#}),
1008 file: None,
1009 drop_on_error: false,
1010 drop_on_abort: false,
1011 ..Default::default()
1012 };
1013 let mut tform = remap(conf).unwrap();
1014
1015 let event = transform_one(&mut tform, event).unwrap();
1016
1017 assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1018 assert!(event.as_log().get("foo").is_none());
1019 assert!(event.as_log().get("baz").is_none());
1020 }
1021
1022 #[test]
1023 fn check_remap_abort_drop() {
1024 let event = {
1025 let mut event = Event::Log(LogEvent::from("augment me"));
1026 event.as_mut_log().insert("bar", "is a string");
1027 event
1028 };
1029
1030 let conf = RemapConfig {
1031 source: Some(formatdoc! {r#"
1032 .foo = "foo"
1033 abort
1034 .baz = 12
1035 "#}),
1036 file: None,
1037 drop_on_error: false,
1038 drop_on_abort: true,
1039 ..Default::default()
1040 };
1041 let mut tform = remap(conf).unwrap();
1042
1043 assert!(transform_one(&mut tform, event).is_none())
1044 }
1045
1046 #[test]
1047 fn check_remap_metric() {
1048 let metric = Event::Metric(Metric::new(
1049 "counter",
1050 MetricKind::Absolute,
1051 MetricValue::Counter { value: 1.0 },
1052 ));
1053 let metadata = metric.metadata().clone();
1054
1055 let conf = RemapConfig {
1056 source: Some(
1057 r#".tags.host = "zoobub"
1058 .name = "zork"
1059 .namespace = "zerk"
1060 .kind = "incremental""#
1061 .to_string(),
1062 ),
1063 file: None,
1064 drop_on_error: true,
1065 drop_on_abort: false,
1066 ..Default::default()
1067 };
1068 let mut tform = remap(conf).unwrap();
1069
1070 let result = transform_one(&mut tform, metric).unwrap();
1071 assert_eq!(
1072 result,
1073 Event::Metric(
1074 Metric::new_with_metadata(
1075 "zork",
1076 MetricKind::Incremental,
1077 MetricValue::Counter { value: 1.0 },
1078 metadata
1081 )
1082 .with_namespace(Some("zerk"))
1083 .with_tags(Some(metric_tags! {
1084 "host" => "zoobub",
1085 }))
1086 )
1087 );
1088 }
1089
1090 #[test]
1091 fn remap_timezone_fallback() {
1092 let error = Event::from_json_value(
1093 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1094 LogNamespace::Legacy,
1095 )
1096 .unwrap();
1097 let conf = RemapConfig {
1098 source: Some(formatdoc! {r#"
1099 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1100 "#}),
1101 drop_on_error: true,
1102 drop_on_abort: true,
1103 reroute_dropped: true,
1104 ..Default::default()
1105 };
1106 let context = TransformContext {
1107 key: Some(ComponentKey::from("remapper")),
1108 globals: GlobalOptions {
1109 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1110 ..Default::default()
1111 },
1112 ..Default::default()
1113 };
1114 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1115
1116 let output = transform_one_fallible(&mut tform, error).unwrap();
1117 let log = output.as_log();
1118 assert_eq!(
1119 log["timestamp"],
1120 DateTime::<chrono::Utc>::from(
1121 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1122 )
1123 .into()
1124 );
1125 }
1126
1127 #[test]
1128 fn remap_timezone_override() {
1129 let error = Event::from_json_value(
1130 serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1131 LogNamespace::Legacy,
1132 )
1133 .unwrap();
1134 let conf = RemapConfig {
1135 source: Some(formatdoc! {r#"
1136 .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1137 "#}),
1138 drop_on_error: true,
1139 drop_on_abort: true,
1140 reroute_dropped: true,
1141 timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1142 ..Default::default()
1143 };
1144 let context = TransformContext {
1145 key: Some(ComponentKey::from("remapper")),
1146 globals: GlobalOptions {
1147 timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1148 ..Default::default()
1149 },
1150 ..Default::default()
1151 };
1152 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1153
1154 let output = transform_one_fallible(&mut tform, error).unwrap();
1155 let log = output.as_log();
1156 assert_eq!(
1157 log["timestamp"],
1158 DateTime::<chrono::Utc>::from(
1159 DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1160 )
1161 .into()
1162 );
1163 }
1164
1165 #[test]
1166 fn check_remap_branching() {
1167 let happy =
1168 Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1169 .unwrap();
1170 let abort = Event::from_json_value(
1171 serde_json::json!({"hello": "goodbye"}),
1172 LogNamespace::Legacy,
1173 )
1174 .unwrap();
1175 let error =
1176 Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1177
1178 let happy_metric = {
1179 let mut metric = Metric::new(
1180 "counter",
1181 MetricKind::Absolute,
1182 MetricValue::Counter { value: 1.0 },
1183 );
1184 metric.replace_tag("hello".into(), "world".into());
1185 Event::Metric(metric)
1186 };
1187
1188 let abort_metric = {
1189 let mut metric = Metric::new(
1190 "counter",
1191 MetricKind::Absolute,
1192 MetricValue::Counter { value: 1.0 },
1193 );
1194 metric.replace_tag("hello".into(), "goodbye".into());
1195 Event::Metric(metric)
1196 };
1197
1198 let error_metric = {
1199 let mut metric = Metric::new(
1200 "counter",
1201 MetricKind::Absolute,
1202 MetricValue::Counter { value: 1.0 },
1203 );
1204 metric.replace_tag("not_hello".into(), "oops".into());
1205 Event::Metric(metric)
1206 };
1207
1208 let conf = RemapConfig {
1209 source: Some(formatdoc! {r#"
1210 if exists(.tags) {{
1211 # metrics
1212 .tags.foo = "bar"
1213 if string!(.tags.hello) == "goodbye" {{
1214 abort
1215 }}
1216 }} else {{
1217 # logs
1218 .foo = "bar"
1219 if string(.hello) == "goodbye" {{
1220 abort
1221 }}
1222 }}
1223 "#}),
1224 drop_on_error: true,
1225 drop_on_abort: true,
1226 reroute_dropped: true,
1227 ..Default::default()
1228 };
1229 let schema_definitions = HashMap::from([
1230 (
1231 None,
1232 [("source".into(), test_default_schema_definition())].into(),
1233 ),
1234 (
1235 Some(DROPPED.to_owned()),
1236 [("source".into(), test_dropped_schema_definition())].into(),
1237 ),
1238 ]);
1239 let context = TransformContext {
1240 key: Some(ComponentKey::from("remapper")),
1241 schema_definitions,
1242 merged_schema_definition: schema::Definition::new_with_default_metadata(
1243 Kind::any_object(),
1244 [LogNamespace::Legacy],
1245 )
1246 .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1247 ..Default::default()
1248 };
1249 let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1250
1251 let output = transform_one_fallible(&mut tform, happy).unwrap();
1252 let log = output.as_log();
1253 assert_eq!(log["hello"], "world".into());
1254 assert_eq!(log["foo"], "bar".into());
1255 assert!(!log.contains(event_path!("metadata")));
1256
1257 let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1258 let log = output.as_log();
1259 assert_eq!(log["hello"], "goodbye".into());
1260 assert!(!log.contains(event_path!("foo")));
1261 assert_eq!(
1262 log["metadata"],
1263 serde_json::json!({
1264 "dropped": {
1265 "reason": "abort",
1266 "message": "aborted",
1267 "component_id": "remapper",
1268 "component_type": "remap",
1269 "component_kind": "transform",
1270 }
1271 })
1272 .try_into()
1273 .unwrap()
1274 );
1275
1276 let output = transform_one_fallible(&mut tform, error).unwrap_err();
1277 let log = output.as_log();
1278 assert_eq!(log["hello"], 42.into());
1279 assert!(!log.contains(event_path!("foo")));
1280 assert_eq!(
1281 log["metadata"],
1282 serde_json::json!({
1283 "dropped": {
1284 "reason": "error",
1285 "message": "function call error for \"string\" at (160:174): expected string, got integer",
1286 "component_id": "remapper",
1287 "component_type": "remap",
1288 "component_kind": "transform",
1289 }
1290 })
1291 .try_into()
1292 .unwrap()
1293 );
1294
1295 let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1296 similar_asserts::assert_eq!(
1297 output,
1298 Event::Metric(
1299 Metric::new_with_metadata(
1300 "counter",
1301 MetricKind::Absolute,
1302 MetricValue::Counter { value: 1.0 },
1303 EventMetadata::default()
1306 .with_schema_definition(output.metadata().schema_definition()),
1307 )
1308 .with_tags(Some(metric_tags! {
1309 "hello" => "world",
1310 "foo" => "bar",
1311 }))
1312 )
1313 );
1314
1315 let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1316 similar_asserts::assert_eq!(
1317 output,
1318 Event::Metric(
1319 Metric::new_with_metadata(
1320 "counter",
1321 MetricKind::Absolute,
1322 MetricValue::Counter { value: 1.0 },
1323 EventMetadata::default()
1326 .with_schema_definition(output.metadata().schema_definition()),
1327 )
1328 .with_tags(Some(metric_tags! {
1329 "hello" => "goodbye",
1330 "metadata.dropped.reason" => "abort",
1331 "metadata.dropped.component_id" => "remapper",
1332 "metadata.dropped.component_type" => "remap",
1333 "metadata.dropped.component_kind" => "transform",
1334 }))
1335 )
1336 );
1337
1338 let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1339 similar_asserts::assert_eq!(
1340 output,
1341 Event::Metric(
1342 Metric::new_with_metadata(
1343 "counter",
1344 MetricKind::Absolute,
1345 MetricValue::Counter { value: 1.0 },
1346 EventMetadata::default()
1349 .with_schema_definition(output.metadata().schema_definition()),
1350 )
1351 .with_tags(Some(metric_tags! {
1352 "not_hello" => "oops",
1353 "metadata.dropped.reason" => "error",
1354 "metadata.dropped.component_id" => "remapper",
1355 "metadata.dropped.component_type" => "remap",
1356 "metadata.dropped.component_kind" => "transform",
1357 }))
1358 )
1359 );
1360 }
1361
/// A failing `assert_eq!` must reroute the event to the `dropped` output and record either
/// the custom assertion message or the default one under `metadata.dropped.message`.
#[test]
fn check_remap_branching_assert_with_message() {
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let program = formatdoc! {r#"
        assert_eq!(.hello, 0, "custom message here")
        assert_eq!(.hello, 1)
    "#};
    let conf = RemapConfig {
        source: Some(program),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    // `hello = 42` fails the first assertion (custom message); `hello = 0` satisfies it and
    // fails the second assertion, which carries no custom message.
    let cases = [
        (42_i64, "custom message here"),
        (
            0_i64,
            "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
        ),
    ];

    for (hello, message) in cases {
        let input =
            Event::from_json_value(serde_json::json!({ "hello": hello }), LogNamespace::Legacy)
                .unwrap();
        let expected_metadata: Value = serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": message,
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap();

        let dropped = transform_one_fallible(&mut tform, input).unwrap_err();
        let log = dropped.as_log();

        assert_eq!(log["hello"], hello.into());
        assert!(!log.contains(event_path!("foo")));
        assert_eq!(log["metadata"], expected_metadata);
    }
}
1424
/// An `abort` with a custom message must reroute the event to the `dropped` output with
/// reason `abort` and that message recorded under `metadata.dropped`.
#[test]
fn check_remap_branching_abort_with_message() {
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let program = formatdoc! {r#"
        abort "custom message here"
    "#};
    let conf = RemapConfig {
        source: Some(program),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    let input =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    let expected_metadata: Value = serde_json::json!({
        "dropped": {
            "reason": "abort",
            "message": "custom message here",
            "component_id": "remapper",
            "component_type": "remap",
            "component_kind": "transform",
        }
    })
    .try_into()
    .unwrap();

    let dropped = transform_one_fallible(&mut tform, input).unwrap_err();
    let log = dropped.as_log();

    // The original payload is untouched; only the drop metadata is added.
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(log["metadata"], expected_metadata);
}
1463
/// With `reroute_dropped` turned off, the transform exposes a single output and events that
/// abort or error produce nothing on either the primary or the `dropped` port.
#[test]
fn check_remap_branching_disabled() {
    let legacy_log = |hello: serde_json::Value| {
        Event::from_json_value(serde_json::json!({ "hello": hello }), LogNamespace::Legacy)
            .unwrap()
    };
    let happy = legacy_log(serde_json::json!("world"));
    let abort = legacy_log(serde_json::json!("goodbye"));
    let error = legacy_log(serde_json::json!(42));

    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            if exists(.tags) {{
                # metrics
                .tags.foo = "bar"
                if string!(.tags.hello) == "goodbye" {{
                  abort
                }}
            }} else {{
                # logs
                .foo = "bar"
                if string!(.hello) == "goodbye" {{
                  abort
                }}
            }}
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: false,
        ..Default::default()
    };

    // Schema the program is expected to produce: the input object plus `foo` and `tags`.
    let expected_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
    .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
    let expected_outputs = vec![TransformOutput::new(
        DataType::all_bits(),
        [("test".into(), expected_definition)].into(),
    )];

    let input_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    );
    let actual_outputs = conf.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("test".into(), input_definition)],
        LogNamespace::Legacy,
    );
    assert_eq!(actual_outputs, expected_outputs);

    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    let passed = transform_one_fallible(&mut tform, happy).unwrap();
    let log = passed.as_log();
    assert_eq!(log["hello"], "world".into());
    assert_eq!(log["foo"], "bar".into());
    assert!(!log.contains(event_path!("metadata")));

    // Aborted first, errored second: neither may surface on any port.
    for discarded in [abort, error] {
        let out = collect_outputs(&mut tform, discarded);
        assert!(out.primary.is_empty());
        assert!(out.named[DROPPED].is_empty());
    }
}
1544
1545 #[tokio::test]
1546 async fn check_remap_branching_metrics_with_output() {
1547 init_test();
1548
1549 let config: ConfigBuilder = toml::from_str(indoc! {r#"
1550 [transforms.foo]
1551 inputs = []
1552 type = "remap"
1553 drop_on_abort = true
1554 reroute_dropped = true
1555 source = "abort"
1556
1557 [[tests]]
1558 name = "metric output"
1559
1560 [tests.input]
1561 insert_at = "foo"
1562 value = "none"
1563
1564 [[tests.outputs]]
1565 extract_from = "foo.dropped"
1566 [[tests.outputs.conditions]]
1567 type = "vrl"
1568 source = "true"
1569 "#})
1570 .unwrap();
1571
1572 let mut tests = build_unit_tests(config).await.unwrap();
1573 assert!(tests.remove(0).run().await.errors.is_empty());
1574 COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1576 }
1577
/// Everything a transform emitted for one input event, as gathered by `collect_outputs`.
struct CollectedOuput {
    /// Buffer taken from the primary (unnamed) output.
    primary: OutputBuffer,
    /// Buffers taken from the named outputs; tests index this with the `DROPPED` port name.
    named: HashMap<String, OutputBuffer>,
}
1582
1583 fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1584 let mut outputs = TransformOutputsBuf::new_with_capacity(
1585 vec![
1586 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1587 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1588 ],
1589 1,
1590 );
1591
1592 ft.transform(event, &mut outputs);
1593
1594 CollectedOuput {
1595 primary: outputs.take_primary(),
1596 named: outputs.take_all_named(),
1597 }
1598 }
1599
1600 fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1601 let out = collect_outputs(ft, event);
1602 assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1603 assert!(out.primary.len() <= 1);
1604 out.primary.into_events().next()
1605 }
1606
1607 fn transform_one_fallible(
1608 ft: &mut dyn SyncTransform,
1609 event: Event,
1610 ) -> std::result::Result<Event, Event> {
1611 let mut outputs = TransformOutputsBuf::new_with_capacity(
1612 vec![
1613 TransformOutput::new(DataType::all_bits(), HashMap::new()),
1614 TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1615 ],
1616 1,
1617 );
1618
1619 ft.transform(event, &mut outputs);
1620
1621 let mut buf = outputs.drain().collect::<Vec<_>>();
1622 let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1623
1624 assert!(buf.len() < 2);
1625 assert!(err_buf.len() < 2);
1626 match (buf.pop(), err_buf.pop()) {
1627 (Some(good), None) => Ok(good),
1628 (None, Some(bad)) => Err(bad),
1629 (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1630 }
1631 }
1632
1633 #[tokio::test]
1634 async fn emits_internal_events() {
1635 assert_transform_compliance(async move {
1636 let config = RemapConfig {
1637 source: Some("abort".to_owned()),
1638 drop_on_abort: true,
1639 ..Default::default()
1640 };
1641
1642 let (tx, rx) = mpsc::channel(1);
1643 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1644
1645 let log = LogEvent::from("hello world");
1646 tx.send(log.into()).await.unwrap();
1647
1648 drop(tx);
1649 topology.stop().await;
1650 assert_eq!(out.recv().await, None);
1651 })
1652 .await
1653 }
1654
/// Chains two remap configs and checks that the schema definition produced by the first
/// (adds `thing`) is carried into the second (adds `thang` with the same kind).
#[test]
fn test_combined_transforms_simple() {
    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let transform1 = RemapConfig {
        source: Some(String::from(r#".thing = "potato""#)),
        ..Default::default()
    };
    let transform2 = RemapConfig {
        source: Some(String::from(".thang = .thing")),
        ..Default::default()
    };

    let outputs1 = transform1.outputs(
        enrichment_tables.clone(),
        &[("in".into(), schema::Definition::default_legacy_namespace())],
        LogNamespace::Legacy,
    );

    let expected_definition1 = Definition::default_legacy_namespace().with_event_field(
        &owned_value_path!("thing"),
        Kind::bytes(),
        None,
    );
    let expected1 = vec![TransformOutput::new(
        DataType::all_bits(),
        [("in".into(), expected_definition1)].into(),
    )];
    assert_eq!(expected1, outputs1);

    // Feed the first transform's output definition in as the second transform's input.
    let upstream_definition = outputs1[0].schema_definitions(true)[&"in".into()].clone();
    let outputs2 = transform2.outputs(
        enrichment_tables,
        &[("in1".into(), upstream_definition)],
        LogNamespace::Legacy,
    );

    let expected_definition2 = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None);
    let expected2 = vec![TransformOutput::new(
        DataType::all_bits(),
        [("in1".into(), expected_definition2)].into(),
    )];
    assert_eq!(expected2, outputs2);
}
1719
/// Chains an `unnest`-based remap into a second remap and checks the schema definitions
/// computed for each stage.
#[test]
fn test_combined_transforms_unnest() {
    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let unnest_program = indoc! {
        r#"
        .thing = [{"cabbage": 32}, {"parsnips": 45}]
        . = unnest(.thing)
        "#
    };
    let transform1 = RemapConfig {
        source: Some(unnest_program.to_string()),
        ..Default::default()
    };
    let transform2 = RemapConfig {
        source: Some(String::from(r#".thang = .thing.cabbage || "beetroot""#)),
        ..Default::default()
    };

    // After unnesting, `thing` is an object where each key is an optional integer.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([
        ("cabbage".into(), Kind::integer().or_undefined()),
        ("parsnips".into(), Kind::integer().or_undefined()),
    ])));

    let outputs1 = transform1.outputs(
        enrichment_tables.clone(),
        &[(
            "in".into(),
            schema::Definition::new_with_default_metadata(
                Kind::any_object(),
                [LogNamespace::Legacy],
            ),
        )],
        LogNamespace::Legacy,
    );

    let expected_definition1 =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("thing"), thing_kind.clone(), None);
    let expected1 = vec![TransformOutput::new(
        DataType::all_bits(),
        [("in".into(), expected_definition1)].into(),
    )];
    assert_eq!(expected1, outputs1);

    let upstream_definition = outputs1[0].schema_definitions(true)[&"in".into()].clone();
    let outputs2 = transform2.outputs(
        enrichment_tables,
        &[("in1".into(), upstream_definition)],
        LogNamespace::Legacy,
    );

    let expected_definition2 = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), thing_kind, None)
        .with_event_field(
            &owned_value_path!("thang"),
            Kind::integer().or_null(),
            None,
        );
    let expected2 = vec![TransformOutput::new(
        DataType::all_bits(),
        [("in1".into(), expected_definition2)].into(),
    )];
    assert_eq!(expected2, outputs2);
}
1815
/// A program consisting only of `abort` must leave the input schema definition unchanged.
#[test]
fn test_transform_abort() {
    let transform1 = RemapConfig {
        source: Some(String::from("abort")),
        ..Default::default()
    };

    let input_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    );
    let outputs1 = transform1.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_definition)],
        LogNamespace::Legacy,
    );

    let expected_definition =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let expected = vec![TransformOutput::new(
        DataType::all_bits(),
        [("in".into(), expected_definition)].into(),
    )];
    assert_eq!(expected, outputs1);
}
1854
/// With `reroute_dropped` enabled, the config must report both the primary port (`None`)
/// and the `dropped` port.
#[test]
fn test_error_outputs() {
    let transform1 = RemapConfig {
        source: Some(String::from(
            r#". |= get_enrichment_table_record("carrot", {"id": .id})"#,
        )),
        reroute_dropped: true,
        ..Default::default()
    };

    let input_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    );
    let outputs1 = transform1.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_definition)],
        LogNamespace::Legacy,
    );

    let expected_ports = HashSet::from([None, Some("dropped".to_string())]);
    let actual_ports: HashSet<_> = outputs1.into_iter().map(|output| output.port).collect();
    assert_eq!(expected_ports, actual_ports);
}
1890
/// Assigning a string to the event root in the Legacy namespace must yield a schema with a
/// `message` bytes field on an otherwise empty object.
#[test]
fn test_non_object_events() {
    let transform1 = RemapConfig {
        source: Some(String::from(r#". = "fish" "#)),
        ..Default::default()
    };

    let input_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    );
    let outputs1 = transform1.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_definition)],
        LogNamespace::Legacy,
    );

    let empty_object = Kind::object(Collection::from_unknown(Kind::undefined()));
    let wanted =
        schema::Definition::new_with_default_metadata(empty_object, [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);
    let expected = HashMap::from([(OutputId::from("in"), wanted)]);

    assert_eq!(expected, outputs1[0].schema_definitions(true));
}
1924
/// A program that either unnests an array or assigns a string to the root must yield a
/// schema covering both branches: an `any` `message` field and an optional `thing` object.
#[test]
fn test_array_and_non_object_events() {
    let program = indoc! {r#"
        if .lizard == true {
            .thing = [{"cabbage": 42}];
            . = unnest(.thing)
        } else {
            . = "fish"
        }
    "#};
    let transform1 = RemapConfig {
        source: Some(program.to_string()),
        ..Default::default()
    };

    let input_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    );
    let outputs1 = transform1.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_definition)],
        LogNamespace::Legacy,
    );

    // `thing` only exists on the unnest branch, hence `or_undefined`.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([(
        "cabbage".into(),
        Kind::integer(),
    )])))
    .or_undefined();
    let wanted = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("message"), Kind::any(), None)
    .with_event_field(&owned_value_path!("thing"), thing_kind, None);
    let expected = HashMap::from([(OutputId::from("in"), wanted)]);

    assert_eq!(expected, outputs1[0].schema_definitions(true));
}
1976
/// Assigning `[null]` to the root of an event carrying `vector` metadata must produce a
/// `null` root value, and the Vector-namespace schema definition must be `Kind::null()`.
#[test]
fn check_remap_array_vector_namespace() {
    let mut log = LogEvent::from("input");
    log.metadata_mut()
        .value_mut()
        .insert("vector", BTreeMap::new());
    let event = Event::from(log);

    let conf = RemapConfig {
        source: Some(". = [null]\n".to_string()),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };

    let mut tform = remap(conf.clone()).unwrap();
    let result = transform_one(&mut tform, event).unwrap();
    let root = result.as_log().get(".");
    assert_eq!(root, Some(&Value::Null));

    let input_definition = schema::Definition::new_with_default_metadata(
        Kind::any_object(),
        [LogNamespace::Vector],
    );
    let outputs1 = conf.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("in".into(), input_definition)],
        LogNamespace::Vector,
    );

    let wanted =
        schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
    let expected = HashMap::from([(OutputId::from("in"), wanted)]);

    assert_eq!(expected, outputs1[0].schema_definitions(true));
}
2027
2028 fn assert_no_metrics(source: String) {
2029 vector_lib::metrics::init_test();
2030
2031 let config = RemapConfig {
2032 source: Some(source),
2033 drop_on_error: true,
2034 drop_on_abort: true,
2035 reroute_dropped: true,
2036 ..Default::default()
2037 };
2038 let mut ast_runner = remap(config).unwrap();
2039 let input_event =
2040 Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2041 let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2042 let dropped_log = dropped_event.as_log();
2043 assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2044
2045 let controller = Controller::get().expect("no controller");
2046 let metrics = controller
2047 .capture_metrics()
2048 .into_iter()
2049 .map(|metric| (metric.name().to_string(), metric))
2050 .collect::<BTreeMap<String, Metric>>();
2051 assert_eq!(metrics.get("component_discarded_events_total"), None);
2052 assert_eq!(metrics.get("component_errors_total"), None);
2053 }
/// An aborting program with rerouting enabled must not record discard or error metrics.
#[test]
fn do_not_emit_metrics_when_dropped() {
    let program = String::from("abort");
    assert_no_metrics(program);
}
2058
/// A program that errors at runtime with rerouting enabled must not record discard or
/// error metrics.
#[test]
fn do_not_emit_metrics_when_errored() {
    let program = String::from("parse_key_value!(.message)");
    assert_no_metrics(program);
}
2063}