vector/transforms/
remap.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    fs::File,
4    io::{self, Read},
5    path::PathBuf,
6    sync::Mutex,
7};
8
9use snafu::{ResultExt, Snafu};
10use vector_lib::{
11    TimeZone,
12    codecs::MetricTagValues,
13    compile_vrl,
14    config::LogNamespace,
15    configurable::configurable_component,
16    enrichment::TableRegistry,
17    lookup::{PathPrefix, metadata_path, owned_value_path},
18    schema::Definition,
19};
20use vector_vrl_functions::set_semantic_meaning::MeaningList;
21use vector_vrl_metrics::MetricsStorage;
22use vrl::{
23    compiler::{
24        CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
25        runtime::{Runtime, Terminate},
26        state::ExternalEnv,
27    },
28    diagnostic::{DiagnosticMessage, Note},
29    path,
30    path::ValuePath,
31    value::{Kind, Value},
32};
33
34use crate::{
35    Result,
36    config::{
37        ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
38        TransformOutput, log_schema,
39    },
40    event::{Event, TargetEvents, VrlTarget},
41    format_vrl_diagnostics,
42    internal_events::{RemapMappingAbort, RemapMappingError},
43    schema,
44    transforms::{SyncTransform, Transform, TransformOutputsBuf},
45};
46
/// Name of the secondary output port that receives events rerouted via `reroute_dropped`.
const DROPPED: &str = "dropped";
/// Key of a compilation-cache entry: the enrichment tables and the merged input schema
/// definition the program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// Successful compilation result: the program, its formatted warnings, and the semantic
/// meanings collected during compilation.
type CacheValue = (Program, String, MeaningList);
50
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// Required if `file` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// File path to the [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// File paths to the [Vector Remap Language][vrl] (VRL) programs to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` or `file` are missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// When set to `single`, metric tag values are exposed as single strings, the
    /// same as they were before this config option. Tags with multiple values show the last assigned value, and null values
    /// are ignored.
    ///
    /// When set to `full`, all metric tags are exposed as arrays of either string or null
    /// values.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
    /// time zone.
    ///
    /// This overrides the [global `timezone`][global_timezone] option. The time zone name may be
    /// any name in the [TZ database][tz_database], or `local` to indicate system local time.
    ///
    /// [global_timezone]: https://vector.dev/docs/reference/configuration//global-options#timezone
    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Drops any event that encounters an error during processing.
    ///
    /// Normally, if a VRL program encounters an error when processing an event, the original,
    /// unmodified event is sent downstream. In some cases, you may not want to send the event
    /// any further, such as if certain transformation or enrichment is strictly required. Setting
    /// `drop_on_error` to `true` allows you to ensure these events do not get processed any
    /// further.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially named output for
    /// further logging and analysis by setting `reroute_dropped`.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Drops any event that is manually aborted during processing.
    ///
    /// If a VRL program is manually aborted (using [`abort`][vrl_docs_abort]) when
    /// processing an event, this option controls whether the original, unmodified event is sent
    /// downstream without any modifications or if it is dropped.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially-named output for
    /// further logging and analysis by setting `reroute_dropped`.
    ///
    /// [vrl_docs_abort]: https://vector.dev/docs/reference/vrl/expressions/#abort
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Reroutes dropped events to a named output instead of halting processing on them.
    ///
    /// When using `drop_on_error` or `drop_on_abort`, events that are "dropped" are processed no
    /// further. In some cases, it may be desirable to keep the events around for further analysis,
    /// debugging, or retrying.
    ///
    /// In these cases, `reroute_dropped` can be set to `true` which forwards the original event
    /// to a specially-named output, `dropped`. The original event is annotated with additional
    /// fields describing why the event was dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    // Hidden from the generated documentation (`docs::hidden`). `VrlRuntime::Ast` is the only
    // variant that `build` matches on.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    // Internal memoization of compilation outcomes: never (de)serialized (`serde(skip)`) and
    // left out of the `Debug` output. The manual `Clone` impl gives each clone a fresh, empty
    // cache. A failed compilation is cached as its formatted diagnostics string.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    /// Cache can't be `BTreeMap` or `HashMap` because of `TableRegistry`, which doesn't allow us to inspect tables inside it.
    /// And even if we allowed the inspection, the tables can be huge, resulting in a long comparison or hash computation
    /// while using `Vec` allows us to use just a shallow comparison
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
165
166impl Clone for RemapConfig {
167    fn clone(&self) -> Self {
168        Self {
169            source: self.source.clone(),
170            file: self.file.clone(),
171            files: self.files.clone(),
172            metric_tag_values: self.metric_tag_values,
173            timezone: self.timezone,
174            drop_on_error: self.drop_on_error,
175            drop_on_abort: self.drop_on_abort,
176            reroute_dropped: self.reroute_dropped,
177            runtime: self.runtime,
178            cache: Mutex::new(Default::default()),
179        }
180    }
181}
182
183impl RemapConfig {
184    fn compile_vrl_program(
185        &self,
186        enrichment_tables: TableRegistry,
187        metrics_storage: MetricsStorage,
188        merged_schema_definition: schema::Definition,
189    ) -> Result<(Program, String, MeaningList)> {
190        if let Some((_, res)) = self
191            .cache
192            .lock()
193            .expect("Data poisoned")
194            .iter()
195            .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
196        {
197            return res.clone().map_err(Into::into);
198        }
199
200        let source = match (&self.source, &self.file, &self.files) {
201            (Some(source), None, None) => source.to_owned(),
202            (None, Some(path), None) => Self::read_file(path)?,
203            (None, None, Some(paths)) => {
204                let mut combined_source = String::new();
205                for path in paths {
206                    let content = Self::read_file(path)?;
207                    combined_source.push_str(&content);
208                    combined_source.push('\n');
209                }
210                combined_source
211            }
212            _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
213        };
214
215        let state = TypeState {
216            local: Default::default(),
217            external: ExternalEnv::new_with_kind(
218                merged_schema_definition.event_kind().clone(),
219                merged_schema_definition.metadata_kind().clone(),
220            ),
221        };
222        let mut config = CompileConfig::default();
223
224        config.set_custom(enrichment_tables.clone());
225        config.set_custom(metrics_storage);
226        config.set_custom(MeaningList::default());
227
228        let res = compile_vrl(&source, &vector_vrl_functions::all(), &state, config)
229            .map_err(|diagnostics| format_vrl_diagnostics(&source, diagnostics))
230            .map(|result| {
231                (
232                    result.program,
233                    format_vrl_diagnostics(&source, result.warnings),
234                    result.config.get_custom::<MeaningList>().unwrap().clone(),
235                )
236            });
237
238        self.cache
239            .lock()
240            .expect("Data poisoned")
241            .push(((enrichment_tables, merged_schema_definition), res.clone()));
242
243        res.map_err(Into::into)
244    }
245
246    fn read_file(path: &PathBuf) -> Result<String> {
247        let mut buffer = String::new();
248        File::open(path)
249            .with_context(|_| FileOpenFailedSnafu { path })?
250            .read_to_string(&mut buffer)
251            .with_context(|_| FileReadFailedSnafu { path })?;
252        Ok(buffer)
253    }
254}
255
// Judging by the macro's name, this derives example-config generation for `RemapConfig` from
// its `Default` impl — the macro definition is not visible here; confirm against it.
impl_generate_config_from_default!(RemapConfig);
257
258#[async_trait::async_trait]
259#[typetag::serde(name = "remap")]
260impl TransformConfig for RemapConfig {
261    async fn build(&self, context: &TransformContext) -> Result<Transform> {
262        let (transform, warnings) = match self.runtime {
263            VrlRuntime::Ast => {
264                let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
265                (Transform::synchronous(remap), warnings)
266            }
267        };
268
269        // TODO: We could improve on this by adding support for non-fatal error
270        // messages in the topology. This would make the topology responsible
271        // for printing warnings (including potentially emitting metrics),
272        // instead of individual transforms.
273        if !warnings.is_empty() {
274            warn!(message = "VRL compilation warning.", %warnings);
275        }
276
277        Ok(transform)
278    }
279
280    fn input(&self) -> Input {
281        Input::all()
282    }
283
284    fn outputs(
285        &self,
286        context: &TransformContext,
287        input_definitions: &[(OutputId, schema::Definition)],
288    ) -> Vec<TransformOutput> {
289        let merged_definition: Definition = input_definitions
290            .iter()
291            .map(|(_output, definition)| definition.clone())
292            .reduce(Definition::merge)
293            .unwrap_or_else(Definition::any);
294
295        // We need to compile the VRL program in order to know the schema definition output of this
296        // transform. We ignore any compilation errors, as those are caught by the transform build
297        // step.
298        let compiled = self
299            .compile_vrl_program(
300                context.enrichment_tables.clone(),
301                context.metrics_storage.clone(),
302                merged_definition,
303            )
304            .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
305            .map_err(|_| ());
306
307        let mut dropped_definitions = HashMap::new();
308        let mut default_definitions = HashMap::new();
309
310        for (output_id, input_definition) in input_definitions {
311            let default_definition = compiled
312                .clone()
313                .map(|(state, meaning)| {
314                    let mut new_type_def = Definition::new(
315                        state.external.target_kind().clone(),
316                        state.external.metadata_kind().clone(),
317                        input_definition.log_namespaces().clone(),
318                    );
319
320                    for (id, path) in input_definition.meanings() {
321                        // Attempt to copy over the meanings from the input definition.
322                        // The function will fail if the meaning that now points to a field that no longer exists,
323                        // this is fine since we will no longer want that meaning in the output definition.
324                        let _ = new_type_def.try_with_meaning(path.clone(), id);
325                    }
326
327                    // Apply any semantic meanings set in the VRL program
328                    for (id, path) in meaning {
329                        // currently only event paths are supported
330                        new_type_def = new_type_def.with_meaning(path, &id);
331                    }
332                    new_type_def
333                })
334                .unwrap_or_else(|_| {
335                    Definition::new_with_default_metadata(
336                        // The program failed to compile, so it can "never" return a value
337                        Kind::never(),
338                        input_definition.log_namespaces().clone(),
339                    )
340                });
341
342            // When a message is dropped and re-routed, we keep the original event, but also annotate
343            // it with additional metadata.
344            let dropped_definition = Definition::combine_log_namespaces(
345                input_definition.log_namespaces(),
346                input_definition.clone().with_event_field(
347                    log_schema().metadata_key().expect("valid metadata key"),
348                    Kind::object(BTreeMap::from([
349                        ("reason".into(), Kind::bytes()),
350                        ("message".into(), Kind::bytes()),
351                        ("component_id".into(), Kind::bytes()),
352                        ("component_type".into(), Kind::bytes()),
353                        ("component_kind".into(), Kind::bytes()),
354                    ])),
355                    Some("metadata"),
356                ),
357                input_definition
358                    .clone()
359                    .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
360                    .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
361                    .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
362                    .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
363                    .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
364            );
365
366            default_definitions.insert(
367                output_id.clone(),
368                VrlTarget::modify_schema_definition_for_into_events(default_definition),
369            );
370            dropped_definitions.insert(
371                output_id.clone(),
372                VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
373            );
374        }
375
376        let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
377
378        if self.reroute_dropped {
379            vec![
380                default_output,
381                TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
382            ]
383        } else {
384            vec![default_output]
385        }
386    }
387
388    fn enable_concurrency(&self) -> bool {
389        true
390    }
391
392    fn files_to_watch(&self) -> Vec<&PathBuf> {
393        self.file
394            .iter()
395            .chain(self.files.iter().flatten())
396            .collect()
397    }
398}
399
/// Runtime state of the `remap` transform: a compiled VRL program, the runner that executes
/// it, and the settings copied from [`RemapConfig`] that decide what happens to an event when
/// the program errors or aborts.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of the owning component, if any; reported as `component_id` on dropped events.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program run for every event.
    program: Program,
    /// Time zone handed to the runner: the configured one, else the global default.
    timezone: TimeZone,
    /// Whether an event whose program run errors is removed from the default output.
    drop_on_error: bool,
    /// Whether an event whose program run aborts is removed from the default output.
    drop_on_abort: bool,
    /// Whether dropped events are annotated and sent to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against a target.
    runner: Runner,
    /// Selects single-value or multi-value metric tag exposure for the VRL target.
    metric_tag_values: MetricTagValues,
}
414
/// Strategy for executing a compiled VRL program against a target.
pub trait VrlRunner {
    /// Runs `program` against `target` using `timezone`.
    ///
    /// Returns the value the program resolved to, or the [`Terminate`] reason (abort or
    /// error) when execution stopped early.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
423
/// [`VrlRunner`] backed by the VRL [`Runtime`].
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime used to resolve programs; it is cleared after every run.
    pub runtime: Runtime,
}
428
429impl Clone for AstRunner {
430    fn clone(&self) -> Self {
431        Self {
432            runtime: Runtime::default(),
433        }
434    }
435}
436
437impl VrlRunner for AstRunner {
438    fn run(
439        &mut self,
440        target: &mut VrlTarget,
441        program: &Program,
442        timezone: &TimeZone,
443    ) -> std::result::Result<Value, Terminate> {
444        let result = self.runtime.resolve(target, program, timezone);
445        self.runtime.clear();
446        result
447    }
448}
449
450impl Remap<AstRunner> {
451    pub fn new_ast(
452        config: RemapConfig,
453        context: &TransformContext,
454    ) -> crate::Result<(Self, String)> {
455        let (program, warnings, _) = config.compile_vrl_program(
456            context.enrichment_tables.clone(),
457            context.metrics_storage.clone(),
458            context.merged_schema_definition.clone(),
459        )?;
460
461        let runtime = Runtime::default();
462        let runner = AstRunner { runtime };
463
464        Self::new(config, context, program, runner).map(|remap| (remap, warnings))
465    }
466}
467
468impl<Runner> Remap<Runner>
469where
470    Runner: VrlRunner,
471{
472    fn new(
473        config: RemapConfig,
474        context: &TransformContext,
475        program: Program,
476        runner: Runner,
477    ) -> crate::Result<Self> {
478        Ok(Remap {
479            component_key: context.key.clone(),
480            program,
481            timezone: config
482                .timezone
483                .unwrap_or_else(|| context.globals.timezone()),
484            drop_on_error: config.drop_on_error,
485            drop_on_abort: config.drop_on_abort,
486            reroute_dropped: config.reroute_dropped,
487            runner,
488            metric_tag_values: config.metric_tag_values,
489        })
490    }
491
492    #[cfg(test)]
493    const fn runner(&self) -> &Runner {
494        &self.runner
495    }
496
497    fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
498        let message = error
499            .notes()
500            .iter()
501            .rfind(|note| matches!(note, Note::UserErrorMessage(_)))
502            .map(|note| note.to_string())
503            .unwrap_or_else(|| error.to_string());
504        serde_json::json!({
505                "reason": reason,
506                "message": message,
507                "component_id": self.component_key,
508                "component_type": "remap",
509                "component_kind": "transform",
510        })
511    }
512
513    fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
514        match event {
515            Event::Log(log) => match log.namespace() {
516                LogNamespace::Legacy => {
517                    if let Some(metadata_key) = log_schema().metadata_key() {
518                        log.insert(
519                            (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
520                            self.dropped_data(reason, error),
521                        );
522                    }
523                }
524                LogNamespace::Vector => {
525                    log.insert(
526                        metadata_path!("vector", "dropped"),
527                        self.dropped_data(reason, error),
528                    );
529                }
530            },
531            Event::Metric(metric) => {
532                if let Some(metadata_key) = log_schema().metadata_key() {
533                    metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
534                    metric.replace_tag(
535                        format!("{metadata_key}.dropped.component_id"),
536                        self.component_key
537                            .as_ref()
538                            .map(ToString::to_string)
539                            .unwrap_or_default(),
540                    );
541                    metric.replace_tag(
542                        format!("{metadata_key}.dropped.component_type"),
543                        "remap".into(),
544                    );
545                    metric.replace_tag(
546                        format!("{metadata_key}.dropped.component_kind"),
547                        "transform".into(),
548                    );
549                }
550            }
551            Event::Trace(trace) => {
552                trace.maybe_insert(log_schema().metadata_key_target_path(), || {
553                    self.dropped_data(reason, error).into()
554                });
555            }
556        }
557    }
558
559    fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
560        self.runner.run(target, &self.program, &self.timezone)
561    }
562}
563
564impl<Runner> SyncTransform for Remap<Runner>
565where
566    Runner: VrlRunner + Clone + Send + Sync,
567{
568    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
569        // If a program can fail or abort at runtime and we know that we will still need to forward
570        // the event in that case (either to the main output or `dropped`, depending on the
571        // config), we need to clone the original event and keep it around, to allow us to discard
572        // any mutations made to the event while the VRL program runs, before it failed or aborted.
573        //
574        // The `drop_on_{error, abort}` transform config allows operators to remove events from the
575        // main output if they're failed or aborted, in which case we can skip the cloning, since
576        // any mutations made by VRL will be ignored regardless. If they hav configured
577        // `reroute_dropped`, however, we still need to do the clone to ensure that we can forward
578        // the event to the `dropped` output.
579        let forward_on_error = !self.drop_on_error || self.reroute_dropped;
580        let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
581        let original_event = if (self.program.info().fallible && forward_on_error)
582            || (self.program.info().abortable && forward_on_abort)
583        {
584            Some(event.clone())
585        } else {
586            None
587        };
588
589        let log_namespace = event
590            .maybe_as_log()
591            .map(|log| log.namespace())
592            .unwrap_or(LogNamespace::Legacy);
593
594        let mut target = VrlTarget::new(
595            event,
596            self.program.info(),
597            match self.metric_tag_values {
598                MetricTagValues::Single => false,
599                MetricTagValues::Full => true,
600            },
601        );
602        let result = self.run_vrl(&mut target);
603
604        match result {
605            Ok(_) => match target.into_events(log_namespace) {
606                TargetEvents::One(event) => push_default(event, output),
607                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
608                TargetEvents::Traces(events) => {
609                    events.for_each(|event| push_default(event, output))
610                }
611            },
612            Err(reason) => {
613                let (reason, error, drop) = match reason {
614                    Terminate::Abort(error) => {
615                        if !self.reroute_dropped {
616                            emit!(RemapMappingAbort {
617                                event_dropped: self.drop_on_abort,
618                            });
619                        }
620                        ("abort", error, self.drop_on_abort)
621                    }
622                    Terminate::Error(error) => {
623                        if !self.reroute_dropped {
624                            emit!(RemapMappingError {
625                                error: error.to_string(),
626                                event_dropped: self.drop_on_error,
627                            });
628                        }
629                        ("error", error, self.drop_on_error)
630                    }
631                };
632
633                if !drop {
634                    let event = original_event.expect("event will be set");
635
636                    push_default(event, output);
637                } else if self.reroute_dropped {
638                    let mut event = original_event.expect("event will be set");
639
640                    self.annotate_dropped(&mut event, reason, error);
641                    push_dropped(event, output);
642                }
643            }
644        }
645    }
646}
647
648#[inline]
649fn push_default(event: Event, output: &mut TransformOutputsBuf) {
650    output.push(None, event)
651}
652
653#[inline]
654fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
655    output.push(Some(DROPPED), event);
656}
657
/// Errors raised while resolving the VRL program source for the `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// The combination of `source`, `file`, and `files` is invalid: none, or more than one,
    /// of them was set.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    /// The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    /// The program file at `path` was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
