vector/transforms/
remap.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    fs::File,
4    io::{self, Read},
5    path::PathBuf,
6    sync::Mutex,
7};
8
9use snafu::{ResultExt, Snafu};
10use vector_lib::{
11    TimeZone,
12    codecs::MetricTagValues,
13    compile_vrl,
14    config::LogNamespace,
15    configurable::configurable_component,
16    enrichment::TableRegistry,
17    lookup::{PathPrefix, metadata_path, owned_value_path},
18    schema::Definition,
19};
20use vector_vrl_functions::set_semantic_meaning::MeaningList;
21use vrl::{
22    compiler::{
23        CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
24        runtime::{Runtime, Terminate},
25        state::ExternalEnv,
26    },
27    diagnostic::{DiagnosticMessage, Note},
28    path,
29    path::ValuePath,
30    value::{Kind, Value},
31};
32
33use crate::{
34    Result,
35    config::{
36        ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
37        TransformOutput, log_schema,
38    },
39    event::{Event, TargetEvents, VrlTarget},
40    format_vrl_diagnostics,
41    internal_events::{RemapMappingAbort, RemapMappingError},
42    schema,
43    transforms::{SyncTransform, Transform, TransformOutputsBuf},
44};
45
/// Name of the additional output port that rerouted ("dropped") events are pushed to.
const DROPPED: &str = "dropped";
/// Key of a compilation cache entry: the enrichment tables and the merged input schema
/// definition that a program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// Successful compilation result: the compiled program, its formatted warnings, and the
/// semantic meanings collected while compiling.
type CacheValue = (Program, String, MeaningList);
49
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// Required if `file` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// File path to the [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// File paths to the [Vector Remap Language][vrl] (VRL) programs to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` or `file` are missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// When set to `single`, metric tag values are exposed as single strings, the
    /// same as they were before this config option. Tags with multiple values show the last assigned value, and null values
    /// are ignored.
    ///
    /// When set to `full`, all metric tags are exposed as arrays of either string or null
    /// values.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
    /// time zone.
    ///
    /// This overrides the [global `timezone`][global_timezone] option. The time zone name may be
    /// any name in the [TZ database][tz_database], or `local` to indicate system local time.
    ///
    /// [global_timezone]: https://vector.dev/docs/reference/configuration/global-options#timezone
    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Drops any event that encounters an error during processing.
    ///
    /// Normally, if a VRL program encounters an error when processing an event, the original,
    /// unmodified event is sent downstream. In some cases, you may not want to send the event
    /// any further, such as if certain transformation or enrichment is strictly required. Setting
    /// `drop_on_error` to `true` allows you to ensure these events do not get processed any
    /// further.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially named output for
    /// further logging and analysis by setting `reroute_dropped`.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Drops any event that is manually aborted during processing.
    ///
    /// If a VRL program is manually aborted (using [`abort`][vrl_docs_abort]) when
    /// processing an event, this option controls whether the original, unmodified event is sent
    /// downstream without any modifications or if it is dropped.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially-named output for
    /// further logging and analysis by setting `reroute_dropped`.
    ///
    /// [vrl_docs_abort]: https://vector.dev/docs/reference/vrl/expressions/#abort
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Reroutes dropped events to a named output instead of halting processing on them.
    ///
    /// When using `drop_on_error` or `drop_on_abort`, events that are "dropped" are processed no
    /// further. In some cases, it may be desirable to keep the events around for further analysis,
    /// debugging, or retrying.
    ///
    /// In these cases, `reroute_dropped` can be set to `true` which forwards the original event
    /// to a specially-named output, `dropped`. The original event is annotated with additional
    /// fields describing why the event was dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    // Selects which VRL runtime executes the program. Hidden from the generated docs;
    // `build` currently only handles `VrlRuntime::Ast`.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    // Compilation cache shared by `outputs` and `build` through `compile_vrl_program`.
    // Never (de)serialized (`serde(skip)`), left out of `Debug` output, and reset to empty
    // by the manual `Clone` implementation.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    /// Cache can't be `BTreeMap` or `HashMap` because of `TableRegistry`, which doesn't allow us to inspect tables inside it.
    /// And even if we allowed the inspection, the tables can be huge, resulting in a long comparison or hash computation
    /// while using `Vec` allows us to use just a shallow comparison
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
164
165impl Clone for RemapConfig {
166    fn clone(&self) -> Self {
167        Self {
168            source: self.source.clone(),
169            file: self.file.clone(),
170            files: self.files.clone(),
171            metric_tag_values: self.metric_tag_values,
172            timezone: self.timezone,
173            drop_on_error: self.drop_on_error,
174            drop_on_abort: self.drop_on_abort,
175            reroute_dropped: self.reroute_dropped,
176            runtime: self.runtime,
177            cache: Mutex::new(Default::default()),
178        }
179    }
180}
181
182impl RemapConfig {
183    fn compile_vrl_program(
184        &self,
185        enrichment_tables: TableRegistry,
186        merged_schema_definition: schema::Definition,
187    ) -> Result<(Program, String, MeaningList)> {
188        if let Some((_, res)) = self
189            .cache
190            .lock()
191            .expect("Data poisoned")
192            .iter()
193            .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
194        {
195            return res.clone().map_err(Into::into);
196        }
197
198        let source = match (&self.source, &self.file, &self.files) {
199            (Some(source), None, None) => source.to_owned(),
200            (None, Some(path), None) => Self::read_file(path)?,
201            (None, None, Some(paths)) => {
202                let mut combined_source = String::new();
203                for path in paths {
204                    let content = Self::read_file(path)?;
205                    combined_source.push_str(&content);
206                    combined_source.push('\n');
207                }
208                combined_source
209            }
210            _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
211        };
212
213        let mut functions = vrl::stdlib::all();
214        functions.append(&mut vector_lib::enrichment::vrl_functions());
215        #[cfg(feature = "sources-dnstap")]
216        functions.append(&mut dnstap_parser::vrl_functions());
217        functions.append(&mut vector_vrl_functions::all());
218
219        let state = TypeState {
220            local: Default::default(),
221            external: ExternalEnv::new_with_kind(
222                merged_schema_definition.event_kind().clone(),
223                merged_schema_definition.metadata_kind().clone(),
224            ),
225        };
226        let mut config = CompileConfig::default();
227
228        config.set_custom(enrichment_tables.clone());
229        config.set_custom(MeaningList::default());
230
231        let res = compile_vrl(&source, &functions, &state, config)
232            .map_err(|diagnostics| format_vrl_diagnostics(&source, diagnostics))
233            .map(|result| {
234                (
235                    result.program,
236                    format_vrl_diagnostics(&source, result.warnings),
237                    result.config.get_custom::<MeaningList>().unwrap().clone(),
238                )
239            });
240
241        self.cache
242            .lock()
243            .expect("Data poisoned")
244            .push(((enrichment_tables, merged_schema_definition), res.clone()));
245
246        res.map_err(Into::into)
247    }
248
249    fn read_file(path: &PathBuf) -> Result<String> {
250        let mut buffer = String::new();
251        File::open(path)
252            .with_context(|_| FileOpenFailedSnafu { path })?
253            .read_to_string(&mut buffer)
254            .with_context(|_| FileReadFailedSnafu { path })?;
255        Ok(buffer)
256    }
257}
258
259impl_generate_config_from_default!(RemapConfig);
260
261#[async_trait::async_trait]
262#[typetag::serde(name = "remap")]
263impl TransformConfig for RemapConfig {
264    async fn build(&self, context: &TransformContext) -> Result<Transform> {
265        let (transform, warnings) = match self.runtime {
266            VrlRuntime::Ast => {
267                let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
268                (Transform::synchronous(remap), warnings)
269            }
270        };
271
272        // TODO: We could improve on this by adding support for non-fatal error
273        // messages in the topology. This would make the topology responsible
274        // for printing warnings (including potentially emitting metrics),
275        // instead of individual transforms.
276        if !warnings.is_empty() {
277            warn!(message = "VRL compilation warning.", %warnings);
278        }
279
280        Ok(transform)
281    }
282
283    fn input(&self) -> Input {
284        Input::all()
285    }
286
287    fn outputs(
288        &self,
289        enrichment_tables: vector_lib::enrichment::TableRegistry,
290        input_definitions: &[(OutputId, schema::Definition)],
291        _: LogNamespace,
292    ) -> Vec<TransformOutput> {
293        let merged_definition: Definition = input_definitions
294            .iter()
295            .map(|(_output, definition)| definition.clone())
296            .reduce(Definition::merge)
297            .unwrap_or_else(Definition::any);
298
299        // We need to compile the VRL program in order to know the schema definition output of this
300        // transform. We ignore any compilation errors, as those are caught by the transform build
301        // step.
302        let compiled = self
303            .compile_vrl_program(enrichment_tables, merged_definition)
304            .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
305            .map_err(|_| ());
306
307        let mut dropped_definitions = HashMap::new();
308        let mut default_definitions = HashMap::new();
309
310        for (output_id, input_definition) in input_definitions {
311            let default_definition = compiled
312                .clone()
313                .map(|(state, meaning)| {
314                    let mut new_type_def = Definition::new(
315                        state.external.target_kind().clone(),
316                        state.external.metadata_kind().clone(),
317                        input_definition.log_namespaces().clone(),
318                    );
319
320                    for (id, path) in input_definition.meanings() {
321                        // Attempt to copy over the meanings from the input definition.
322                        // The function will fail if the meaning that now points to a field that no longer exists,
323                        // this is fine since we will no longer want that meaning in the output definition.
324                        let _ = new_type_def.try_with_meaning(path.clone(), id);
325                    }
326
327                    // Apply any semantic meanings set in the VRL program
328                    for (id, path) in meaning {
329                        // currently only event paths are supported
330                        new_type_def = new_type_def.with_meaning(path, &id);
331                    }
332                    new_type_def
333                })
334                .unwrap_or_else(|_| {
335                    Definition::new_with_default_metadata(
336                        // The program failed to compile, so it can "never" return a value
337                        Kind::never(),
338                        input_definition.log_namespaces().clone(),
339                    )
340                });
341
342            // When a message is dropped and re-routed, we keep the original event, but also annotate
343            // it with additional metadata.
344            let dropped_definition = Definition::combine_log_namespaces(
345                input_definition.log_namespaces(),
346                input_definition.clone().with_event_field(
347                    log_schema().metadata_key().expect("valid metadata key"),
348                    Kind::object(BTreeMap::from([
349                        ("reason".into(), Kind::bytes()),
350                        ("message".into(), Kind::bytes()),
351                        ("component_id".into(), Kind::bytes()),
352                        ("component_type".into(), Kind::bytes()),
353                        ("component_kind".into(), Kind::bytes()),
354                    ])),
355                    Some("metadata"),
356                ),
357                input_definition
358                    .clone()
359                    .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
360                    .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
361                    .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
362                    .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
363                    .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
364            );
365
366            default_definitions.insert(
367                output_id.clone(),
368                VrlTarget::modify_schema_definition_for_into_events(default_definition),
369            );
370            dropped_definitions.insert(
371                output_id.clone(),
372                VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
373            );
374        }
375
376        let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
377
378        if self.reroute_dropped {
379            vec![
380                default_output,
381                TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
382            ]
383        } else {
384            vec![default_output]
385        }
386    }
387
388    fn enable_concurrency(&self) -> bool {
389        true
390    }
391
392    fn files_to_watch(&self) -> Vec<&PathBuf> {
393        self.file
394            .iter()
395            .chain(self.files.iter().flatten())
396            .collect()
397    }
398}
399
/// The runnable `remap` transform: a compiled VRL program plus the settings that decide
/// what happens to an event when the program errors or aborts.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of the owning component, if known; reported as `component_id` on dropped events.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program executed for every event.
    program: Program,
    /// Time zone handed to the runner on every execution.
    timezone: TimeZone,
    /// Whether an event is removed from the default output when the program errors.
    drop_on_error: bool,
    /// Whether an event is removed from the default output when the program aborts.
    drop_on_abort: bool,
    /// Whether dropped events are annotated and sent to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against a target.
    runner: Runner,
    /// Controls whether metric tags are exposed to VRL as single or full (multi-value) tags.
    metric_tag_values: MetricTagValues,
}
414
/// Abstraction over something that can execute a compiled VRL program.
pub trait VrlRunner {
    /// Runs `program` against `target`, using `timezone` for the execution.
    ///
    /// Returns the value the program resolved to, or the `Terminate` describing why
    /// execution stopped.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
423
/// `VrlRunner` implementation backed by the VRL `Runtime`.
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime used to resolve programs; it is cleared after every run.
    pub runtime: Runtime,
}
428
429impl Clone for AstRunner {
430    fn clone(&self) -> Self {
431        Self {
432            runtime: Runtime::default(),
433        }
434    }
435}
436
437impl VrlRunner for AstRunner {
438    fn run(
439        &mut self,
440        target: &mut VrlTarget,
441        program: &Program,
442        timezone: &TimeZone,
443    ) -> std::result::Result<Value, Terminate> {
444        let result = self.runtime.resolve(target, program, timezone);
445        self.runtime.clear();
446        result
447    }
448}
449
450impl Remap<AstRunner> {
451    pub fn new_ast(
452        config: RemapConfig,
453        context: &TransformContext,
454    ) -> crate::Result<(Self, String)> {
455        let (program, warnings, _) = config.compile_vrl_program(
456            context.enrichment_tables.clone(),
457            context.merged_schema_definition.clone(),
458        )?;
459
460        let runtime = Runtime::default();
461        let runner = AstRunner { runtime };
462
463        Self::new(config, context, program, runner).map(|remap| (remap, warnings))
464    }
465}
466
467impl<Runner> Remap<Runner>
468where
469    Runner: VrlRunner,
470{
471    fn new(
472        config: RemapConfig,
473        context: &TransformContext,
474        program: Program,
475        runner: Runner,
476    ) -> crate::Result<Self> {
477        Ok(Remap {
478            component_key: context.key.clone(),
479            program,
480            timezone: config
481                .timezone
482                .unwrap_or_else(|| context.globals.timezone()),
483            drop_on_error: config.drop_on_error,
484            drop_on_abort: config.drop_on_abort,
485            reroute_dropped: config.reroute_dropped,
486            runner,
487            metric_tag_values: config.metric_tag_values,
488        })
489    }
490
491    #[cfg(test)]
492    const fn runner(&self) -> &Runner {
493        &self.runner
494    }
495
496    fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
497        let message = error
498            .notes()
499            .iter()
500            .filter(|note| matches!(note, Note::UserErrorMessage(_)))
501            .next_back()
502            .map(|note| note.to_string())
503            .unwrap_or_else(|| error.to_string());
504        serde_json::json!({
505                "reason": reason,
506                "message": message,
507                "component_id": self.component_key,
508                "component_type": "remap",
509                "component_kind": "transform",
510        })
511    }
512
513    fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
514        match event {
515            Event::Log(log) => match log.namespace() {
516                LogNamespace::Legacy => {
517                    if let Some(metadata_key) = log_schema().metadata_key() {
518                        log.insert(
519                            (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
520                            self.dropped_data(reason, error),
521                        );
522                    }
523                }
524                LogNamespace::Vector => {
525                    log.insert(
526                        metadata_path!("vector", "dropped"),
527                        self.dropped_data(reason, error),
528                    );
529                }
530            },
531            Event::Metric(metric) => {
532                if let Some(metadata_key) = log_schema().metadata_key() {
533                    metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
534                    metric.replace_tag(
535                        format!("{metadata_key}.dropped.component_id"),
536                        self.component_key
537                            .as_ref()
538                            .map(ToString::to_string)
539                            .unwrap_or_default(),
540                    );
541                    metric.replace_tag(
542                        format!("{metadata_key}.dropped.component_type"),
543                        "remap".into(),
544                    );
545                    metric.replace_tag(
546                        format!("{metadata_key}.dropped.component_kind"),
547                        "transform".into(),
548                    );
549                }
550            }
551            Event::Trace(trace) => {
552                trace.maybe_insert(log_schema().metadata_key_target_path(), || {
553                    self.dropped_data(reason, error).into()
554                });
555            }
556        }
557    }
558
559    fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
560        self.runner.run(target, &self.program, &self.timezone)
561    }
562}
563
564impl<Runner> SyncTransform for Remap<Runner>
565where
566    Runner: VrlRunner + Clone + Send + Sync,
567{
568    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
569        // If a program can fail or abort at runtime and we know that we will still need to forward
570        // the event in that case (either to the main output or `dropped`, depending on the
571        // config), we need to clone the original event and keep it around, to allow us to discard
572        // any mutations made to the event while the VRL program runs, before it failed or aborted.
573        //
574        // The `drop_on_{error, abort}` transform config allows operators to remove events from the
575        // main output if they're failed or aborted, in which case we can skip the cloning, since
576        // any mutations made by VRL will be ignored regardless. If they hav configured
577        // `reroute_dropped`, however, we still need to do the clone to ensure that we can forward
578        // the event to the `dropped` output.
579        let forward_on_error = !self.drop_on_error || self.reroute_dropped;
580        let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
581        let original_event = if (self.program.info().fallible && forward_on_error)
582            || (self.program.info().abortable && forward_on_abort)
583        {
584            Some(event.clone())
585        } else {
586            None
587        };
588
589        let log_namespace = event
590            .maybe_as_log()
591            .map(|log| log.namespace())
592            .unwrap_or(LogNamespace::Legacy);
593
594        let mut target = VrlTarget::new(
595            event,
596            self.program.info(),
597            match self.metric_tag_values {
598                MetricTagValues::Single => false,
599                MetricTagValues::Full => true,
600            },
601        );
602        let result = self.run_vrl(&mut target);
603
604        match result {
605            Ok(_) => match target.into_events(log_namespace) {
606                TargetEvents::One(event) => push_default(event, output),
607                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
608                TargetEvents::Traces(events) => {
609                    events.for_each(|event| push_default(event, output))
610                }
611            },
612            Err(reason) => {
613                let (reason, error, drop) = match reason {
614                    Terminate::Abort(error) => {
615                        if !self.reroute_dropped {
616                            emit!(RemapMappingAbort {
617                                event_dropped: self.drop_on_abort,
618                            });
619                        }
620                        ("abort", error, self.drop_on_abort)
621                    }
622                    Terminate::Error(error) => {
623                        if !self.reroute_dropped {
624                            emit!(RemapMappingError {
625                                error: error.to_string(),
626                                event_dropped: self.drop_on_error,
627                            });
628                        }
629                        ("error", error, self.drop_on_error)
630                    }
631                };
632
633                if !drop {
634                    let event = original_event.expect("event will be set");
635
636                    push_default(event, output);
637                } else if self.reroute_dropped {
638                    let mut event = original_event.expect("event will be set");
639
640                    self.annotate_dropped(&mut event, reason, error);
641                    push_dropped(event, output);
642                }
643            }
644        }
645    }
646}
647
/// Pushes `event` to the transform's default (unnamed) output.
#[inline]
fn push_default(event: Event, output: &mut TransformOutputsBuf) {
    output.push(None, event)
}
652
/// Pushes `event` to the named `dropped` output.
#[inline]
fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
    output.push(Some(DROPPED), event);
}
657
/// Errors raised while loading the VRL program for the `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    // Returned unless exactly one of `source`, `file`, or `files` is configured.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    // The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    // The program file at `path` was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
668
669#[cfg(test)]
670mod tests {
671    use std::{
672        collections::{HashMap, HashSet},
673        sync::Arc,
674    };
675
676    use chrono::DateTime;
677    use indoc::{formatdoc, indoc};
678    use tokio::sync::mpsc;
679    use tokio_stream::wrappers::ReceiverStream;
680    use vector_lib::{
681        config::GlobalOptions, enrichment::TableRegistry, event::EventMetadata, metric_tags,
682    };
683    use vrl::{btreemap, event_path, value::kind::Collection};
684
685    use super::*;
686    use crate::{
687        config::{ConfigBuilder, build_unit_tests},
688        event::{
689            LogEvent, Metric, Value,
690            metric::{MetricKind, MetricValue},
691        },
692        metrics::Controller,
693        schema,
694        test_util::components::{
695            COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
696        },
697        transforms::{OutputBuffer, test::create_topology},
698    };
699
    /// Schema definition used for the default output in these tests: a legacy-namespace
    /// definition with a single integer-or-bytes field carrying the `default` meaning.
    fn test_default_schema_definition() -> schema::Definition {
        schema::Definition::empty_legacy_namespace().with_event_field(
            &owned_value_path!("a default field"),
            Kind::integer().or_bytes(),
            Some("default"),
        )
    }
707
    /// Schema definition used for the `dropped` output in these tests: a legacy-namespace
    /// definition with a single boolean-or-null field carrying the `dropped` meaning.
    fn test_dropped_schema_definition() -> schema::Definition {
        schema::Definition::empty_legacy_namespace().with_event_field(
            &owned_value_path!("a dropped field"),
            Kind::boolean().or_null(),
            Some("dropped"),
        )
    }
715
716    fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
717        let schema_definitions = HashMap::from([
718            (
719                None,
720                [("source".into(), test_default_schema_definition())].into(),
721            ),
722            (
723                Some(DROPPED.to_owned()),
724                [("source".into(), test_dropped_schema_definition())].into(),
725            ),
726        ]);
727
728        Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
729            .map(|(remap, _)| remap)
730    }
731
    /// Runs the shared config-generation check against `RemapConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RemapConfig>();
    }
736
737    #[test]
738    fn config_missing_source_and_file() {
739        let config = RemapConfig {
740            source: None,
741            file: None,
742            ..Default::default()
743        };
744
745        let err = remap(config).unwrap_err().to_string();
746        assert_eq!(
747            &err,
748            "must provide exactly one of `source` or `file` or `files` configuration"
749        )
750    }
751
752    #[test]
753    fn config_both_source_and_file() {
754        let config = RemapConfig {
755            source: Some("".to_owned()),
756            file: Some("".into()),
757            ..Default::default()
758        };
759
760        let err = remap(config).unwrap_err().to_string();
761        assert_eq!(
762            &err,
763            "must provide exactly one of `source` or `file` or `files` configuration"
764        )
765    }
766
767    fn get_field_string(event: &Event, field: &str) -> String {
768        event
769            .as_log()
770            .get(field)
771            .unwrap()
772            .to_string_lossy()
773            .into_owned()
774    }
775
    /// Runs the same transform on two events and checks that nothing from the first run
    /// is visible to the second one, and that the runtime is empty between runs.
    #[test]
    fn check_remap_doesnt_share_state_between_events() {
        let conf = RemapConfig {
            source: Some(".foo = .sentinel".to_string()),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };
        let mut tform = remap(conf).unwrap();
        assert!(tform.runner().runtime.is_empty());

        // First event carries `sentinel`, so `.foo` is expected to pick up its value.
        let event1 = {
            let mut event1 = LogEvent::from("event1");
            event1.insert("sentinel", "bar");
            Event::from(event1)
        };
        let result1 = transform_one(&mut tform, event1).unwrap();
        assert_eq!(get_field_string(&result1, "message"), "event1");
        assert_eq!(get_field_string(&result1, "foo"), "bar");
        assert!(tform.runner().runtime.is_empty());

        // Second event has no `sentinel`; `.foo` must be null rather than the "bar"
        // seen while processing the first event.
        let event2 = {
            let event2 = LogEvent::from("event2");
            Event::from(event2)
        };
        let result2 = transform_one(&mut tform, event2).unwrap();
        assert_eq!(get_field_string(&result2, "message"), "event2");
        assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
        assert!(tform.runner().runtime.is_empty());
    }