668
669#[cfg(test)]
670mod tests {
671    use std::{
672        collections::{HashMap, HashSet},
673        sync::Arc,
674    };
675
676    use chrono::DateTime;
677    use indoc::{formatdoc, indoc};
678    use tokio::sync::mpsc;
679    use tokio_stream::wrappers::ReceiverStream;
680    use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
681    use vrl::{btreemap, event_path, value::kind::Collection};
682
683    use super::*;
684    use crate::{
685        config::{ConfigBuilder, build_unit_tests},
686        event::{
687            LogEvent, Metric, Value,
688            metric::{MetricKind, MetricValue},
689        },
690        metrics::Controller,
691        schema,
692        test_util::components::{
693            COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
694        },
695        transforms::{OutputBuffer, test::create_topology},
696    };
697
698    fn test_default_schema_definition() -> schema::Definition {
699        schema::Definition::empty_legacy_namespace().with_event_field(
700            &owned_value_path!("a default field"),
701            Kind::integer().or_bytes(),
702            Some("default"),
703        )
704    }
705
706    fn test_dropped_schema_definition() -> schema::Definition {
707        schema::Definition::empty_legacy_namespace().with_event_field(
708            &owned_value_path!("a dropped field"),
709            Kind::boolean().or_null(),
710            Some("dropped"),
711        )
712    }
713
714    fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
715        let schema_definitions = HashMap::from([
716            (
717                None,
718                [("source".into(), test_default_schema_definition())].into(),
719            ),
720            (
721                Some(DROPPED.to_owned()),
722                [("source".into(), test_dropped_schema_definition())].into(),
723            ),
724        ]);
725
726        Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
727            .map(|(remap, _)| remap)
728    }
729
730    #[test]
731    fn generate_config() {
732        crate::test_util::test_generate_config::<RemapConfig>();
733    }
734
735    #[test]
736    fn config_missing_source_and_file() {
737        let config = RemapConfig {
738            source: None,
739            file: None,
740            ..Default::default()
741        };
742
743        let err = remap(config).unwrap_err().to_string();
744        assert_eq!(
745            &err,
746            "must provide exactly one of `source` or `file` or `files` configuration"
747        )
748    }
749
750    #[test]
751    fn config_both_source_and_file() {
752        let config = RemapConfig {
753            source: Some("".to_owned()),
754            file: Some("".into()),
755            ..Default::default()
756        };
757
758        let err = remap(config).unwrap_err().to_string();
759        assert_eq!(
760            &err,
761            "must provide exactly one of `source` or `file` or `files` configuration"
762        )
763    }
764
765    fn get_field_string(event: &Event, field: &str) -> String {
766        event
767            .as_log()
768            .get(field)
769            .unwrap()
770            .to_string_lossy()
771            .into_owned()
772    }
773
774    #[test]
775    fn check_remap_doesnt_share_state_between_events() {
776        let conf = RemapConfig {
777            source: Some(".foo = .sentinel".to_string()),
778            file: None,
779            drop_on_error: true,
780            drop_on_abort: false,
781            ..Default::default()
782        };
783        let mut tform = remap(conf).unwrap();
784        assert!(tform.runner().runtime.is_empty());
785
786        let event1 = {
787            let mut event1 = LogEvent::from("event1");
788            event1.insert("sentinel", "bar");
789            Event::from(event1)
790        };
791        let result1 = transform_one(&mut tform, event1).unwrap();
792        assert_eq!(get_field_string(&result1, "message"), "event1");
793        assert_eq!(get_field_string(&result1, "foo"), "bar");
794        assert!(tform.runner().runtime.is_empty());
795
796        let event2 = {
797            let event2 = LogEvent::from("event2");
798            Event::from(event2)
799        };
800        let result2 = transform_one(&mut tform, event2).unwrap();
801        assert_eq!(get_field_string(&result2, "message"), "event2");
802        assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
803        assert!(tform.runner().runtime.is_empty());
804    }
805
806    #[test]
807    fn remap_return_raw_string_vector_namespace() {
808        let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
809
810        let event = {
811            let mut metadata = EventMetadata::default()
812                .with_schema_definition(&Arc::new(initial_definition.clone()));
813            // the Vector metadata field is required for an event to correctly detect the namespace at runtime
814            metadata
815                .value_mut()
816                .insert(&owned_value_path!("vector"), BTreeMap::new());
817
818            let mut event = LogEvent::new_with_metadata(metadata);
819            event.insert("copy_from", "buz");
820            Event::from(event)
821        };
822
823        let conf = RemapConfig {
824            source: Some(r#"  . = "root string";"#.to_string()),
825            file: None,
826            drop_on_error: true,
827            drop_on_abort: false,
828            ..Default::default()
829        };
830        let mut tform = remap(conf.clone()).unwrap();
831        let result = transform_one(&mut tform, event).unwrap();
832        assert_eq!(get_field_string(&result, "."), "root string");
833
834        let mut outputs = conf.outputs(
835            &Default::default(),
836            &[(OutputId::dummy(), initial_definition)],
837        );
838
839        assert_eq!(outputs.len(), 1);
840        let output = outputs.pop().unwrap();
841        assert_eq!(output.port, None);
842        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
843        let expected_schema =
844            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
845        assert_eq!(actual_schema_def, expected_schema);
846    }
847
848    #[test]
849    fn check_remap_adds() {
850        let event = {
851            let mut event = LogEvent::from("augment me");
852            event.insert("copy_from", "buz");
853            Event::from(event)
854        };
855
856        let conf = RemapConfig {
857            source: Some(
858                r#"  .foo = "bar"
859  .bar = "baz"
860  .copy = .copy_from
861"#
862                .to_string(),
863            ),
864            file: None,
865            drop_on_error: true,
866            drop_on_abort: false,
867            ..Default::default()
868        };
869        let mut tform = remap(conf).unwrap();
870        let result = transform_one(&mut tform, event).unwrap();
871        assert_eq!(get_field_string(&result, "message"), "augment me");
872        assert_eq!(get_field_string(&result, "copy_from"), "buz");
873        assert_eq!(get_field_string(&result, "foo"), "bar");
874        assert_eq!(get_field_string(&result, "bar"), "baz");
875        assert_eq!(get_field_string(&result, "copy"), "buz");
876    }
877
878    #[test]
879    fn check_remap_emits_multiple() {
880        let event = {
881            let mut event = LogEvent::from("augment me");
882            event.insert(
883                "events",
884                vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
885            );
886            Event::from(event)
887        };
888
889        let conf = RemapConfig {
890            source: Some(
891                indoc! {r"
892                . = .events
893            "}
894                .to_owned(),
895            ),
896            file: None,
897            drop_on_error: true,
898            drop_on_abort: false,
899            ..Default::default()
900        };
901        let mut tform = remap(conf).unwrap();
902
903        let out = collect_outputs(&mut tform, event);
904        assert_eq!(2, out.primary.len());
905        let mut result = out.primary.into_events();
906
907        let r = result.next().unwrap();
908        assert_eq!(get_field_string(&r, "message"), "foo");
909        let r = result.next().unwrap();
910        assert_eq!(get_field_string(&r, "message"), "bar");
911    }
912
913    #[test]
914    fn check_remap_error() {
915        let event = {
916            let mut event = Event::Log(LogEvent::from("augment me"));
917            event.as_mut_log().insert("bar", "is a string");
918            event
919        };
920
921        let conf = RemapConfig {
922            source: Some(formatdoc! {r#"
923                .foo = "foo"
924                .not_an_int = int!(.bar)
925                .baz = 12
926            "#}),
927            file: None,
928            drop_on_error: false,
929            drop_on_abort: false,
930            ..Default::default()
931        };
932        let mut tform = remap(conf).unwrap();
933
934        let event = transform_one(&mut tform, event).unwrap();
935
936        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
937        assert!(event.as_log().get("foo").is_none());
938        assert!(event.as_log().get("baz").is_none());
939    }
940
941    #[test]
942    fn check_remap_error_drop() {
943        let event = {
944            let mut event = Event::Log(LogEvent::from("augment me"));
945            event.as_mut_log().insert("bar", "is a string");
946            event
947        };
948
949        let conf = RemapConfig {
950            source: Some(formatdoc! {r#"
951                .foo = "foo"
952                .not_an_int = int!(.bar)
953                .baz = 12
954            "#}),
955            file: None,
956            drop_on_error: true,
957            drop_on_abort: false,
958            ..Default::default()
959        };
960        let mut tform = remap(conf).unwrap();
961
962        assert!(transform_one(&mut tform, event).is_none())
963    }
964
965    #[test]
966    fn check_remap_error_infallible() {
967        let event = {
968            let mut event = Event::Log(LogEvent::from("augment me"));
969            event.as_mut_log().insert("bar", "is a string");
970            event
971        };
972
973        let conf = RemapConfig {
974            source: Some(formatdoc! {r#"
975                .foo = "foo"
976                .baz = 12
977            "#}),
978            file: None,
979            drop_on_error: false,
980            drop_on_abort: false,
981            ..Default::default()
982        };
983        let mut tform = remap(conf).unwrap();
984
985        let event = transform_one(&mut tform, event).unwrap();
986
987        assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
988        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
989        assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
990    }
991
992    #[test]
993    fn check_remap_abort() {
994        let event = {
995            let mut event = Event::Log(LogEvent::from("augment me"));
996            event.as_mut_log().insert("bar", "is a string");
997            event
998        };
999
1000        let conf = RemapConfig {
1001            source: Some(formatdoc! {r#"
1002                .foo = "foo"
1003                abort
1004                .baz = 12
1005            "#}),
1006            file: None,
1007            drop_on_error: false,
1008            drop_on_abort: false,
1009            ..Default::default()
1010        };
1011        let mut tform = remap(conf).unwrap();
1012
1013        let event = transform_one(&mut tform, event).unwrap();
1014
1015        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1016        assert!(event.as_log().get("foo").is_none());
1017        assert!(event.as_log().get("baz").is_none());
1018    }
1019
1020    #[test]
1021    fn check_remap_abort_drop() {
1022        let event = {
1023            let mut event = Event::Log(LogEvent::from("augment me"));
1024            event.as_mut_log().insert("bar", "is a string");
1025            event
1026        };
1027
1028        let conf = RemapConfig {
1029            source: Some(formatdoc! {r#"
1030                .foo = "foo"
1031                abort
1032                .baz = 12
1033            "#}),
1034            file: None,
1035            drop_on_error: false,
1036            drop_on_abort: true,
1037            ..Default::default()
1038        };
1039        let mut tform = remap(conf).unwrap();
1040
1041        assert!(transform_one(&mut tform, event).is_none())
1042    }
1043
1044    #[test]
1045    fn check_remap_metric() {
1046        let metric = Event::Metric(Metric::new(
1047            "counter",
1048            MetricKind::Absolute,
1049            MetricValue::Counter { value: 1.0 },
1050        ));
1051        let metadata = metric.metadata().clone();
1052
1053        let conf = RemapConfig {
1054            source: Some(
1055                r#".tags.host = "zoobub"
1056                       .name = "zork"
1057                       .namespace = "zerk"
1058                       .kind = "incremental""#
1059                    .to_string(),
1060            ),
1061            file: None,
1062            drop_on_error: true,
1063            drop_on_abort: false,
1064            ..Default::default()
1065        };
1066        let mut tform = remap(conf).unwrap();
1067
1068        let result = transform_one(&mut tform, metric).unwrap();
1069        assert_eq!(
1070            result,
1071            Event::Metric(
1072                Metric::new_with_metadata(
1073                    "zork",
1074                    MetricKind::Incremental,
1075                    MetricValue::Counter { value: 1.0 },
1076                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1077                    // to the actual value to skip the assertion here
1078                    metadata
1079                )
1080                .with_namespace(Some("zerk"))
1081                .with_tags(Some(metric_tags! {
1082                    "host" => "zoobub",
1083                }))
1084            )
1085        );
1086    }
1087
1088    #[test]
1089    fn remap_timezone_fallback() {
1090        let error = Event::from_json_value(
1091            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1092            LogNamespace::Legacy,
1093        )
1094        .unwrap();
1095        let conf = RemapConfig {
1096            source: Some(formatdoc! {r#"
1097                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1098            "#}),
1099            drop_on_error: true,
1100            drop_on_abort: true,
1101            reroute_dropped: true,
1102            ..Default::default()
1103        };
1104        let context = TransformContext {
1105            key: Some(ComponentKey::from("remapper")),
1106            globals: GlobalOptions {
1107                timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1108                ..Default::default()
1109            },
1110            ..Default::default()
1111        };
1112        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1113
1114        let output = transform_one_fallible(&mut tform, error).unwrap();
1115        let log = output.as_log();
1116        assert_eq!(
1117            log["timestamp"],
1118            DateTime::<chrono::Utc>::from(
1119                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1120            )
1121            .into()
1122        );
1123    }
1124
1125    #[test]
1126    fn remap_timezone_override() {
1127        let error = Event::from_json_value(
1128            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1129            LogNamespace::Legacy,
1130        )
1131        .unwrap();
1132        let conf = RemapConfig {
1133            source: Some(formatdoc! {r#"
1134                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1135            "#}),
1136            drop_on_error: true,
1137            drop_on_abort: true,
1138            reroute_dropped: true,
1139            timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1140            ..Default::default()
1141        };
1142        let context = TransformContext {
1143            key: Some(ComponentKey::from("remapper")),
1144            globals: GlobalOptions {
1145                timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1146                ..Default::default()
1147            },
1148            ..Default::default()
1149        };
1150        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1151
1152        let output = transform_one_fallible(&mut tform, error).unwrap();
1153        let log = output.as_log();
1154        assert_eq!(
1155            log["timestamp"],
1156            DateTime::<chrono::Utc>::from(
1157                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1158            )
1159            .into()
1160        );
1161    }
1162
1163    #[test]
1164    fn check_remap_branching() {
1165        let happy =
1166            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1167                .unwrap();
1168        let abort = Event::from_json_value(
1169            serde_json::json!({"hello": "goodbye"}),
1170            LogNamespace::Legacy,
1171        )
1172        .unwrap();
1173        let error =
1174            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1175
1176        let happy_metric = {
1177            let mut metric = Metric::new(
1178                "counter",
1179                MetricKind::Absolute,
1180                MetricValue::Counter { value: 1.0 },
1181            );
1182            metric.replace_tag("hello".into(), "world".into());
1183            Event::Metric(metric)
1184        };
1185
1186        let abort_metric = {
1187            let mut metric = Metric::new(
1188                "counter",
1189                MetricKind::Absolute,
1190                MetricValue::Counter { value: 1.0 },
1191            );
1192            metric.replace_tag("hello".into(), "goodbye".into());
1193            Event::Metric(metric)
1194        };
1195
1196        let error_metric = {
1197            let mut metric = Metric::new(
1198                "counter",
1199                MetricKind::Absolute,
1200                MetricValue::Counter { value: 1.0 },
1201            );
1202            metric.replace_tag("not_hello".into(), "oops".into());
1203            Event::Metric(metric)
1204        };
1205
1206        let conf = RemapConfig {
1207            source: Some(formatdoc! {r#"
1208                if exists(.tags) {{
1209                    # metrics
1210                    .tags.foo = "bar"
1211                    if string!(.tags.hello) == "goodbye" {{
1212                      abort
1213                    }}
1214                }} else {{
1215                    # logs
1216                    .foo = "bar"
1217                    if string(.hello) == "goodbye" {{
1218                      abort
1219                    }}
1220                }}
1221            "#}),
1222            drop_on_error: true,
1223            drop_on_abort: true,
1224            reroute_dropped: true,
1225            ..Default::default()
1226        };
1227        let schema_definitions = HashMap::from([
1228            (
1229                None,
1230                [("source".into(), test_default_schema_definition())].into(),
1231            ),
1232            (
1233                Some(DROPPED.to_owned()),
1234                [("source".into(), test_dropped_schema_definition())].into(),
1235            ),
1236        ]);
1237        let context = TransformContext {
1238            key: Some(ComponentKey::from("remapper")),
1239            schema_definitions,
1240            merged_schema_definition: schema::Definition::new_with_default_metadata(
1241                Kind::any_object(),
1242                [LogNamespace::Legacy],
1243            )
1244            .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1245            ..Default::default()
1246        };
1247        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1248
1249        let output = transform_one_fallible(&mut tform, happy).unwrap();
1250        let log = output.as_log();
1251        assert_eq!(log["hello"], "world".into());
1252        assert_eq!(log["foo"], "bar".into());
1253        assert!(!log.contains(event_path!("metadata")));
1254
1255        let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1256        let log = output.as_log();
1257        assert_eq!(log["hello"], "goodbye".into());
1258        assert!(!log.contains(event_path!("foo")));
1259        assert_eq!(
1260            log["metadata"],
1261            serde_json::json!({
1262                "dropped": {
1263                    "reason": "abort",
1264                    "message": "aborted",
1265                    "component_id": "remapper",
1266                    "component_type": "remap",
1267                    "component_kind": "transform",
1268                }
1269            })
1270            .try_into()
1271            .unwrap()
1272        );
1273
1274        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1275        let log = output.as_log();
1276        assert_eq!(log["hello"], 42.into());
1277        assert!(!log.contains(event_path!("foo")));
1278        assert_eq!(
1279            log["metadata"],
1280            serde_json::json!({
1281                "dropped": {
1282                    "reason": "error",
1283                    "message": "function call error for \"string\" at (160:174): expected string, got integer",
1284                    "component_id": "remapper",
1285                    "component_type": "remap",
1286                    "component_kind": "transform",
1287                }
1288            })
1289            .try_into()
1290            .unwrap()
1291        );
1292
1293        let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1294        similar_asserts::assert_eq!(
1295            output,
1296            Event::Metric(
1297                Metric::new_with_metadata(
1298                    "counter",
1299                    MetricKind::Absolute,
1300                    MetricValue::Counter { value: 1.0 },
1301                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1302                    // to the actual value to skip the assertion here
1303                    EventMetadata::default()
1304                        .with_schema_definition(output.metadata().schema_definition()),
1305                )
1306                .with_tags(Some(metric_tags! {
1307                    "hello" => "world",
1308                    "foo" => "bar",
1309                }))
1310            )
1311        );
1312
1313        let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1314        similar_asserts::assert_eq!(
1315            output,
1316            Event::Metric(
1317                Metric::new_with_metadata(
1318                    "counter",
1319                    MetricKind::Absolute,
1320                    MetricValue::Counter { value: 1.0 },
1321                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1322                    // to the actual value to skip the assertion here
1323                    EventMetadata::default()
1324                        .with_schema_definition(output.metadata().schema_definition()),
1325                )
1326                .with_tags(Some(metric_tags! {
1327                    "hello" => "goodbye",
1328                    "metadata.dropped.reason" => "abort",
1329                    "metadata.dropped.component_id" => "remapper",
1330                    "metadata.dropped.component_type" => "remap",
1331                    "metadata.dropped.component_kind" => "transform",
1332                }))
1333            )
1334        );
1335
1336        let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1337        similar_asserts::assert_eq!(
1338            output,
1339            Event::Metric(
1340                Metric::new_with_metadata(
1341                    "counter",
1342                    MetricKind::Absolute,
1343                    MetricValue::Counter { value: 1.0 },
1344                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1345                    // to the actual value to skip the assertion here
1346                    EventMetadata::default()
1347                        .with_schema_definition(output.metadata().schema_definition()),
1348                )
1349                .with_tags(Some(metric_tags! {
1350                    "not_hello" => "oops",
1351                    "metadata.dropped.reason" => "error",
1352                    "metadata.dropped.component_id" => "remapper",
1353                    "metadata.dropped.component_type" => "remap",
1354                    "metadata.dropped.component_kind" => "transform",
1355                }))
1356            )
1357        );
1358    }
1359
1360    #[test]
1361    fn check_remap_branching_assert_with_message() {
1362        let error_trigger_assert_custom_message =
1363            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1364        let error_trigger_default_assert_message =
1365            Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
1366        let conf = RemapConfig {
1367            source: Some(formatdoc! {r#"
1368                assert_eq!(.hello, 0, "custom message here")
1369                assert_eq!(.hello, 1)
1370            "#}),
1371            drop_on_error: true,
1372            drop_on_abort: true,
1373            reroute_dropped: true,
1374            ..Default::default()
1375        };
1376        let context = TransformContext {
1377            key: Some(ComponentKey::from("remapper")),
1378            ..Default::default()
1379        };
1380        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1381
1382        let output =
1383            transform_one_fallible(&mut tform, error_trigger_assert_custom_message).unwrap_err();
1384        let log = output.as_log();
1385        assert_eq!(log["hello"], 42.into());
1386        assert!(!log.contains(event_path!("foo")));
1387        assert_eq!(
1388            log["metadata"],
1389            serde_json::json!({
1390                "dropped": {
1391                    "reason": "error",
1392                    "message": "custom message here",
1393                    "component_id": "remapper",
1394                    "component_type": "remap",
1395                    "component_kind": "transform",
1396                }
1397            })
1398            .try_into()
1399            .unwrap()
1400        );
1401
1402        let output =
1403            transform_one_fallible(&mut tform, error_trigger_default_assert_message).unwrap_err();
1404        let log = output.as_log();
1405        assert_eq!(log["hello"], 0.into());
1406        assert!(!log.contains(event_path!("foo")));
1407        assert_eq!(
1408            log["metadata"],
1409            serde_json::json!({
1410                "dropped": {
1411                    "reason": "error",
1412                    "message": "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
1413                    "component_id": "remapper",
1414                    "component_type": "remap",
1415                    "component_kind": "transform",
1416                }
1417            })
1418            .try_into()
1419            .unwrap()
1420        );
1421    }
1422
1423    #[test]
1424    fn check_remap_branching_abort_with_message() {
1425        let error =
1426            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1427        let conf = RemapConfig {
1428            source: Some(formatdoc! {r#"
1429                abort "custom message here"
1430            "#}),
1431            drop_on_error: true,
1432            drop_on_abort: true,
1433            reroute_dropped: true,
1434            ..Default::default()
1435        };
1436        let context = TransformContext {
1437            key: Some(ComponentKey::from("remapper")),
1438            ..Default::default()
1439        };
1440        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1441
1442        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1443        let log = output.as_log();
1444        assert_eq!(log["hello"], 42.into());
1445        assert!(!log.contains(event_path!("foo")));
1446        assert_eq!(
1447            log["metadata"],
1448            serde_json::json!({
1449                "dropped": {
1450                    "reason": "abort",
1451                    "message": "custom message here",
1452                    "component_id": "remapper",
1453                    "component_type": "remap",
1454                    "component_kind": "transform",
1455                }
1456            })
1457            .try_into()
1458            .unwrap()
1459        );
1460    }
1461
1462    #[test]
1463    fn check_remap_branching_disabled() {
1464        let happy =
1465            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1466                .unwrap();
1467        let abort = Event::from_json_value(
1468            serde_json::json!({"hello": "goodbye"}),
1469            LogNamespace::Legacy,
1470        )
1471        .unwrap();
1472        let error =
1473            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1474
1475        let conf = RemapConfig {
1476            source: Some(formatdoc! {r#"
1477                if exists(.tags) {{
1478                    # metrics
1479                    .tags.foo = "bar"
1480                    if string!(.tags.hello) == "goodbye" {{
1481                      abort
1482                    }}
1483                }} else {{
1484                    # logs
1485                    .foo = "bar"
1486                    if string!(.hello) == "goodbye" {{
1487                      abort
1488                    }}
1489                }}
1490            "#}),
1491            drop_on_error: true,
1492            drop_on_abort: true,
1493            reroute_dropped: false,
1494            ..Default::default()
1495        };
1496
1497        let schema_definition = schema::Definition::new_with_default_metadata(
1498            Kind::any_object(),
1499            [LogNamespace::Legacy],
1500        )
1501        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
1502        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
1503
1504        assert_eq!(
1505            conf.outputs(
1506                &Default::default(),
1507                &[(
1508                    "test".into(),
1509                    schema::Definition::new_with_default_metadata(
1510                        Kind::any_object(),
1511                        [LogNamespace::Legacy]
1512                    )
1513                )],
1514            ),
1515            vec![TransformOutput::new(
1516                DataType::all_bits(),
1517                [("test".into(), schema_definition)].into()
1518            )]
1519        );
1520
1521        let context = TransformContext {
1522            key: Some(ComponentKey::from("remapper")),
1523            ..Default::default()
1524        };
1525        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1526
1527        let output = transform_one_fallible(&mut tform, happy).unwrap();
1528        let log = output.as_log();
1529        assert_eq!(log["hello"], "world".into());
1530        assert_eq!(log["foo"], "bar".into());
1531        assert!(!log.contains(event_path!("metadata")));
1532
1533        let out = collect_outputs(&mut tform, abort);
1534        assert!(out.primary.is_empty());
1535        assert!(out.named[DROPPED].is_empty());
1536
1537        let out = collect_outputs(&mut tform, error);
1538        assert!(out.primary.is_empty());
1539        assert!(out.named[DROPPED].is_empty());
1540    }
1541
1542    #[tokio::test]
1543    async fn check_remap_branching_metrics_with_output() {
1544        init_test();
1545
1546        let config: ConfigBuilder = toml::from_str(indoc! {r#"
1547            [transforms.foo]
1548            inputs = []
1549            type = "remap"
1550            drop_on_abort = true
1551            reroute_dropped = true
1552            source = "abort"
1553
1554            [[tests]]
1555            name = "metric output"
1556
1557            [tests.input]
1558                insert_at = "foo"
1559                value = "none"
1560
1561            [[tests.outputs]]
1562                extract_from = "foo.dropped"
1563                [[tests.outputs.conditions]]
1564                type = "vrl"
1565                source = "true"
1566        "#})
1567        .unwrap();
1568
1569        let mut tests = build_unit_tests(config).await.unwrap();
1570        assert!(tests.remove(0).run().await.errors.is_empty());
1571        // Check that metrics were emitted with output tag
1572        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1573    }
1574
    /// Events captured from a single `collect_outputs` call, split by output.
    struct CollectedOuput {
        /// Buffer taken from the primary (unnamed) output.
        primary: OutputBuffer,
        /// Buffers taken from the named outputs, keyed by output name.
        named: HashMap<String, OutputBuffer>,
    }
1579
1580    fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1581        let mut outputs = TransformOutputsBuf::new_with_capacity(
1582            vec![
1583                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1584                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1585            ],
1586            1,
1587        );
1588
1589        ft.transform(event, &mut outputs);
1590
1591        CollectedOuput {
1592            primary: outputs.take_primary(),
1593            named: outputs.take_all_named(),
1594        }
1595    }
1596
1597    fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1598        let out = collect_outputs(ft, event);
1599        assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1600        assert!(out.primary.len() <= 1);
1601        out.primary.into_events().next()
1602    }
1603
1604    fn transform_one_fallible(
1605        ft: &mut dyn SyncTransform,
1606        event: Event,
1607    ) -> std::result::Result<Event, Event> {
1608        let mut outputs = TransformOutputsBuf::new_with_capacity(
1609            vec![
1610                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1611                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1612            ],
1613            1,
1614        );
1615
1616        ft.transform(event, &mut outputs);
1617
1618        let mut buf = outputs.drain().collect::<Vec<_>>();
1619        let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1620
1621        assert!(buf.len() < 2);
1622        assert!(err_buf.len() < 2);
1623        match (buf.pop(), err_buf.pop()) {
1624            (Some(good), None) => Ok(good),
1625            (None, Some(bad)) => Err(bad),
1626            (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1627        }
1628    }
1629
1630    #[tokio::test]
1631    async fn emits_internal_events() {
1632        assert_transform_compliance(async move {
1633            let config = RemapConfig {
1634                source: Some("abort".to_owned()),
1635                drop_on_abort: true,
1636                ..Default::default()
1637            };
1638
1639            let (tx, rx) = mpsc::channel(1);
1640            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1641
1642            let log = LogEvent::from("hello world");
1643            tx.send(log.into()).await.unwrap();
1644
1645            drop(tx);
1646            topology.stop().await;
1647            assert_eq!(out.recv().await, None);
1648        })
1649        .await
1650    }
1651
1652    #[test]
1653    fn test_combined_transforms_simple() {
1654        // Make sure that when getting the definitions from one transform and
1655        // passing them to another the correct definition is still produced.
1656
1657        // Transform 1 sets a simple value.
1658        let transform1 = RemapConfig {
1659            source: Some(r#".thing = "potato""#.to_string()),
1660            ..Default::default()
1661        };
1662
1663        let transform2 = RemapConfig {
1664            source: Some(".thang = .thing".to_string()),
1665            ..Default::default()
1666        };
1667
1668        let outputs1 = transform1.outputs(
1669            &Default::default(),
1670            &[("in".into(), schema::Definition::default_legacy_namespace())],
1671        );
1672
1673        assert_eq!(
1674            vec![TransformOutput::new(
1675                DataType::all_bits(),
1676                // The `never` definition should have been passed on to the end.
1677                [(
1678                    "in".into(),
1679                    Definition::default_legacy_namespace().with_event_field(
1680                        &owned_value_path!("thing"),
1681                        Kind::bytes(),
1682                        None
1683                    ),
1684                )]
1685                .into()
1686            )],
1687            outputs1
1688        );
1689
1690        let outputs2 = transform2.outputs(
1691            &Default::default(),
1692            &[(
1693                "in1".into(),
1694                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1695            )],
1696        );
1697
1698        assert_eq!(
1699            vec![TransformOutput::new(
1700                DataType::all_bits(),
1701                [(
1702                    "in1".into(),
1703                    Definition::default_legacy_namespace()
1704                        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
1705                        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None),
1706                )]
1707                .into(),
1708            )],
1709            outputs2
1710        );
1711    }
1712
1713    #[test]
1714    fn test_combined_transforms_unnest() {
1715        // Make sure that when getting the definitions from one transform and
1716        // passing them to another the correct definition is still produced.
1717
1718        // Transform 1 sets a simple value.
1719        let transform1 = RemapConfig {
1720            source: Some(
1721                indoc! {
1722                r#"
1723                .thing = [{"cabbage": 32}, {"parsnips": 45}]
1724                . = unnest(.thing)
1725                "#
1726                }
1727                .to_string(),
1728            ),
1729            ..Default::default()
1730        };
1731
1732        let transform2 = RemapConfig {
1733            source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
1734            ..Default::default()
1735        };
1736
1737        let outputs1 = transform1.outputs(
1738            &Default::default(),
1739            &[(
1740                "in".into(),
1741                schema::Definition::new_with_default_metadata(
1742                    Kind::any_object(),
1743                    [LogNamespace::Legacy],
1744                ),
1745            )],
1746        );
1747
1748        assert_eq!(
1749            vec![TransformOutput::new(
1750                DataType::all_bits(),
1751                [(
1752                    "in".into(),
1753                    Definition::new_with_default_metadata(
1754                        Kind::any_object(),
1755                        [LogNamespace::Legacy]
1756                    )
1757                    .with_event_field(
1758                        &owned_value_path!("thing"),
1759                        Kind::object(Collection::from(BTreeMap::from([
1760                            ("cabbage".into(), Kind::integer().or_undefined(),),
1761                            ("parsnips".into(), Kind::integer().or_undefined(),)
1762                        ]))),
1763                        None
1764                    ),
1765                )]
1766                .into(),
1767            )],
1768            outputs1
1769        );
1770
1771        let outputs2 = transform2.outputs(
1772            &Default::default(),
1773            &[(
1774                "in1".into(),
1775                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1776            )],
1777        );
1778
1779        assert_eq!(
1780            vec![TransformOutput::new(
1781                DataType::all_bits(),
1782                [(
1783                    "in1".into(),
1784                    Definition::default_legacy_namespace()
1785                        .with_event_field(
1786                            &owned_value_path!("thing"),
1787                            Kind::object(Collection::from(BTreeMap::from([
1788                                ("cabbage".into(), Kind::integer().or_undefined(),),
1789                                ("parsnips".into(), Kind::integer().or_undefined(),)
1790                            ]))),
1791                            None
1792                        )
1793                        .with_event_field(
1794                            &owned_value_path!("thang"),
1795                            Kind::integer().or_null(),
1796                            None
1797                        ),
1798                )]
1799                .into(),
1800            )],
1801            outputs2
1802        );
1803    }
1804
1805    #[test]
1806    fn test_transform_abort() {
1807        // An abort should not change the typedef.
1808
1809        let transform1 = RemapConfig {
1810            source: Some(r"abort".to_string()),
1811            ..Default::default()
1812        };
1813
1814        let outputs1 = transform1.outputs(
1815            &Default::default(),
1816            &[(
1817                "in".into(),
1818                schema::Definition::new_with_default_metadata(
1819                    Kind::any_object(),
1820                    [LogNamespace::Legacy],
1821                ),
1822            )],
1823        );
1824
1825        assert_eq!(
1826            vec![TransformOutput::new(
1827                DataType::all_bits(),
1828                [(
1829                    "in".into(),
1830                    Definition::new_with_default_metadata(
1831                        Kind::any_object(),
1832                        [LogNamespace::Legacy]
1833                    ),
1834                )]
1835                .into(),
1836            )],
1837            outputs1
1838        );
1839    }
1840
1841    #[test]
1842    fn test_error_outputs() {
1843        // Even if we fail to compile the VRL it should still output
1844        // the correct ports. This may change if we separate the
1845        // `outputs` function into one returning outputs and a separate
1846        // returning schema definitions.
1847        let transform1 = RemapConfig {
1848            // This enrichment table does not exist.
1849            source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
1850            reroute_dropped: true,
1851            ..Default::default()
1852        };
1853
1854        let outputs1 = transform1.outputs(
1855            &Default::default(),
1856            &[(
1857                "in".into(),
1858                schema::Definition::new_with_default_metadata(
1859                    Kind::any_object(),
1860                    [LogNamespace::Legacy],
1861                ),
1862            )],
1863        );
1864
1865        assert_eq!(
1866            HashSet::from([None, Some("dropped".to_string())]),
1867            outputs1
1868                .into_iter()
1869                .map(|output| output.port)
1870                .collect::<HashSet<_>>()
1871        );
1872    }
1873
1874    #[test]
1875    fn test_non_object_events() {
1876        let transform1 = RemapConfig {
1877            // This enrichment table does not exist.
1878            source: Some(r#". = "fish" "#.to_string()),
1879            ..Default::default()
1880        };
1881
1882        let outputs1 = transform1.outputs(
1883            &Default::default(),
1884            &[(
1885                "in".into(),
1886                schema::Definition::new_with_default_metadata(
1887                    Kind::any_object(),
1888                    [LogNamespace::Legacy],
1889                ),
1890            )],
1891        );
1892
1893        let wanted = schema::Definition::new_with_default_metadata(
1894            Kind::object(Collection::from_unknown(Kind::undefined())),
1895            [LogNamespace::Legacy],
1896        )
1897        .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);
1898
1899        assert_eq!(
1900            HashMap::from([(OutputId::from("in"), wanted)]),
1901            outputs1[0].schema_definitions(true),
1902        );
1903    }
1904
1905    #[test]
1906    fn test_array_and_non_object_events() {
1907        let transform1 = RemapConfig {
1908            source: Some(
1909                indoc! {r#"
1910                    if .lizard == true {
1911                        .thing = [{"cabbage": 42}];
1912                        . = unnest(.thing)
1913                    } else {
1914                      . = "fish"
1915                    }
1916                    "#}
1917                .to_string(),
1918            ),
1919            ..Default::default()
1920        };
1921
1922        let outputs1 = transform1.outputs(
1923            &Default::default(),
1924            &[(
1925                "in".into(),
1926                schema::Definition::new_with_default_metadata(
1927                    Kind::any_object(),
1928                    [LogNamespace::Legacy],
1929                ),
1930            )],
1931        );
1932
1933        let wanted = schema::Definition::new_with_default_metadata(
1934            Kind::any_object(),
1935            [LogNamespace::Legacy],
1936        )
1937        .with_event_field(&owned_value_path!("message"), Kind::any(), None)
1938        .with_event_field(
1939            &owned_value_path!("thing"),
1940            Kind::object(Collection::from(BTreeMap::from([(
1941                "cabbage".into(),
1942                Kind::integer(),
1943            )])))
1944            .or_undefined(),
1945            None,
1946        );
1947
1948        assert_eq!(
1949            HashMap::from([(OutputId::from("in"), wanted)]),
1950            outputs1[0].schema_definitions(true),
1951        );
1952    }
1953
1954    #[test]
1955    fn check_remap_array_vector_namespace() {
1956        let event = {
1957            let mut event = LogEvent::from("input");
1958            // mark the event as a "Vector" namespaced log
1959            event
1960                .metadata_mut()
1961                .value_mut()
1962                .insert("vector", BTreeMap::new());
1963            Event::from(event)
1964        };
1965
1966        let conf = RemapConfig {
1967            source: Some(
1968                r". = [null]
1969"
1970                .to_string(),
1971            ),
1972            file: None,
1973            drop_on_error: true,
1974            drop_on_abort: false,
1975            ..Default::default()
1976        };
1977        let mut tform = remap(conf.clone()).unwrap();
1978        let result = transform_one(&mut tform, event).unwrap();
1979
1980        // Legacy namespace nests this under "message", Vector should set it as the root
1981        assert_eq!(result.as_log().get("."), Some(&Value::Null));
1982
1983        let outputs1 = conf.outputs(
1984            &Default::default(),
1985            &[(
1986                "in".into(),
1987                schema::Definition::new_with_default_metadata(
1988                    Kind::any_object(),
1989                    [LogNamespace::Vector],
1990                ),
1991            )],
1992        );
1993
1994        let wanted =
1995            schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
1996
1997        assert_eq!(
1998            HashMap::from([(OutputId::from("in"), wanted)]),
1999            outputs1[0].schema_definitions(true),
2000        );
2001    }
2002
2003    fn assert_no_metrics(source: String) {
2004        vector_lib::metrics::init_test();
2005
2006        let config = RemapConfig {
2007            source: Some(source),
2008            drop_on_error: true,
2009            drop_on_abort: true,
2010            reroute_dropped: true,
2011            ..Default::default()
2012        };
2013        let mut ast_runner = remap(config).unwrap();
2014        let input_event =
2015            Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2016        let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2017        let dropped_log = dropped_event.as_log();
2018        assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2019
2020        let controller = Controller::get().expect("no controller");
2021        let metrics = controller
2022            .capture_metrics()
2023            .into_iter()
2024            .map(|metric| (metric.name().to_string(), metric))
2025            .collect::<BTreeMap<String, Metric>>();
2026        assert_eq!(metrics.get("component_discarded_events_total"), None);
2027        assert_eq!(metrics.get("component_errors_total"), None);
2028    }
2029    #[test]
2030    fn do_not_emit_metrics_when_dropped() {
2031        assert_no_metrics("abort".to_string());
2032    }
2033
2034    #[test]
2035    fn do_not_emit_metrics_when_errored() {
2036        assert_no_metrics("parse_key_value!(.message)".to_string());
2037    }
2038}