807
    /// In the Vector log namespace, a program that assigns a string to the event root
    /// must produce that raw string as the event, and `outputs` must report a bytes
    /// event kind for the single default output.
    #[test]
    fn remap_return_raw_string_vector_namespace() {
        let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());

        let event = {
            let mut metadata = EventMetadata::default()
                .with_schema_definition(&Arc::new(initial_definition.clone()));
            // the Vector metadata field is required for an event to correctly detect the namespace at runtime
            metadata
                .value_mut()
                .insert(&owned_value_path!("vector"), BTreeMap::new());

            let mut event = LogEvent::new_with_metadata(metadata);
            event.insert("copy_from", "buz");
            Event::from(event)
        };

        let conf = RemapConfig {
            source: Some(r#"  . = "root string";"#.to_string()),
            file: None,
            drop_on_error: true,
            drop_on_abort: false,
            ..Default::default()
        };
        // Runtime behavior: the whole event becomes the assigned string.
        let mut tform = remap(conf.clone()).unwrap();
        let result = transform_one(&mut tform, event).unwrap();
        assert_eq!(get_field_string(&result, "."), "root string");

        // Schema behavior: the same config reports the matching output definition.
        let mut outputs = conf.outputs(
            TableRegistry::default(),
            &[(OutputId::dummy(), initial_definition)],
            LogNamespace::Vector,
        );

        // `reroute_dropped` is left at its default here, so only the default output exists.
        assert_eq!(outputs.len(), 1);
        let output = outputs.pop().unwrap();
        assert_eq!(output.port, None);
        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
        let expected_schema =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        assert_eq!(actual_schema_def, expected_schema);
    }
850
// A program that only adds fields: pre-existing fields stay intact and the
// three assignments are visible on the result.
#[test]
fn check_remap_adds() {
    let mut log = LogEvent::from("augment me");
    log.insert("copy_from", "buz");
    let event = Event::from(log);

    let config = RemapConfig {
        source: Some(
            r#"  .foo = "bar"
  .bar = "baz"
  .copy = .copy_from
"#
            .to_string(),
        ),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();
    let enriched = transform_one(&mut transform, event).unwrap();

    let expectations = [
        ("message", "augment me"),
        ("copy_from", "buz"),
        ("foo", "bar"),
        ("bar", "baz"),
        ("copy", "buz"),
    ];
    for (field, expected) in expectations {
        assert_eq!(get_field_string(&enriched, field), expected);
    }
}
880
// Assigning an array to the root yields one output event per array element,
// in order.
#[test]
fn check_remap_emits_multiple() {
    let mut log = LogEvent::from("augment me");
    log.insert(
        "events",
        vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
    );
    let event = Event::from(log);

    let config = RemapConfig {
        source: Some(
            indoc! {r"
            . = .events
        "}
            .to_owned(),
        ),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let collected = collect_outputs(&mut transform, event);
    assert_eq!(2, collected.primary.len());

    let mut emitted = collected.primary.into_events();
    for expected_message in ["foo", "bar"] {
        let next = emitted.next().unwrap();
        assert_eq!(get_field_string(&next, "message"), expected_message);
    }
}
915
// The program is expected to error on `int!(.bar)` because `.bar` holds a string.
// With `drop_on_error` disabled the event still comes out, without `.foo` or `.baz`.
#[test]
fn check_remap_error() {
    let mut log = LogEvent::from("augment me");
    log.insert("bar", "is a string");
    let event = Event::Log(log);

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .foo = "foo"
            .not_an_int = int!(.bar)
            .baz = 12
        "#}),
        file: None,
        drop_on_error: false,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let passed_through = transform_one(&mut transform, event).unwrap();
    let log = passed_through.as_log();

    assert_eq!(log.get("bar"), Some(&Value::from("is a string")));
    assert!(log.get("foo").is_none());
    assert!(log.get("baz").is_none());
}
943
// Same erroring program as `check_remap_error`, but with `drop_on_error` enabled:
// nothing may be emitted for the event.
#[test]
fn check_remap_error_drop() {
    let mut log = LogEvent::from("augment me");
    log.insert("bar", "is a string");
    let event = Event::Log(log);

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .foo = "foo"
            .not_an_int = int!(.bar)
            .baz = 12
        "#}),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let outcome = transform_one(&mut transform, event);
    assert!(outcome.is_none());
}
967
// A program without fallible calls: both assignments land on the event and the
// pre-existing `.bar` field is left alone.
#[test]
fn check_remap_error_infallible() {
    let mut log = LogEvent::from("augment me");
    log.insert("bar", "is a string");
    let event = Event::Log(log);

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .foo = "foo"
            .baz = 12
        "#}),
        file: None,
        drop_on_error: false,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let transformed = transform_one(&mut transform, event).unwrap();
    let log = transformed.as_log();

    assert_eq!(log.get("foo"), Some(&Value::from("foo")));
    assert_eq!(log.get("bar"), Some(&Value::from("is a string")));
    assert_eq!(log.get("baz"), Some(&Value::from(12)));
}
994
// The program aborts midway. With `drop_on_abort` disabled the event is still
// emitted, and neither `.foo` (set before the abort) nor `.baz` is present on it.
#[test]
fn check_remap_abort() {
    let mut log = LogEvent::from("augment me");
    log.insert("bar", "is a string");
    let event = Event::Log(log);

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .foo = "foo"
            abort
            .baz = 12
        "#}),
        file: None,
        drop_on_error: false,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let passed_through = transform_one(&mut transform, event).unwrap();
    let log = passed_through.as_log();

    assert_eq!(log.get("bar"), Some(&Value::from("is a string")));
    assert!(log.get("foo").is_none());
    assert!(log.get("baz").is_none());
}
1022
// Same aborting program as `check_remap_abort`, but with `drop_on_abort` enabled:
// nothing may be emitted for the event.
#[test]
fn check_remap_abort_drop() {
    let mut log = LogEvent::from("augment me");
    log.insert("bar", "is a string");
    let event = Event::Log(log);

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .foo = "foo"
            abort
            .baz = 12
        "#}),
        file: None,
        drop_on_error: false,
        drop_on_abort: true,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let outcome = transform_one(&mut transform, event);
    assert!(outcome.is_none());
}
1046
// Rewrites name, namespace, kind and a tag on a counter metric and compares the
// result against a hand-built metric.
#[test]
fn check_remap_metric() {
    let input = Event::Metric(Metric::new(
        "counter",
        MetricKind::Absolute,
        MetricValue::Counter { value: 1.0 },
    ));
    // The schema definition is set in the topology, which isn't used in this test, so the
    // expected metric simply reuses the metadata cloned from the input here.
    let input_metadata = input.metadata().clone();

    let config = RemapConfig {
        source: Some(
            r#".tags.host = "zoobub"
                       .name = "zork"
                       .namespace = "zerk"
                       .kind = "incremental""#
                .to_string(),
        ),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut transform = remap(config).unwrap();

    let expected = Event::Metric(
        Metric::new_with_metadata(
            "zork",
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.0 },
            input_metadata,
        )
        .with_namespace(Some("zerk"))
        .with_tags(Some(metric_tags! {
            "host" => "zoobub",
        })),
    );

    let actual = transform_one(&mut transform, input).unwrap();
    assert_eq!(actual, expected);
}
1090
// No timezone on the transform itself: the global `America/Los_Angeles` setting is
// what the offset-less timestamp is expected to be parsed in (-08:00 on this date).
#[test]
fn remap_timezone_fallback() {
    let input = Event::from_json_value(
        serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
        LogNamespace::Legacy,
    )
    .unwrap();

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let globals = GlobalOptions {
        timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        globals,
        ..Default::default()
    };
    let mut transform = Remap::new_ast(config, &context).unwrap().0;

    let output = transform_one_fallible(&mut transform, input).unwrap();

    let expected_utc = DateTime::<chrono::Utc>::from(
        DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap(),
    );
    assert_eq!(output.as_log()["timestamp"], expected_utc.into());
}
1127
// The transform sets `America/Los_Angeles` while the global option says `Etc/UTC`:
// the parsed timestamp is expected to use the transform's own setting (-08:00).
#[test]
fn remap_timezone_override() {
    let input = Event::from_json_value(
        serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
        LogNamespace::Legacy,
    )
    .unwrap();

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
        ..Default::default()
    };
    let globals = GlobalOptions {
        timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        globals,
        ..Default::default()
    };
    let mut transform = Remap::new_ast(config, &context).unwrap().0;

    let output = transform_one_fallible(&mut transform, input).unwrap();

    let expected_utc = DateTime::<chrono::Utc>::from(
        DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap(),
    );
    assert_eq!(output.as_log()["timestamp"], expected_utc.into());
}
1165
// With `reroute_dropped` enabled, exercises the success, abort and error paths for
// both logs and metrics: successful events come back as `Ok`, aborted/errored ones
// come back on the dropped output annotated with `metadata.dropped.*`.
#[test]
fn check_remap_branching() {
    // Log inputs, one per path.
    let json_log =
        |value: serde_json::Value| Event::from_json_value(value, LogNamespace::Legacy).unwrap();
    let happy = json_log(serde_json::json!({"hello": "world"}));
    let abort = json_log(serde_json::json!({"hello": "goodbye"}));
    let error = json_log(serde_json::json!({"hello": 42}));

    // Metric inputs, one per path; they only differ in a single tag.
    let tagged_counter = |tag: &str, value: &str| {
        let mut metric = Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        metric.replace_tag(tag.into(), value.into());
        Event::Metric(metric)
    };
    let happy_metric = tagged_counter("hello", "world");
    let abort_metric = tagged_counter("hello", "goodbye");
    let error_metric = tagged_counter("not_hello", "oops");

    // NOTE: the expected error message further down contains a character span into
    // this program, so the text must not be reformatted.
    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            if exists(.tags) {{
                # metrics
                .tags.foo = "bar"
                if string!(.tags.hello) == "goodbye" {{
                  abort
                }}
            }} else {{
                # logs
                .foo = "bar"
                if string(.hello) == "goodbye" {{
                  abort
                }}
            }}
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let schema_definitions = HashMap::from([
        (
            None,
            [("source".into(), test_default_schema_definition())].into(),
        ),
        (
            Some(DROPPED.to_owned()),
            [("source".into(), test_dropped_schema_definition())].into(),
        ),
    ]);
    let merged_schema_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None);
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        schema_definitions,
        merged_schema_definition,
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    // Log, success path: `.foo` added, no drop metadata.
    let passed = transform_one_fallible(&mut tform, happy).unwrap();
    let log = passed.as_log();
    assert_eq!(log["hello"], "world".into());
    assert_eq!(log["foo"], "bar".into());
    assert!(!log.contains(event_path!("metadata")));

    // Log, abort path: original event plus drop metadata with reason "abort".
    let aborted = transform_one_fallible(&mut tform, abort).unwrap_err();
    let log = aborted.as_log();
    assert_eq!(log["hello"], "goodbye".into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        serde_json::json!({
            "dropped": {
                "reason": "abort",
                "message": "aborted",
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    );

    // Log, error path: original event plus drop metadata with reason "error".
    let errored = transform_one_fallible(&mut tform, error).unwrap_err();
    let log = errored.as_log();
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": "function call error for \"string\" at (160:174): expected string, got integer",
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    );

    // The schema definition is set in the topology, which isn't used in this test. The
    // expected metric therefore takes its definition from the actual output, which keeps
    // that part out of the comparison.
    let expected_counter = |actual: &Event| {
        Metric::new_with_metadata(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
            EventMetadata::default().with_schema_definition(actual.metadata().schema_definition()),
        )
    };

    // Metric, success path.
    let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
    let expected = Event::Metric(expected_counter(&output).with_tags(Some(metric_tags! {
        "hello" => "world",
        "foo" => "bar",
    })));
    similar_asserts::assert_eq!(output, expected);

    // Metric, abort path: drop details are recorded as tags.
    let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
    let expected = Event::Metric(expected_counter(&output).with_tags(Some(metric_tags! {
        "hello" => "goodbye",
        "metadata.dropped.reason" => "abort",
        "metadata.dropped.component_id" => "remapper",
        "metadata.dropped.component_type" => "remap",
        "metadata.dropped.component_kind" => "transform",
    })));
    similar_asserts::assert_eq!(output, expected);

    // Metric, error path.
    let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
    let expected = Event::Metric(expected_counter(&output).with_tags(Some(metric_tags! {
        "not_hello" => "oops",
        "metadata.dropped.reason" => "error",
        "metadata.dropped.component_id" => "remapper",
        "metadata.dropped.component_type" => "remap",
        "metadata.dropped.component_kind" => "transform",
    })));
    similar_asserts::assert_eq!(output, expected);
}
1362
// Failed `assert_eq!` calls reroute the event to the dropped output. The recorded
// message is the custom one when given, otherwise the default function-call error.
#[test]
fn check_remap_branching_assert_with_message() {
    // `.hello == 42` trips the first assertion (the one with a custom message).
    let fails_first_assert =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
    // `.hello == 0` passes the first assertion and trips the second one.
    let fails_second_assert =
        Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();

    // NOTE: the default message asserted below contains a span into this program text.
    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            assert_eq!(.hello, 0, "custom message here")
            assert_eq!(.hello, 1)
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut transform = Remap::new_ast(config, &context).unwrap().0;

    let dropped = transform_one_fallible(&mut transform, fails_first_assert).unwrap_err();
    let log = dropped.as_log();
    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": "custom message here",
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    );

    let dropped = transform_one_fallible(&mut transform, fails_second_assert).unwrap_err();
    let log = dropped.as_log();
    assert_eq!(log["hello"], 0.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        serde_json::json!({
            "dropped": {
                "reason": "error",
                "message": "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    );
}
1425
// `abort` with a message: the event is rerouted to the dropped output and the
// message is recorded verbatim under `metadata.dropped.message`.
#[test]
fn check_remap_branching_abort_with_message() {
    let input =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

    let config = RemapConfig {
        source: Some(formatdoc! {r#"
            abort "custom message here"
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: true,
        ..Default::default()
    };
    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut transform = Remap::new_ast(config, &context).unwrap().0;

    let dropped = transform_one_fallible(&mut transform, input).unwrap_err();
    let log = dropped.as_log();

    assert_eq!(log["hello"], 42.into());
    assert!(!log.contains(event_path!("foo")));
    assert_eq!(
        log["metadata"],
        serde_json::json!({
            "dropped": {
                "reason": "abort",
                "message": "custom message here",
                "component_id": "remapper",
                "component_type": "remap",
                "component_kind": "transform",
            }
        })
        .try_into()
        .unwrap()
    );
}
1464
// With `reroute_dropped` disabled the transform declares a single output, and
// aborted or errored events show up on neither the primary nor the dropped buffer.
#[test]
fn check_remap_branching_disabled() {
    let happy =
        Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
            .unwrap();
    let abort = Event::from_json_value(
        serde_json::json!({"hello": "goodbye"}),
        LogNamespace::Legacy,
    )
    .unwrap();
    let error =
        Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();

    let conf = RemapConfig {
        source: Some(formatdoc! {r#"
            if exists(.tags) {{
                # metrics
                .tags.foo = "bar"
                if string!(.tags.hello) == "goodbye" {{
                  abort
                }}
            }} else {{
                # logs
                .foo = "bar"
                if string!(.hello) == "goodbye" {{
                  abort
                }}
            }}
        "#}),
        drop_on_error: true,
        drop_on_abort: true,
        reroute_dropped: false,
        ..Default::default()
    };

    // Declared outputs: exactly one, whose definition gained `foo` and `tags`.
    let input_definition =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);
    let expected_definition = input_definition
        .clone()
        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);

    let declared_outputs = conf.outputs(
        vector_lib::enrichment::TableRegistry::default(),
        &[("test".into(), input_definition)],
        LogNamespace::Legacy,
    );
    assert_eq!(
        declared_outputs,
        vec![TransformOutput::new(
            DataType::all_bits(),
            [("test".into(), expected_definition)].into()
        )]
    );

    let context = TransformContext {
        key: Some(ComponentKey::from("remapper")),
        ..Default::default()
    };
    let mut tform = Remap::new_ast(conf, &context).unwrap().0;

    // Success path is unaffected.
    let passed = transform_one_fallible(&mut tform, happy).unwrap();
    let log = passed.as_log();
    assert_eq!(log["hello"], "world".into());
    assert_eq!(log["foo"], "bar".into());
    assert!(!log.contains(event_path!("metadata")));

    // Aborted and errored events vanish entirely.
    for discarded in [abort, error] {
        let out = collect_outputs(&mut tform, discarded);
        assert!(out.primary.is_empty());
        assert!(out.named[DROPPED].is_empty());
    }
}
1545
1546    #[tokio::test]
1547    async fn check_remap_branching_metrics_with_output() {
1548        init_test();
1549
1550        let config: ConfigBuilder = toml::from_str(indoc! {r#"
1551            [transforms.foo]
1552            inputs = []
1553            type = "remap"
1554            drop_on_abort = true
1555            reroute_dropped = true
1556            source = "abort"
1557
1558            [[tests]]
1559            name = "metric output"
1560
1561            [tests.input]
1562                insert_at = "foo"
1563                value = "none"
1564
1565            [[tests.outputs]]
1566                extract_from = "foo.dropped"
1567                [[tests.outputs.conditions]]
1568                type = "vrl"
1569                source = "true"
1570        "#})
1571        .unwrap();
1572
1573        let mut tests = build_unit_tests(config).await.unwrap();
1574        assert!(tests.remove(0).run().await.errors.is_empty());
1575        // Check that metrics were emitted with output tag
1576        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1577    }
1578
1579    struct CollectedOuput {
1580        primary: OutputBuffer,
1581        named: HashMap<String, OutputBuffer>,
1582    }
1583
1584    fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1585        let mut outputs = TransformOutputsBuf::new_with_capacity(
1586            vec![
1587                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1588                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1589            ],
1590            1,
1591        );
1592
1593        ft.transform(event, &mut outputs);
1594
1595        CollectedOuput {
1596            primary: outputs.take_primary(),
1597            named: outputs.take_all_named(),
1598        }
1599    }
1600
1601    fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1602        let out = collect_outputs(ft, event);
1603        assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1604        assert!(out.primary.len() <= 1);
1605        out.primary.into_events().next()
1606    }
1607
1608    fn transform_one_fallible(
1609        ft: &mut dyn SyncTransform,
1610        event: Event,
1611    ) -> std::result::Result<Event, Event> {
1612        let mut outputs = TransformOutputsBuf::new_with_capacity(
1613            vec![
1614                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1615                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1616            ],
1617            1,
1618        );
1619
1620        ft.transform(event, &mut outputs);
1621
1622        let mut buf = outputs.drain().collect::<Vec<_>>();
1623        let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1624
1625        assert!(buf.len() < 2);
1626        assert!(err_buf.len() < 2);
1627        match (buf.pop(), err_buf.pop()) {
1628            (Some(good), None) => Ok(good),
1629            (None, Some(bad)) => Err(bad),
1630            (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1631        }
1632    }
1633
1634    #[tokio::test]
1635    async fn emits_internal_events() {
1636        assert_transform_compliance(async move {
1637            let config = RemapConfig {
1638                source: Some("abort".to_owned()),
1639                drop_on_abort: true,
1640                ..Default::default()
1641            };
1642
1643            let (tx, rx) = mpsc::channel(1);
1644            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1645
1646            let log = LogEvent::from("hello world");
1647            tx.send(log.into()).await.unwrap();
1648
1649            drop(tx);
1650            topology.stop().await;
1651            assert_eq!(out.recv().await, None);
1652        })
1653        .await
1654    }
1655
/// Chains two remap configurations at the schema level: the definition
/// produced by the first is fed as the input definition of the second, and
/// the resulting definitions are checked after each step.
#[test]
fn test_combined_transforms_simple() {
    // First stage: assign a string literal to `.thing`.
    let transform1 = RemapConfig {
        source: Some(String::from(r#".thing = "potato""#)),
        ..Default::default()
    };
    // Second stage: copy `.thing` into `.thang`.
    let transform2 = RemapConfig {
        source: Some(String::from(".thang = .thing")),
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let stage1_input = [(
        OutputId::from("in"),
        schema::Definition::default_legacy_namespace(),
    )];
    let outputs1 = transform1.outputs(
        enrichment_tables.clone(),
        &stage1_input,
        LogNamespace::Legacy,
    );

    // After the first stage `.thing` is known to be bytes.
    let stage1_definition = Definition::default_legacy_namespace().with_event_field(
        &owned_value_path!("thing"),
        Kind::bytes(),
        None,
    );
    let expected1 = vec![TransformOutput::new(
        DataType::all_bits(),
        HashMap::from([(OutputId::from("in"), stage1_definition)]),
    )];
    assert_eq!(expected1, outputs1);

    // Feed the first stage's definition into the second stage under a new id.
    let stage2_input = [(
        OutputId::from("in1"),
        outputs1[0].schema_definitions(true)[&OutputId::from("in")].clone(),
    )];
    let outputs2 = transform2.outputs(enrichment_tables, &stage2_input, LogNamespace::Legacy);

    // `.thang` takes on the same kind as `.thing`.
    let stage2_definition = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None);
    let expected2 = vec![TransformOutput::new(
        DataType::all_bits(),
        HashMap::from([(OutputId::from("in1"), stage2_definition)]),
    )];
    assert_eq!(expected2, outputs2);
}
1720
/// Same chaining check as the simple case, but the first program assigns an
/// array of objects to `.thing` and then replaces the root with
/// `unnest(.thing)`, so the definition handed to the second program comes
/// from an unnested array.
#[test]
fn test_combined_transforms_unnest() {
    // First stage: build an array of two objects and unnest it.
    let unnest_program = indoc! {
    r#"
                .thing = [{"cabbage": 32}, {"parsnips": 45}]
                . = unnest(.thing)
                "#
    };
    let transform1 = RemapConfig {
        source: Some(unnest_program.to_string()),
        ..Default::default()
    };

    // Second stage: read one of the fields, with a string fallback.
    let transform2 = RemapConfig {
        source: Some(String::from(r#".thang = .thing.cabbage || "beetroot""#)),
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    // Kind expected for `.thing` after unnesting: an object in which each of
    // the two keys may be an integer or absent.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([
        ("cabbage".into(), Kind::integer().or_undefined()),
        ("parsnips".into(), Kind::integer().or_undefined()),
    ])));

    let stage1_input = [(
        OutputId::from("in"),
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
    )];
    let outputs1 = transform1.outputs(
        enrichment_tables.clone(),
        &stage1_input,
        LogNamespace::Legacy,
    );

    let stage1_definition =
        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("thing"), thing_kind.clone(), None);
    let expected1 = vec![TransformOutput::new(
        DataType::all_bits(),
        HashMap::from([(OutputId::from("in"), stage1_definition)]),
    )];
    assert_eq!(expected1, outputs1);

    // Feed the first stage's definition into the second stage under a new id.
    let stage2_input = [(
        OutputId::from("in1"),
        outputs1[0].schema_definitions(true)[&OutputId::from("in")].clone(),
    )];
    let outputs2 = transform2.outputs(enrichment_tables, &stage2_input, LogNamespace::Legacy);

    let stage2_definition = Definition::default_legacy_namespace()
        .with_event_field(&owned_value_path!("thing"), thing_kind, None)
        .with_event_field(
            &owned_value_path!("thang"),
            Kind::integer().or_null(),
            None,
        );
    let expected2 = vec![TransformOutput::new(
        DataType::all_bits(),
        HashMap::from([(OutputId::from("in1"), stage2_definition)]),
    )];
    assert_eq!(expected2, outputs2);
}
1816
/// A program consisting only of `abort` must leave the schema definition
/// exactly as it was handed in.
#[test]
fn test_transform_abort() {
    let transform1 = RemapConfig {
        source: Some(String::from("abort")),
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    // The same definition serves as both the input and the expected output.
    let untouched =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]);

    let input_definitions = [(OutputId::from("in"), untouched.clone())];
    let outputs1 = transform1.outputs(enrichment_tables, &input_definitions, LogNamespace::Legacy);

    let expected = vec![TransformOutput::new(
        DataType::all_bits(),
        HashMap::from([(OutputId::from("in"), untouched)]),
    )];
    assert_eq!(expected, outputs1);
}
1855
/// Even when the VRL program fails to compile, `outputs` should still report
/// the correct set of ports. This may change if `outputs` is ever split into
/// one function returning outputs and another returning schema definitions.
#[test]
fn test_error_outputs() {
    let transform1 = RemapConfig {
        // The program refers to an enrichment table ("carrot") that does not exist.
        source: Some(String::from(
            r#". |= get_enrichment_table_record("carrot", {"id": .id})"#,
        )),
        reroute_dropped: true,
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let input_definitions = [(
        OutputId::from("in"),
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
    )];
    let outputs1 = transform1.outputs(enrichment_tables, &input_definitions, LogNamespace::Legacy);

    // Both the unnamed default port and the "dropped" port must be present.
    let expected_ports = HashSet::from([None, Some("dropped".to_string())]);
    let actual_ports: HashSet<_> = outputs1.into_iter().map(|output| output.port).collect();
    assert_eq!(expected_ports, actual_ports);
}
1891
/// Checks the schema definition produced when the program replaces the event
/// root with a string: under the legacy namespace the expected definition is
/// an object with a bytes `message` field and no other known fields.
#[test]
fn test_non_object_events() {
    let transform1 = RemapConfig {
        // Overwrite the whole event with a plain string.
        source: Some(String::from(r#". = "fish" "#)),
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let input_definitions = [(
        OutputId::from("in"),
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
    )];
    let outputs1 = transform1.outputs(enrichment_tables, &input_definitions, LogNamespace::Legacy);

    let wanted = schema::Definition::new_with_default_metadata(
        Kind::object(Collection::from_unknown(Kind::undefined())),
        [LogNamespace::Legacy],
    )
    .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);

    let expected = HashMap::from([(OutputId::from("in"), wanted)]);
    let actual = outputs1[0].schema_definitions(true);
    assert_eq!(expected, actual);
}
1925
/// Checks the schema definition produced by a program with two branches: one
/// unnests an array of objects, the other replaces the root with a string.
/// The expected definition covers both: `message` may be anything and `thing`
/// is either the unnested object or absent.
#[test]
fn test_array_and_non_object_events() {
    let branching_program = indoc! {r#"
                    if .lizard == true {
                        .thing = [{"cabbage": 42}];
                        . = unnest(.thing)
                    } else {
                      . = "fish"
                    }
                    "#};
    let transform1 = RemapConfig {
        source: Some(branching_program.to_string()),
        ..Default::default()
    };

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();

    let input_definitions = [(
        OutputId::from("in"),
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
    )];
    let outputs1 = transform1.outputs(enrichment_tables, &input_definitions, LogNamespace::Legacy);

    // `.thing` only exists on the unnest branch, hence `or_undefined`.
    let thing_kind = Kind::object(Collection::from(BTreeMap::from([(
        "cabbage".into(),
        Kind::integer(),
    )])))
    .or_undefined();

    let wanted =
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy])
            .with_event_field(&owned_value_path!("message"), Kind::any(), None)
            .with_event_field(&owned_value_path!("thing"), thing_kind, None);

    let expected = HashMap::from([(OutputId::from("in"), wanted)]);
    let actual = outputs1[0].schema_definitions(true);
    assert_eq!(expected, actual);
}
1977
/// Runs `. = [null]` against an event marked as Vector-namespaced and checks
/// both the runtime result (the event root is `null`) and the schema
/// definition reported by `outputs` for the Vector namespace.
#[test]
fn check_remap_array_vector_namespace() {
    // Inserting a "vector" entry into the metadata marks the event as a
    // "Vector" namespaced log.
    let mut log = LogEvent::from("input");
    log.metadata_mut()
        .value_mut()
        .insert("vector", BTreeMap::new());
    let event = Event::from(log);

    let conf = RemapConfig {
        source: Some(". = [null]\n".to_string()),
        file: None,
        drop_on_error: true,
        drop_on_abort: false,
        ..Default::default()
    };
    let mut tform = remap(conf.clone()).unwrap();
    let result = transform_one(&mut tform, event).unwrap();

    // Legacy namespace nests this under "message", Vector should set it as the root
    let root = result.as_log().get(".");
    assert_eq!(root, Some(&Value::Null));

    let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
    let input_definitions = [(
        OutputId::from("in"),
        schema::Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Vector]),
    )];
    let outputs1 = conf.outputs(enrichment_tables, &input_definitions, LogNamespace::Vector);

    let wanted =
        schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);

    let expected = HashMap::from([(OutputId::from("in"), wanted)]);
    let actual = outputs1[0].schema_definitions(true);
    assert_eq!(expected, actual);
}
2028
2029    fn assert_no_metrics(source: String) {
2030        vector_lib::metrics::init_test();
2031
2032        let config = RemapConfig {
2033            source: Some(source),
2034            drop_on_error: true,
2035            drop_on_abort: true,
2036            reroute_dropped: true,
2037            ..Default::default()
2038        };
2039        let mut ast_runner = remap(config).unwrap();
2040        let input_event =
2041            Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2042        let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2043        let dropped_log = dropped_event.as_log();
2044        assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2045
2046        let controller = Controller::get().expect("no controller");
2047        let metrics = controller
2048            .capture_metrics()
2049            .into_iter()
2050            .map(|metric| (metric.name().to_string(), metric))
2051            .collect::<BTreeMap<String, Metric>>();
2052        assert_eq!(metrics.get("component_discarded_events_total"), None);
2053        assert_eq!(metrics.get("component_errors_total"), None);
2054    }
/// Runs the `assert_no_metrics` checks against a program that is just `abort`.
#[test]
fn do_not_emit_metrics_when_dropped() {
    let program = String::from("abort");
    assert_no_metrics(program);
}
2059
/// Runs the `assert_no_metrics` checks against `parse_key_value!(.message)`,
/// which (per the test name) is expected to error for the helper's input event.
#[test]
fn do_not_emit_metrics_when_errored() {
    let program = String::from("parse_key_value!(.message)");
    assert_no_metrics(program);
}
2064}