vector/transforms/
remap.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    fs::File,
4    io::{self, Read},
5    path::PathBuf,
6    sync::Mutex,
7};
8
9use snafu::{ResultExt, Snafu};
10use vector_lib::{
11    TimeZone,
12    codecs::MetricTagValues,
13    compile_vrl,
14    config::LogNamespace,
15    configurable::configurable_component,
16    enrichment::TableRegistry,
17    lookup::{PathPrefix, metadata_path, owned_value_path},
18    schema::Definition,
19};
20use vector_vrl_functions::set_semantic_meaning::MeaningList;
21use vrl::{
22    compiler::{
23        CompileConfig, ExpressionError, Program, TypeState, VrlRuntime,
24        runtime::{Runtime, Terminate},
25        state::ExternalEnv,
26    },
27    diagnostic::{DiagnosticMessage, Formatter, Note},
28    path,
29    path::ValuePath,
30    value::{Kind, Value},
31};
32
33use crate::{
34    Result,
35    config::{
36        ComponentKey, DataType, Input, OutputId, TransformConfig, TransformContext,
37        TransformOutput, log_schema,
38    },
39    event::{Event, TargetEvents, VrlTarget},
40    internal_events::{RemapMappingAbort, RemapMappingError},
41    schema,
42    transforms::{SyncTransform, Transform, TransformOutputsBuf},
43};
44
/// Name of the output port that receives rerouted ("dropped") events.
const DROPPED: &str = "dropped";
/// Identifies one cached compilation: the enrichment tables and the merged input
/// schema the program was compiled against.
type CacheKey = (TableRegistry, schema::Definition);
/// Result of a successful compilation: the program, its formatted warnings, and
/// the semantic meanings registered while compiling.
type CacheValue = (Program, String, MeaningList);
48
/// Configuration for the `remap` transform.
#[configurable_component(transform(
    "remap",
    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
))]
// `Default` and `Debug` come from `derivative` (see `#[derivative(...)]` below) so that
// the `cache` field can be excluded from the `Debug` output.
#[derive(Derivative)]
#[serde(deny_unknown_fields)]
#[derivative(Default, Debug)]
pub struct RemapConfig {
    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// Required if `file` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(
        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
        docs::syntax_override = "remap_program"
    ))]
    pub source: Option<String>,

    /// File path to the [Vector Remap Language][vrl] (VRL) program to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` is missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
    pub file: Option<PathBuf>,

    /// File paths to the [Vector Remap Language][vrl] (VRL) programs to execute for each event.
    ///
    /// If a relative path is provided, its root is the current working directory.
    ///
    /// Required if `source` or `file` are missing.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
    pub files: Option<Vec<PathBuf>>,

    /// When set to `single`, metric tag values are exposed as single strings, the
    /// same as they were before this config option. Tags with multiple values show the last assigned value, and null values
    /// are ignored.
    ///
    /// When set to `full`, all metric tags are exposed as arrays of either string or null
    /// values.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,

    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
    /// time zone.
    ///
    /// This overrides the [global `timezone`][global_timezone] option. The time zone name may be
    /// any name in the [TZ database][tz_database], or `local` to indicate system local time.
    ///
    /// [global_timezone]: https://vector.dev/docs/reference/configuration//global-options#timezone
    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,

    /// Drops any event that encounters an error during processing.
    ///
    /// Normally, if a VRL program encounters an error when processing an event, the original,
    /// unmodified event is sent downstream. In some cases, you may not want to send the event
    /// any further, such as if certain transformation or enrichment is strictly required. Setting
    /// `drop_on_error` to `true` allows you to ensure these events do not get processed any
    /// further.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially named output for
    /// further logging and analysis by setting `reroute_dropped`.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
    pub drop_on_error: bool,

    /// Drops any event that is manually aborted during processing.
    ///
    /// If a VRL program is manually aborted (using [`abort`][vrl_docs_abort]) when
    /// processing an event, this option controls whether the original, unmodified event is sent
    /// downstream without any modifications or if it is dropped.
    ///
    /// Additionally, dropped events can potentially be diverted to a specially-named output for
    /// further logging and analysis by setting `reroute_dropped`.
    ///
    /// [vrl_docs_abort]: https://vector.dev/docs/reference/vrl/expressions/#abort
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
    pub drop_on_abort: bool,

    /// Reroutes dropped events to a named output instead of halting processing on them.
    ///
    /// When using `drop_on_error` or `drop_on_abort`, events that are "dropped" are processed no
    /// further. In some cases, it may be desirable to keep the events around for further analysis,
    /// debugging, or retrying.
    ///
    /// In these cases, `reroute_dropped` can be set to `true` which forwards the original event
    /// to a specially-named output, `dropped`. The original event is annotated with additional
    /// fields describing why the event was dropped.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
    pub reroute_dropped: bool,

    // Selects which VRL runtime `build` instantiates; hidden from the generated docs.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default)]
    pub runtime: VrlRuntime,

    // Memoized compilation outcomes (successes and stringified compile errors), keyed by
    // the enrichment tables and merged schema they were compiled against. Never
    // (de)serialized (`#[serde(skip)]`) and left out of `Debug` output.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    /// Cache can't be `BTreeMap` or `HashMap` because of `TableRegistry`, which doesn't allow us to inspect tables inside it.
    /// And even if we allowed the inspection, the tables can be huge, resulting in a long comparison or hash computation
    /// while using `Vec` allows us to use just a shallow comparison
    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
}
163
164impl Clone for RemapConfig {
165    fn clone(&self) -> Self {
166        Self {
167            source: self.source.clone(),
168            file: self.file.clone(),
169            files: self.files.clone(),
170            metric_tag_values: self.metric_tag_values,
171            timezone: self.timezone,
172            drop_on_error: self.drop_on_error,
173            drop_on_abort: self.drop_on_abort,
174            reroute_dropped: self.reroute_dropped,
175            runtime: self.runtime,
176            cache: Mutex::new(Default::default()),
177        }
178    }
179}
180
181impl RemapConfig {
182    fn compile_vrl_program(
183        &self,
184        enrichment_tables: TableRegistry,
185        merged_schema_definition: schema::Definition,
186    ) -> Result<(Program, String, MeaningList)> {
187        if let Some((_, res)) = self
188            .cache
189            .lock()
190            .expect("Data poisoned")
191            .iter()
192            .find(|v| v.0.0 == enrichment_tables && v.0.1 == merged_schema_definition)
193        {
194            return res.clone().map_err(Into::into);
195        }
196
197        let source = match (&self.source, &self.file, &self.files) {
198            (Some(source), None, None) => source.to_owned(),
199            (None, Some(path), None) => Self::read_file(path)?,
200            (None, None, Some(paths)) => {
201                let mut combined_source = String::new();
202                for path in paths {
203                    let content = Self::read_file(path)?;
204                    combined_source.push_str(&content);
205                    combined_source.push('\n');
206                }
207                combined_source
208            }
209            _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
210        };
211
212        let mut functions = vrl::stdlib::all();
213        functions.append(&mut vector_lib::enrichment::vrl_functions());
214        #[cfg(feature = "sources-dnstap")]
215        functions.append(&mut dnstap_parser::vrl_functions());
216        functions.append(&mut vector_vrl_functions::all());
217
218        let state = TypeState {
219            local: Default::default(),
220            external: ExternalEnv::new_with_kind(
221                merged_schema_definition.event_kind().clone(),
222                merged_schema_definition.metadata_kind().clone(),
223            ),
224        };
225        let mut config = CompileConfig::default();
226
227        config.set_custom(enrichment_tables.clone());
228        config.set_custom(MeaningList::default());
229
230        let res = compile_vrl(&source, &functions, &state, config)
231            .map_err(|diagnostics| Formatter::new(&source, diagnostics).colored().to_string())
232            .map(|result| {
233                (
234                    result.program,
235                    Formatter::new(&source, result.warnings).to_string(),
236                    result.config.get_custom::<MeaningList>().unwrap().clone(),
237                )
238            });
239
240        self.cache
241            .lock()
242            .expect("Data poisoned")
243            .push(((enrichment_tables, merged_schema_definition), res.clone()));
244
245        res.map_err(Into::into)
246    }
247
248    fn read_file(path: &PathBuf) -> Result<String> {
249        let mut buffer = String::new();
250        File::open(path)
251            .with_context(|_| FileOpenFailedSnafu { path })?
252            .read_to_string(&mut buffer)
253            .with_context(|_| FileReadFailedSnafu { path })?;
254        Ok(buffer)
255    }
256}
257
// NOTE(review): judging by the macro's name, this provides config generation for
// `RemapConfig` based on its `Default` value — confirm against the macro definition.
impl_generate_config_from_default!(RemapConfig);
259
260#[async_trait::async_trait]
261#[typetag::serde(name = "remap")]
262impl TransformConfig for RemapConfig {
263    async fn build(&self, context: &TransformContext) -> Result<Transform> {
264        let (transform, warnings) = match self.runtime {
265            VrlRuntime::Ast => {
266                let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
267                (Transform::synchronous(remap), warnings)
268            }
269        };
270
271        // TODO: We could improve on this by adding support for non-fatal error
272        // messages in the topology. This would make the topology responsible
273        // for printing warnings (including potentially emitting metrics),
274        // instead of individual transforms.
275        if !warnings.is_empty() {
276            warn!(message = "VRL compilation warning.", %warnings);
277        }
278
279        Ok(transform)
280    }
281
282    fn input(&self) -> Input {
283        Input::all()
284    }
285
286    fn outputs(
287        &self,
288        enrichment_tables: vector_lib::enrichment::TableRegistry,
289        input_definitions: &[(OutputId, schema::Definition)],
290        _: LogNamespace,
291    ) -> Vec<TransformOutput> {
292        let merged_definition: Definition = input_definitions
293            .iter()
294            .map(|(_output, definition)| definition.clone())
295            .reduce(Definition::merge)
296            .unwrap_or_else(Definition::any);
297
298        // We need to compile the VRL program in order to know the schema definition output of this
299        // transform. We ignore any compilation errors, as those are caught by the transform build
300        // step.
301        let compiled = self
302            .compile_vrl_program(enrichment_tables, merged_definition)
303            .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
304            .map_err(|_| ());
305
306        let mut dropped_definitions = HashMap::new();
307        let mut default_definitions = HashMap::new();
308
309        for (output_id, input_definition) in input_definitions {
310            let default_definition = compiled
311                .clone()
312                .map(|(state, meaning)| {
313                    let mut new_type_def = Definition::new(
314                        state.external.target_kind().clone(),
315                        state.external.metadata_kind().clone(),
316                        input_definition.log_namespaces().clone(),
317                    );
318
319                    for (id, path) in input_definition.meanings() {
320                        // Attempt to copy over the meanings from the input definition.
321                        // The function will fail if the meaning that now points to a field that no longer exists,
322                        // this is fine since we will no longer want that meaning in the output definition.
323                        let _ = new_type_def.try_with_meaning(path.clone(), id);
324                    }
325
326                    // Apply any semantic meanings set in the VRL program
327                    for (id, path) in meaning {
328                        // currently only event paths are supported
329                        new_type_def = new_type_def.with_meaning(path, &id);
330                    }
331                    new_type_def
332                })
333                .unwrap_or_else(|_| {
334                    Definition::new_with_default_metadata(
335                        // The program failed to compile, so it can "never" return a value
336                        Kind::never(),
337                        input_definition.log_namespaces().clone(),
338                    )
339                });
340
341            // When a message is dropped and re-routed, we keep the original event, but also annotate
342            // it with additional metadata.
343            let dropped_definition = Definition::combine_log_namespaces(
344                input_definition.log_namespaces(),
345                input_definition.clone().with_event_field(
346                    log_schema().metadata_key().expect("valid metadata key"),
347                    Kind::object(BTreeMap::from([
348                        ("reason".into(), Kind::bytes()),
349                        ("message".into(), Kind::bytes()),
350                        ("component_id".into(), Kind::bytes()),
351                        ("component_type".into(), Kind::bytes()),
352                        ("component_kind".into(), Kind::bytes()),
353                    ])),
354                    Some("metadata"),
355                ),
356                input_definition
357                    .clone()
358                    .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
359                    .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
360                    .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
361                    .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
362                    .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
363            );
364
365            default_definitions.insert(
366                output_id.clone(),
367                VrlTarget::modify_schema_definition_for_into_events(default_definition),
368            );
369            dropped_definitions.insert(
370                output_id.clone(),
371                VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
372            );
373        }
374
375        let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
376
377        if self.reroute_dropped {
378            vec![
379                default_output,
380                TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
381            ]
382        } else {
383            vec![default_output]
384        }
385    }
386
387    fn enable_concurrency(&self) -> bool {
388        true
389    }
390
391    fn files_to_watch(&self) -> Vec<&PathBuf> {
392        self.file
393            .iter()
394            .chain(self.files.iter().flatten())
395            .collect()
396    }
397}
398
/// The runnable `remap` transform: a compiled VRL program plus the options that decide
/// what happens to events whose processing errors or aborts.
#[derive(Debug, Clone)]
pub struct Remap<Runner>
where
    Runner: VrlRunner,
{
    /// Key of the component this transform was built for, if any; included in the
    /// annotations added to rerouted events.
    component_key: Option<ComponentKey>,
    /// The compiled VRL program executed for every event.
    program: Program,
    /// Time zone handed to the runner on every execution.
    timezone: TimeZone,
    /// Whether events whose processing errors are removed from the default output.
    drop_on_error: bool,
    /// Whether events whose processing is aborted are removed from the default output.
    drop_on_abort: bool,
    /// Whether dropped events are forwarded to the `dropped` output.
    reroute_dropped: bool,
    /// Executes `program` against each event's target.
    runner: Runner,
    /// Controls how metric tag values are exposed to the program (single vs. full).
    metric_tag_values: MetricTagValues,
}
413
/// Abstraction over the mechanism that executes a compiled VRL program.
pub trait VrlRunner {
    /// Runs `program` against `target` using `timezone`.
    ///
    /// Returns the value the program resolved to, or the `Terminate` reason
    /// (abort or error) when execution did not complete.
    fn run(
        &mut self,
        target: &mut VrlTarget,
        program: &Program,
        timezone: &TimeZone,
    ) -> std::result::Result<Value, Terminate>;
}
422
/// `VrlRunner` implementation backed by a VRL `Runtime`.
#[derive(Debug)]
pub struct AstRunner {
    /// Runtime used to resolve programs; cleared after every run.
    pub runtime: Runtime,
}
427
428impl Clone for AstRunner {
429    fn clone(&self) -> Self {
430        Self {
431            runtime: Runtime::default(),
432        }
433    }
434}
435
436impl VrlRunner for AstRunner {
437    fn run(
438        &mut self,
439        target: &mut VrlTarget,
440        program: &Program,
441        timezone: &TimeZone,
442    ) -> std::result::Result<Value, Terminate> {
443        let result = self.runtime.resolve(target, program, timezone);
444        self.runtime.clear();
445        result
446    }
447}
448
449impl Remap<AstRunner> {
450    pub fn new_ast(
451        config: RemapConfig,
452        context: &TransformContext,
453    ) -> crate::Result<(Self, String)> {
454        let (program, warnings, _) = config.compile_vrl_program(
455            context.enrichment_tables.clone(),
456            context.merged_schema_definition.clone(),
457        )?;
458
459        let runtime = Runtime::default();
460        let runner = AstRunner { runtime };
461
462        Self::new(config, context, program, runner).map(|remap| (remap, warnings))
463    }
464}
465
466impl<Runner> Remap<Runner>
467where
468    Runner: VrlRunner,
469{
470    fn new(
471        config: RemapConfig,
472        context: &TransformContext,
473        program: Program,
474        runner: Runner,
475    ) -> crate::Result<Self> {
476        Ok(Remap {
477            component_key: context.key.clone(),
478            program,
479            timezone: config
480                .timezone
481                .unwrap_or_else(|| context.globals.timezone()),
482            drop_on_error: config.drop_on_error,
483            drop_on_abort: config.drop_on_abort,
484            reroute_dropped: config.reroute_dropped,
485            runner,
486            metric_tag_values: config.metric_tag_values,
487        })
488    }
489
490    #[cfg(test)]
491    const fn runner(&self) -> &Runner {
492        &self.runner
493    }
494
495    fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
496        let message = error
497            .notes()
498            .iter()
499            .filter(|note| matches!(note, Note::UserErrorMessage(_)))
500            .next_back()
501            .map(|note| note.to_string())
502            .unwrap_or_else(|| error.to_string());
503        serde_json::json!({
504                "reason": reason,
505                "message": message,
506                "component_id": self.component_key,
507                "component_type": "remap",
508                "component_kind": "transform",
509        })
510    }
511
512    fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
513        match event {
514            Event::Log(log) => match log.namespace() {
515                LogNamespace::Legacy => {
516                    if let Some(metadata_key) = log_schema().metadata_key() {
517                        log.insert(
518                            (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
519                            self.dropped_data(reason, error),
520                        );
521                    }
522                }
523                LogNamespace::Vector => {
524                    log.insert(
525                        metadata_path!("vector", "dropped"),
526                        self.dropped_data(reason, error),
527                    );
528                }
529            },
530            Event::Metric(metric) => {
531                if let Some(metadata_key) = log_schema().metadata_key() {
532                    metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
533                    metric.replace_tag(
534                        format!("{metadata_key}.dropped.component_id"),
535                        self.component_key
536                            .as_ref()
537                            .map(ToString::to_string)
538                            .unwrap_or_default(),
539                    );
540                    metric.replace_tag(
541                        format!("{metadata_key}.dropped.component_type"),
542                        "remap".into(),
543                    );
544                    metric.replace_tag(
545                        format!("{metadata_key}.dropped.component_kind"),
546                        "transform".into(),
547                    );
548                }
549            }
550            Event::Trace(trace) => {
551                trace.maybe_insert(log_schema().metadata_key_target_path(), || {
552                    self.dropped_data(reason, error).into()
553                });
554            }
555        }
556    }
557
558    fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
559        self.runner.run(target, &self.program, &self.timezone)
560    }
561}
562
563impl<Runner> SyncTransform for Remap<Runner>
564where
565    Runner: VrlRunner + Clone + Send + Sync,
566{
567    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
568        // If a program can fail or abort at runtime and we know that we will still need to forward
569        // the event in that case (either to the main output or `dropped`, depending on the
570        // config), we need to clone the original event and keep it around, to allow us to discard
571        // any mutations made to the event while the VRL program runs, before it failed or aborted.
572        //
573        // The `drop_on_{error, abort}` transform config allows operators to remove events from the
574        // main output if they're failed or aborted, in which case we can skip the cloning, since
575        // any mutations made by VRL will be ignored regardless. If they hav configured
576        // `reroute_dropped`, however, we still need to do the clone to ensure that we can forward
577        // the event to the `dropped` output.
578        let forward_on_error = !self.drop_on_error || self.reroute_dropped;
579        let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
580        let original_event = if (self.program.info().fallible && forward_on_error)
581            || (self.program.info().abortable && forward_on_abort)
582        {
583            Some(event.clone())
584        } else {
585            None
586        };
587
588        let log_namespace = event
589            .maybe_as_log()
590            .map(|log| log.namespace())
591            .unwrap_or(LogNamespace::Legacy);
592
593        let mut target = VrlTarget::new(
594            event,
595            self.program.info(),
596            match self.metric_tag_values {
597                MetricTagValues::Single => false,
598                MetricTagValues::Full => true,
599            },
600        );
601        let result = self.run_vrl(&mut target);
602
603        match result {
604            Ok(_) => match target.into_events(log_namespace) {
605                TargetEvents::One(event) => push_default(event, output),
606                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
607                TargetEvents::Traces(events) => {
608                    events.for_each(|event| push_default(event, output))
609                }
610            },
611            Err(reason) => {
612                let (reason, error, drop) = match reason {
613                    Terminate::Abort(error) => {
614                        if !self.reroute_dropped {
615                            emit!(RemapMappingAbort {
616                                event_dropped: self.drop_on_abort,
617                            });
618                        }
619                        ("abort", error, self.drop_on_abort)
620                    }
621                    Terminate::Error(error) => {
622                        if !self.reroute_dropped {
623                            emit!(RemapMappingError {
624                                error: error.to_string(),
625                                event_dropped: self.drop_on_error,
626                            });
627                        }
628                        ("error", error, self.drop_on_error)
629                    }
630                };
631
632                if !drop {
633                    let event = original_event.expect("event will be set");
634
635                    push_default(event, output);
636                } else if self.reroute_dropped {
637                    let mut event = original_event.expect("event will be set");
638
639                    self.annotate_dropped(&mut event, reason, error);
640                    push_dropped(event, output);
641                }
642            }
643        }
644    }
645}
646
647#[inline]
648fn push_default(event: Event, output: &mut TransformOutputsBuf) {
649    output.push(None, event)
650}
651
652#[inline]
653fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
654    output.push(Some(DROPPED), event);
655}
656
// Errors raised while locating or loading the VRL program for the `remap` transform.
#[derive(Debug, Snafu)]
pub enum BuildError {
    // The combination of `source`, `file` and `files` was not exactly one of them.
    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
    SourceAndOrFileOrFiles,

    // The program file at `path` could not be opened.
    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
    FileOpenFailed { path: PathBuf, source: io::Error },
    // The program file at `path` was opened but its contents could not be read.
    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
    FileReadFailed { path: PathBuf, source: io::Error },
}
667
668#[cfg(test)]
669mod tests {
670    use std::{
671        collections::{HashMap, HashSet},
672        sync::Arc,
673    };
674
675    use chrono::DateTime;
676    use indoc::{formatdoc, indoc};
677    use tokio::sync::mpsc;
678    use tokio_stream::wrappers::ReceiverStream;
679    use vector_lib::{
680        config::GlobalOptions, enrichment::TableRegistry, event::EventMetadata, metric_tags,
681    };
682    use vrl::{btreemap, event_path, value::kind::Collection};
683
684    use super::*;
685    use crate::{
686        config::{ConfigBuilder, build_unit_tests},
687        event::{
688            LogEvent, Metric, Value,
689            metric::{MetricKind, MetricValue},
690        },
691        metrics::Controller,
692        schema,
693        test_util::components::{
694            COMPONENT_MULTIPLE_OUTPUTS_TESTS, assert_transform_compliance, init_test,
695        },
696        transforms::{OutputBuffer, test::create_topology},
697    };
698
699    fn test_default_schema_definition() -> schema::Definition {
700        schema::Definition::empty_legacy_namespace().with_event_field(
701            &owned_value_path!("a default field"),
702            Kind::integer().or_bytes(),
703            Some("default"),
704        )
705    }
706
707    fn test_dropped_schema_definition() -> schema::Definition {
708        schema::Definition::empty_legacy_namespace().with_event_field(
709            &owned_value_path!("a dropped field"),
710            Kind::boolean().or_null(),
711            Some("dropped"),
712        )
713    }
714
715    fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
716        let schema_definitions = HashMap::from([
717            (
718                None,
719                [("source".into(), test_default_schema_definition())].into(),
720            ),
721            (
722                Some(DROPPED.to_owned()),
723                [("source".into(), test_dropped_schema_definition())].into(),
724            ),
725        ]);
726
727        Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
728            .map(|(remap, _)| remap)
729    }
730
    // Runs the shared config-generation check against `RemapConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RemapConfig>();
    }
735
736    #[test]
737    fn config_missing_source_and_file() {
738        let config = RemapConfig {
739            source: None,
740            file: None,
741            ..Default::default()
742        };
743
744        let err = remap(config).unwrap_err().to_string();
745        assert_eq!(
746            &err,
747            "must provide exactly one of `source` or `file` or `files` configuration"
748        )
749    }
750
751    #[test]
752    fn config_both_source_and_file() {
753        let config = RemapConfig {
754            source: Some("".to_owned()),
755            file: Some("".into()),
756            ..Default::default()
757        };
758
759        let err = remap(config).unwrap_err().to_string();
760        assert_eq!(
761            &err,
762            "must provide exactly one of `source` or `file` or `files` configuration"
763        )
764    }
765
766    fn get_field_string(event: &Event, field: &str) -> String {
767        event
768            .as_log()
769            .get(field)
770            .unwrap()
771            .to_string_lossy()
772            .into_owned()
773    }
774
775    #[test]
776    fn check_remap_doesnt_share_state_between_events() {
777        let conf = RemapConfig {
778            source: Some(".foo = .sentinel".to_string()),
779            file: None,
780            drop_on_error: true,
781            drop_on_abort: false,
782            ..Default::default()
783        };
784        let mut tform = remap(conf).unwrap();
785        assert!(tform.runner().runtime.is_empty());
786
787        let event1 = {
788            let mut event1 = LogEvent::from("event1");
789            event1.insert("sentinel", "bar");
790            Event::from(event1)
791        };
792        let result1 = transform_one(&mut tform, event1).unwrap();
793        assert_eq!(get_field_string(&result1, "message"), "event1");
794        assert_eq!(get_field_string(&result1, "foo"), "bar");
795        assert!(tform.runner().runtime.is_empty());
796
797        let event2 = {
798            let event2 = LogEvent::from("event2");
799            Event::from(event2)
800        };
801        let result2 = transform_one(&mut tform, event2).unwrap();
802        assert_eq!(get_field_string(&result2, "message"), "event2");
803        assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
804        assert!(tform.runner().runtime.is_empty());
805    }
806
807    #[test]
808    fn remap_return_raw_string_vector_namespace() {
809        let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
810
811        let event = {
812            let mut metadata = EventMetadata::default()
813                .with_schema_definition(&Arc::new(initial_definition.clone()));
814            // the Vector metadata field is required for an event to correctly detect the namespace at runtime
815            metadata
816                .value_mut()
817                .insert(&owned_value_path!("vector"), BTreeMap::new());
818
819            let mut event = LogEvent::new_with_metadata(metadata);
820            event.insert("copy_from", "buz");
821            Event::from(event)
822        };
823
824        let conf = RemapConfig {
825            source: Some(r#"  . = "root string";"#.to_string()),
826            file: None,
827            drop_on_error: true,
828            drop_on_abort: false,
829            ..Default::default()
830        };
831        let mut tform = remap(conf.clone()).unwrap();
832        let result = transform_one(&mut tform, event).unwrap();
833        assert_eq!(get_field_string(&result, "."), "root string");
834
835        let mut outputs = conf.outputs(
836            TableRegistry::default(),
837            &[(OutputId::dummy(), initial_definition)],
838            LogNamespace::Vector,
839        );
840
841        assert_eq!(outputs.len(), 1);
842        let output = outputs.pop().unwrap();
843        assert_eq!(output.port, None);
844        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
845        let expected_schema =
846            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
847        assert_eq!(actual_schema_def, expected_schema);
848    }
849
850    #[test]
851    fn check_remap_adds() {
852        let event = {
853            let mut event = LogEvent::from("augment me");
854            event.insert("copy_from", "buz");
855            Event::from(event)
856        };
857
858        let conf = RemapConfig {
859            source: Some(
860                r#"  .foo = "bar"
861  .bar = "baz"
862  .copy = .copy_from
863"#
864                .to_string(),
865            ),
866            file: None,
867            drop_on_error: true,
868            drop_on_abort: false,
869            ..Default::default()
870        };
871        let mut tform = remap(conf).unwrap();
872        let result = transform_one(&mut tform, event).unwrap();
873        assert_eq!(get_field_string(&result, "message"), "augment me");
874        assert_eq!(get_field_string(&result, "copy_from"), "buz");
875        assert_eq!(get_field_string(&result, "foo"), "bar");
876        assert_eq!(get_field_string(&result, "bar"), "baz");
877        assert_eq!(get_field_string(&result, "copy"), "buz");
878    }
879
880    #[test]
881    fn check_remap_emits_multiple() {
882        let event = {
883            let mut event = LogEvent::from("augment me");
884            event.insert(
885                "events",
886                vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
887            );
888            Event::from(event)
889        };
890
891        let conf = RemapConfig {
892            source: Some(
893                indoc! {r"
894                . = .events
895            "}
896                .to_owned(),
897            ),
898            file: None,
899            drop_on_error: true,
900            drop_on_abort: false,
901            ..Default::default()
902        };
903        let mut tform = remap(conf).unwrap();
904
905        let out = collect_outputs(&mut tform, event);
906        assert_eq!(2, out.primary.len());
907        let mut result = out.primary.into_events();
908
909        let r = result.next().unwrap();
910        assert_eq!(get_field_string(&r, "message"), "foo");
911        let r = result.next().unwrap();
912        assert_eq!(get_field_string(&r, "message"), "bar");
913    }
914
915    #[test]
916    fn check_remap_error() {
917        let event = {
918            let mut event = Event::Log(LogEvent::from("augment me"));
919            event.as_mut_log().insert("bar", "is a string");
920            event
921        };
922
923        let conf = RemapConfig {
924            source: Some(formatdoc! {r#"
925                .foo = "foo"
926                .not_an_int = int!(.bar)
927                .baz = 12
928            "#}),
929            file: None,
930            drop_on_error: false,
931            drop_on_abort: false,
932            ..Default::default()
933        };
934        let mut tform = remap(conf).unwrap();
935
936        let event = transform_one(&mut tform, event).unwrap();
937
938        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
939        assert!(event.as_log().get("foo").is_none());
940        assert!(event.as_log().get("baz").is_none());
941    }
942
943    #[test]
944    fn check_remap_error_drop() {
945        let event = {
946            let mut event = Event::Log(LogEvent::from("augment me"));
947            event.as_mut_log().insert("bar", "is a string");
948            event
949        };
950
951        let conf = RemapConfig {
952            source: Some(formatdoc! {r#"
953                .foo = "foo"
954                .not_an_int = int!(.bar)
955                .baz = 12
956            "#}),
957            file: None,
958            drop_on_error: true,
959            drop_on_abort: false,
960            ..Default::default()
961        };
962        let mut tform = remap(conf).unwrap();
963
964        assert!(transform_one(&mut tform, event).is_none())
965    }
966
967    #[test]
968    fn check_remap_error_infallible() {
969        let event = {
970            let mut event = Event::Log(LogEvent::from("augment me"));
971            event.as_mut_log().insert("bar", "is a string");
972            event
973        };
974
975        let conf = RemapConfig {
976            source: Some(formatdoc! {r#"
977                .foo = "foo"
978                .baz = 12
979            "#}),
980            file: None,
981            drop_on_error: false,
982            drop_on_abort: false,
983            ..Default::default()
984        };
985        let mut tform = remap(conf).unwrap();
986
987        let event = transform_one(&mut tform, event).unwrap();
988
989        assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
990        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
991        assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
992    }
993
994    #[test]
995    fn check_remap_abort() {
996        let event = {
997            let mut event = Event::Log(LogEvent::from("augment me"));
998            event.as_mut_log().insert("bar", "is a string");
999            event
1000        };
1001
1002        let conf = RemapConfig {
1003            source: Some(formatdoc! {r#"
1004                .foo = "foo"
1005                abort
1006                .baz = 12
1007            "#}),
1008            file: None,
1009            drop_on_error: false,
1010            drop_on_abort: false,
1011            ..Default::default()
1012        };
1013        let mut tform = remap(conf).unwrap();
1014
1015        let event = transform_one(&mut tform, event).unwrap();
1016
1017        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1018        assert!(event.as_log().get("foo").is_none());
1019        assert!(event.as_log().get("baz").is_none());
1020    }
1021
1022    #[test]
1023    fn check_remap_abort_drop() {
1024        let event = {
1025            let mut event = Event::Log(LogEvent::from("augment me"));
1026            event.as_mut_log().insert("bar", "is a string");
1027            event
1028        };
1029
1030        let conf = RemapConfig {
1031            source: Some(formatdoc! {r#"
1032                .foo = "foo"
1033                abort
1034                .baz = 12
1035            "#}),
1036            file: None,
1037            drop_on_error: false,
1038            drop_on_abort: true,
1039            ..Default::default()
1040        };
1041        let mut tform = remap(conf).unwrap();
1042
1043        assert!(transform_one(&mut tform, event).is_none())
1044    }
1045
1046    #[test]
1047    fn check_remap_metric() {
1048        let metric = Event::Metric(Metric::new(
1049            "counter",
1050            MetricKind::Absolute,
1051            MetricValue::Counter { value: 1.0 },
1052        ));
1053        let metadata = metric.metadata().clone();
1054
1055        let conf = RemapConfig {
1056            source: Some(
1057                r#".tags.host = "zoobub"
1058                       .name = "zork"
1059                       .namespace = "zerk"
1060                       .kind = "incremental""#
1061                    .to_string(),
1062            ),
1063            file: None,
1064            drop_on_error: true,
1065            drop_on_abort: false,
1066            ..Default::default()
1067        };
1068        let mut tform = remap(conf).unwrap();
1069
1070        let result = transform_one(&mut tform, metric).unwrap();
1071        assert_eq!(
1072            result,
1073            Event::Metric(
1074                Metric::new_with_metadata(
1075                    "zork",
1076                    MetricKind::Incremental,
1077                    MetricValue::Counter { value: 1.0 },
1078                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1079                    // to the actual value to skip the assertion here
1080                    metadata
1081                )
1082                .with_namespace(Some("zerk"))
1083                .with_tags(Some(metric_tags! {
1084                    "host" => "zoobub",
1085                }))
1086            )
1087        );
1088    }
1089
1090    #[test]
1091    fn remap_timezone_fallback() {
1092        let error = Event::from_json_value(
1093            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1094            LogNamespace::Legacy,
1095        )
1096        .unwrap();
1097        let conf = RemapConfig {
1098            source: Some(formatdoc! {r#"
1099                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1100            "#}),
1101            drop_on_error: true,
1102            drop_on_abort: true,
1103            reroute_dropped: true,
1104            ..Default::default()
1105        };
1106        let context = TransformContext {
1107            key: Some(ComponentKey::from("remapper")),
1108            globals: GlobalOptions {
1109                timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1110                ..Default::default()
1111            },
1112            ..Default::default()
1113        };
1114        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1115
1116        let output = transform_one_fallible(&mut tform, error).unwrap();
1117        let log = output.as_log();
1118        assert_eq!(
1119            log["timestamp"],
1120            DateTime::<chrono::Utc>::from(
1121                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1122            )
1123            .into()
1124        );
1125    }
1126
1127    #[test]
1128    fn remap_timezone_override() {
1129        let error = Event::from_json_value(
1130            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1131            LogNamespace::Legacy,
1132        )
1133        .unwrap();
1134        let conf = RemapConfig {
1135            source: Some(formatdoc! {r#"
1136                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1137            "#}),
1138            drop_on_error: true,
1139            drop_on_abort: true,
1140            reroute_dropped: true,
1141            timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1142            ..Default::default()
1143        };
1144        let context = TransformContext {
1145            key: Some(ComponentKey::from("remapper")),
1146            globals: GlobalOptions {
1147                timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1148                ..Default::default()
1149            },
1150            ..Default::default()
1151        };
1152        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1153
1154        let output = transform_one_fallible(&mut tform, error).unwrap();
1155        let log = output.as_log();
1156        assert_eq!(
1157            log["timestamp"],
1158            DateTime::<chrono::Utc>::from(
1159                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1160            )
1161            .into()
1162        );
1163    }
1164
1165    #[test]
1166    fn check_remap_branching() {
1167        let happy =
1168            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1169                .unwrap();
1170        let abort = Event::from_json_value(
1171            serde_json::json!({"hello": "goodbye"}),
1172            LogNamespace::Legacy,
1173        )
1174        .unwrap();
1175        let error =
1176            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1177
1178        let happy_metric = {
1179            let mut metric = Metric::new(
1180                "counter",
1181                MetricKind::Absolute,
1182                MetricValue::Counter { value: 1.0 },
1183            );
1184            metric.replace_tag("hello".into(), "world".into());
1185            Event::Metric(metric)
1186        };
1187
1188        let abort_metric = {
1189            let mut metric = Metric::new(
1190                "counter",
1191                MetricKind::Absolute,
1192                MetricValue::Counter { value: 1.0 },
1193            );
1194            metric.replace_tag("hello".into(), "goodbye".into());
1195            Event::Metric(metric)
1196        };
1197
1198        let error_metric = {
1199            let mut metric = Metric::new(
1200                "counter",
1201                MetricKind::Absolute,
1202                MetricValue::Counter { value: 1.0 },
1203            );
1204            metric.replace_tag("not_hello".into(), "oops".into());
1205            Event::Metric(metric)
1206        };
1207
1208        let conf = RemapConfig {
1209            source: Some(formatdoc! {r#"
1210                if exists(.tags) {{
1211                    # metrics
1212                    .tags.foo = "bar"
1213                    if string!(.tags.hello) == "goodbye" {{
1214                      abort
1215                    }}
1216                }} else {{
1217                    # logs
1218                    .foo = "bar"
1219                    if string(.hello) == "goodbye" {{
1220                      abort
1221                    }}
1222                }}
1223            "#}),
1224            drop_on_error: true,
1225            drop_on_abort: true,
1226            reroute_dropped: true,
1227            ..Default::default()
1228        };
1229        let schema_definitions = HashMap::from([
1230            (
1231                None,
1232                [("source".into(), test_default_schema_definition())].into(),
1233            ),
1234            (
1235                Some(DROPPED.to_owned()),
1236                [("source".into(), test_dropped_schema_definition())].into(),
1237            ),
1238        ]);
1239        let context = TransformContext {
1240            key: Some(ComponentKey::from("remapper")),
1241            schema_definitions,
1242            merged_schema_definition: schema::Definition::new_with_default_metadata(
1243                Kind::any_object(),
1244                [LogNamespace::Legacy],
1245            )
1246            .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1247            ..Default::default()
1248        };
1249        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1250
1251        let output = transform_one_fallible(&mut tform, happy).unwrap();
1252        let log = output.as_log();
1253        assert_eq!(log["hello"], "world".into());
1254        assert_eq!(log["foo"], "bar".into());
1255        assert!(!log.contains(event_path!("metadata")));
1256
1257        let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1258        let log = output.as_log();
1259        assert_eq!(log["hello"], "goodbye".into());
1260        assert!(!log.contains(event_path!("foo")));
1261        assert_eq!(
1262            log["metadata"],
1263            serde_json::json!({
1264                "dropped": {
1265                    "reason": "abort",
1266                    "message": "aborted",
1267                    "component_id": "remapper",
1268                    "component_type": "remap",
1269                    "component_kind": "transform",
1270                }
1271            })
1272            .try_into()
1273            .unwrap()
1274        );
1275
1276        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1277        let log = output.as_log();
1278        assert_eq!(log["hello"], 42.into());
1279        assert!(!log.contains(event_path!("foo")));
1280        assert_eq!(
1281            log["metadata"],
1282            serde_json::json!({
1283                "dropped": {
1284                    "reason": "error",
1285                    "message": "function call error for \"string\" at (160:174): expected string, got integer",
1286                    "component_id": "remapper",
1287                    "component_type": "remap",
1288                    "component_kind": "transform",
1289                }
1290            })
1291            .try_into()
1292            .unwrap()
1293        );
1294
1295        let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1296        similar_asserts::assert_eq!(
1297            output,
1298            Event::Metric(
1299                Metric::new_with_metadata(
1300                    "counter",
1301                    MetricKind::Absolute,
1302                    MetricValue::Counter { value: 1.0 },
1303                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1304                    // to the actual value to skip the assertion here
1305                    EventMetadata::default()
1306                        .with_schema_definition(output.metadata().schema_definition()),
1307                )
1308                .with_tags(Some(metric_tags! {
1309                    "hello" => "world",
1310                    "foo" => "bar",
1311                }))
1312            )
1313        );
1314
1315        let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1316        similar_asserts::assert_eq!(
1317            output,
1318            Event::Metric(
1319                Metric::new_with_metadata(
1320                    "counter",
1321                    MetricKind::Absolute,
1322                    MetricValue::Counter { value: 1.0 },
1323                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1324                    // to the actual value to skip the assertion here
1325                    EventMetadata::default()
1326                        .with_schema_definition(output.metadata().schema_definition()),
1327                )
1328                .with_tags(Some(metric_tags! {
1329                    "hello" => "goodbye",
1330                    "metadata.dropped.reason" => "abort",
1331                    "metadata.dropped.component_id" => "remapper",
1332                    "metadata.dropped.component_type" => "remap",
1333                    "metadata.dropped.component_kind" => "transform",
1334                }))
1335            )
1336        );
1337
1338        let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1339        similar_asserts::assert_eq!(
1340            output,
1341            Event::Metric(
1342                Metric::new_with_metadata(
1343                    "counter",
1344                    MetricKind::Absolute,
1345                    MetricValue::Counter { value: 1.0 },
1346                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1347                    // to the actual value to skip the assertion here
1348                    EventMetadata::default()
1349                        .with_schema_definition(output.metadata().schema_definition()),
1350                )
1351                .with_tags(Some(metric_tags! {
1352                    "not_hello" => "oops",
1353                    "metadata.dropped.reason" => "error",
1354                    "metadata.dropped.component_id" => "remapper",
1355                    "metadata.dropped.component_type" => "remap",
1356                    "metadata.dropped.component_kind" => "transform",
1357                }))
1358            )
1359        );
1360    }
1361
1362    #[test]
1363    fn check_remap_branching_assert_with_message() {
1364        let error_trigger_assert_custom_message =
1365            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1366        let error_trigger_default_assert_message =
1367            Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
1368        let conf = RemapConfig {
1369            source: Some(formatdoc! {r#"
1370                assert_eq!(.hello, 0, "custom message here")
1371                assert_eq!(.hello, 1)
1372            "#}),
1373            drop_on_error: true,
1374            drop_on_abort: true,
1375            reroute_dropped: true,
1376            ..Default::default()
1377        };
1378        let context = TransformContext {
1379            key: Some(ComponentKey::from("remapper")),
1380            ..Default::default()
1381        };
1382        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1383
1384        let output =
1385            transform_one_fallible(&mut tform, error_trigger_assert_custom_message).unwrap_err();
1386        let log = output.as_log();
1387        assert_eq!(log["hello"], 42.into());
1388        assert!(!log.contains(event_path!("foo")));
1389        assert_eq!(
1390            log["metadata"],
1391            serde_json::json!({
1392                "dropped": {
1393                    "reason": "error",
1394                    "message": "custom message here",
1395                    "component_id": "remapper",
1396                    "component_type": "remap",
1397                    "component_kind": "transform",
1398                }
1399            })
1400            .try_into()
1401            .unwrap()
1402        );
1403
1404        let output =
1405            transform_one_fallible(&mut tform, error_trigger_default_assert_message).unwrap_err();
1406        let log = output.as_log();
1407        assert_eq!(log["hello"], 0.into());
1408        assert!(!log.contains(event_path!("foo")));
1409        assert_eq!(
1410            log["metadata"],
1411            serde_json::json!({
1412                "dropped": {
1413                    "reason": "error",
1414                    "message": "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
1415                    "component_id": "remapper",
1416                    "component_type": "remap",
1417                    "component_kind": "transform",
1418                }
1419            })
1420            .try_into()
1421            .unwrap()
1422        );
1423    }
1424
1425    #[test]
1426    fn check_remap_branching_abort_with_message() {
1427        let error =
1428            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1429        let conf = RemapConfig {
1430            source: Some(formatdoc! {r#"
1431                abort "custom message here"
1432            "#}),
1433            drop_on_error: true,
1434            drop_on_abort: true,
1435            reroute_dropped: true,
1436            ..Default::default()
1437        };
1438        let context = TransformContext {
1439            key: Some(ComponentKey::from("remapper")),
1440            ..Default::default()
1441        };
1442        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1443
1444        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1445        let log = output.as_log();
1446        assert_eq!(log["hello"], 42.into());
1447        assert!(!log.contains(event_path!("foo")));
1448        assert_eq!(
1449            log["metadata"],
1450            serde_json::json!({
1451                "dropped": {
1452                    "reason": "abort",
1453                    "message": "custom message here",
1454                    "component_id": "remapper",
1455                    "component_type": "remap",
1456                    "component_kind": "transform",
1457                }
1458            })
1459            .try_into()
1460            .unwrap()
1461        );
1462    }
1463
1464    #[test]
1465    fn check_remap_branching_disabled() {
1466        let happy =
1467            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1468                .unwrap();
1469        let abort = Event::from_json_value(
1470            serde_json::json!({"hello": "goodbye"}),
1471            LogNamespace::Legacy,
1472        )
1473        .unwrap();
1474        let error =
1475            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1476
1477        let conf = RemapConfig {
1478            source: Some(formatdoc! {r#"
1479                if exists(.tags) {{
1480                    # metrics
1481                    .tags.foo = "bar"
1482                    if string!(.tags.hello) == "goodbye" {{
1483                      abort
1484                    }}
1485                }} else {{
1486                    # logs
1487                    .foo = "bar"
1488                    if string!(.hello) == "goodbye" {{
1489                      abort
1490                    }}
1491                }}
1492            "#}),
1493            drop_on_error: true,
1494            drop_on_abort: true,
1495            reroute_dropped: false,
1496            ..Default::default()
1497        };
1498
1499        let schema_definition = schema::Definition::new_with_default_metadata(
1500            Kind::any_object(),
1501            [LogNamespace::Legacy],
1502        )
1503        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
1504        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
1505
1506        assert_eq!(
1507            conf.outputs(
1508                vector_lib::enrichment::TableRegistry::default(),
1509                &[(
1510                    "test".into(),
1511                    schema::Definition::new_with_default_metadata(
1512                        Kind::any_object(),
1513                        [LogNamespace::Legacy]
1514                    )
1515                )],
1516                LogNamespace::Legacy
1517            ),
1518            vec![TransformOutput::new(
1519                DataType::all_bits(),
1520                [("test".into(), schema_definition)].into()
1521            )]
1522        );
1523
1524        let context = TransformContext {
1525            key: Some(ComponentKey::from("remapper")),
1526            ..Default::default()
1527        };
1528        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1529
1530        let output = transform_one_fallible(&mut tform, happy).unwrap();
1531        let log = output.as_log();
1532        assert_eq!(log["hello"], "world".into());
1533        assert_eq!(log["foo"], "bar".into());
1534        assert!(!log.contains(event_path!("metadata")));
1535
1536        let out = collect_outputs(&mut tform, abort);
1537        assert!(out.primary.is_empty());
1538        assert!(out.named[DROPPED].is_empty());
1539
1540        let out = collect_outputs(&mut tform, error);
1541        assert!(out.primary.is_empty());
1542        assert!(out.named[DROPPED].is_empty());
1543    }
1544
1545    #[tokio::test]
1546    async fn check_remap_branching_metrics_with_output() {
1547        init_test();
1548
1549        let config: ConfigBuilder = toml::from_str(indoc! {r#"
1550            [transforms.foo]
1551            inputs = []
1552            type = "remap"
1553            drop_on_abort = true
1554            reroute_dropped = true
1555            source = "abort"
1556
1557            [[tests]]
1558            name = "metric output"
1559
1560            [tests.input]
1561                insert_at = "foo"
1562                value = "none"
1563
1564            [[tests.outputs]]
1565                extract_from = "foo.dropped"
1566                [[tests.outputs.conditions]]
1567                type = "vrl"
1568                source = "true"
1569        "#})
1570        .unwrap();
1571
1572        let mut tests = build_unit_tests(config).await.unwrap();
1573        assert!(tests.remove(0).run().await.errors.is_empty());
1574        // Check that metrics were emitted with output tag
1575        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1576    }
1577
    /// Output buffers gathered by `collect_outputs` after running a single
    /// event through a transform.
    struct CollectedOuput {
        /// Buffer taken from the primary output.
        primary: OutputBuffer,
        /// Buffers taken from the named outputs, indexed by name
        /// (e.g. `DROPPED`).
        named: HashMap<String, OutputBuffer>,
    }
1582
1583    fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1584        let mut outputs = TransformOutputsBuf::new_with_capacity(
1585            vec![
1586                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1587                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1588            ],
1589            1,
1590        );
1591
1592        ft.transform(event, &mut outputs);
1593
1594        CollectedOuput {
1595            primary: outputs.take_primary(),
1596            named: outputs.take_all_named(),
1597        }
1598    }
1599
1600    fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1601        let out = collect_outputs(ft, event);
1602        assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1603        assert!(out.primary.len() <= 1);
1604        out.primary.into_events().next()
1605    }
1606
1607    fn transform_one_fallible(
1608        ft: &mut dyn SyncTransform,
1609        event: Event,
1610    ) -> std::result::Result<Event, Event> {
1611        let mut outputs = TransformOutputsBuf::new_with_capacity(
1612            vec![
1613                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1614                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1615            ],
1616            1,
1617        );
1618
1619        ft.transform(event, &mut outputs);
1620
1621        let mut buf = outputs.drain().collect::<Vec<_>>();
1622        let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1623
1624        assert!(buf.len() < 2);
1625        assert!(err_buf.len() < 2);
1626        match (buf.pop(), err_buf.pop()) {
1627            (Some(good), None) => Ok(good),
1628            (None, Some(bad)) => Err(bad),
1629            (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1630        }
1631    }
1632
1633    #[tokio::test]
1634    async fn emits_internal_events() {
1635        assert_transform_compliance(async move {
1636            let config = RemapConfig {
1637                source: Some("abort".to_owned()),
1638                drop_on_abort: true,
1639                ..Default::default()
1640            };
1641
1642            let (tx, rx) = mpsc::channel(1);
1643            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1644
1645            let log = LogEvent::from("hello world");
1646            tx.send(log.into()).await.unwrap();
1647
1648            drop(tx);
1649            topology.stop().await;
1650            assert_eq!(out.recv().await, None);
1651        })
1652        .await
1653    }
1654
1655    #[test]
1656    fn test_combined_transforms_simple() {
1657        // Make sure that when getting the definitions from one transform and
1658        // passing them to another the correct definition is still produced.
1659
1660        // Transform 1 sets a simple value.
1661        let transform1 = RemapConfig {
1662            source: Some(r#".thing = "potato""#.to_string()),
1663            ..Default::default()
1664        };
1665
1666        let transform2 = RemapConfig {
1667            source: Some(".thang = .thing".to_string()),
1668            ..Default::default()
1669        };
1670
1671        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1672
1673        let outputs1 = transform1.outputs(
1674            enrichment_tables.clone(),
1675            &[("in".into(), schema::Definition::default_legacy_namespace())],
1676            LogNamespace::Legacy,
1677        );
1678
1679        assert_eq!(
1680            vec![TransformOutput::new(
1681                DataType::all_bits(),
1682                // The `never` definition should have been passed on to the end.
1683                [(
1684                    "in".into(),
1685                    Definition::default_legacy_namespace().with_event_field(
1686                        &owned_value_path!("thing"),
1687                        Kind::bytes(),
1688                        None
1689                    ),
1690                )]
1691                .into()
1692            )],
1693            outputs1
1694        );
1695
1696        let outputs2 = transform2.outputs(
1697            enrichment_tables,
1698            &[(
1699                "in1".into(),
1700                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1701            )],
1702            LogNamespace::Legacy,
1703        );
1704
1705        assert_eq!(
1706            vec![TransformOutput::new(
1707                DataType::all_bits(),
1708                [(
1709                    "in1".into(),
1710                    Definition::default_legacy_namespace()
1711                        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
1712                        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None),
1713                )]
1714                .into(),
1715            )],
1716            outputs2
1717        );
1718    }
1719
1720    #[test]
1721    fn test_combined_transforms_unnest() {
1722        // Make sure that when getting the definitions from one transform and
1723        // passing them to another the correct definition is still produced.
1724
1725        // Transform 1 sets a simple value.
1726        let transform1 = RemapConfig {
1727            source: Some(
1728                indoc! {
1729                r#"
1730                .thing = [{"cabbage": 32}, {"parsnips": 45}]
1731                . = unnest(.thing)
1732                "#
1733                }
1734                .to_string(),
1735            ),
1736            ..Default::default()
1737        };
1738
1739        let transform2 = RemapConfig {
1740            source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
1741            ..Default::default()
1742        };
1743
1744        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1745
1746        let outputs1 = transform1.outputs(
1747            enrichment_tables.clone(),
1748            &[(
1749                "in".into(),
1750                schema::Definition::new_with_default_metadata(
1751                    Kind::any_object(),
1752                    [LogNamespace::Legacy],
1753                ),
1754            )],
1755            LogNamespace::Legacy,
1756        );
1757
1758        assert_eq!(
1759            vec![TransformOutput::new(
1760                DataType::all_bits(),
1761                [(
1762                    "in".into(),
1763                    Definition::new_with_default_metadata(
1764                        Kind::any_object(),
1765                        [LogNamespace::Legacy]
1766                    )
1767                    .with_event_field(
1768                        &owned_value_path!("thing"),
1769                        Kind::object(Collection::from(BTreeMap::from([
1770                            ("cabbage".into(), Kind::integer().or_undefined(),),
1771                            ("parsnips".into(), Kind::integer().or_undefined(),)
1772                        ]))),
1773                        None
1774                    ),
1775                )]
1776                .into(),
1777            )],
1778            outputs1
1779        );
1780
1781        let outputs2 = transform2.outputs(
1782            enrichment_tables,
1783            &[(
1784                "in1".into(),
1785                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1786            )],
1787            LogNamespace::Legacy,
1788        );
1789
1790        assert_eq!(
1791            vec![TransformOutput::new(
1792                DataType::all_bits(),
1793                [(
1794                    "in1".into(),
1795                    Definition::default_legacy_namespace()
1796                        .with_event_field(
1797                            &owned_value_path!("thing"),
1798                            Kind::object(Collection::from(BTreeMap::from([
1799                                ("cabbage".into(), Kind::integer().or_undefined(),),
1800                                ("parsnips".into(), Kind::integer().or_undefined(),)
1801                            ]))),
1802                            None
1803                        )
1804                        .with_event_field(
1805                            &owned_value_path!("thang"),
1806                            Kind::integer().or_null(),
1807                            None
1808                        ),
1809                )]
1810                .into(),
1811            )],
1812            outputs2
1813        );
1814    }
1815
1816    #[test]
1817    fn test_transform_abort() {
1818        // An abort should not change the typedef.
1819
1820        let transform1 = RemapConfig {
1821            source: Some(r"abort".to_string()),
1822            ..Default::default()
1823        };
1824
1825        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1826
1827        let outputs1 = transform1.outputs(
1828            enrichment_tables,
1829            &[(
1830                "in".into(),
1831                schema::Definition::new_with_default_metadata(
1832                    Kind::any_object(),
1833                    [LogNamespace::Legacy],
1834                ),
1835            )],
1836            LogNamespace::Legacy,
1837        );
1838
1839        assert_eq!(
1840            vec![TransformOutput::new(
1841                DataType::all_bits(),
1842                [(
1843                    "in".into(),
1844                    Definition::new_with_default_metadata(
1845                        Kind::any_object(),
1846                        [LogNamespace::Legacy]
1847                    ),
1848                )]
1849                .into(),
1850            )],
1851            outputs1
1852        );
1853    }
1854
1855    #[test]
1856    fn test_error_outputs() {
1857        // Even if we fail to compile the VRL it should still output
1858        // the correct ports. This may change if we separate the
1859        // `outputs` function into one returning outputs and a separate
1860        // returning schema definitions.
1861        let transform1 = RemapConfig {
1862            // This enrichment table does not exist.
1863            source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
1864            reroute_dropped: true,
1865            ..Default::default()
1866        };
1867
1868        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1869
1870        let outputs1 = transform1.outputs(
1871            enrichment_tables,
1872            &[(
1873                "in".into(),
1874                schema::Definition::new_with_default_metadata(
1875                    Kind::any_object(),
1876                    [LogNamespace::Legacy],
1877                ),
1878            )],
1879            LogNamespace::Legacy,
1880        );
1881
1882        assert_eq!(
1883            HashSet::from([None, Some("dropped".to_string())]),
1884            outputs1
1885                .into_iter()
1886                .map(|output| output.port)
1887                .collect::<HashSet<_>>()
1888        );
1889    }
1890
1891    #[test]
1892    fn test_non_object_events() {
1893        let transform1 = RemapConfig {
1894            // This enrichment table does not exist.
1895            source: Some(r#". = "fish" "#.to_string()),
1896            ..Default::default()
1897        };
1898
1899        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1900
1901        let outputs1 = transform1.outputs(
1902            enrichment_tables,
1903            &[(
1904                "in".into(),
1905                schema::Definition::new_with_default_metadata(
1906                    Kind::any_object(),
1907                    [LogNamespace::Legacy],
1908                ),
1909            )],
1910            LogNamespace::Legacy,
1911        );
1912
1913        let wanted = schema::Definition::new_with_default_metadata(
1914            Kind::object(Collection::from_unknown(Kind::undefined())),
1915            [LogNamespace::Legacy],
1916        )
1917        .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);
1918
1919        assert_eq!(
1920            HashMap::from([(OutputId::from("in"), wanted)]),
1921            outputs1[0].schema_definitions(true),
1922        );
1923    }
1924
1925    #[test]
1926    fn test_array_and_non_object_events() {
1927        let transform1 = RemapConfig {
1928            source: Some(
1929                indoc! {r#"
1930                    if .lizard == true {
1931                        .thing = [{"cabbage": 42}];
1932                        . = unnest(.thing)
1933                    } else {
1934                      . = "fish"
1935                    }
1936                    "#}
1937                .to_string(),
1938            ),
1939            ..Default::default()
1940        };
1941
1942        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1943
1944        let outputs1 = transform1.outputs(
1945            enrichment_tables,
1946            &[(
1947                "in".into(),
1948                schema::Definition::new_with_default_metadata(
1949                    Kind::any_object(),
1950                    [LogNamespace::Legacy],
1951                ),
1952            )],
1953            LogNamespace::Legacy,
1954        );
1955
1956        let wanted = schema::Definition::new_with_default_metadata(
1957            Kind::any_object(),
1958            [LogNamespace::Legacy],
1959        )
1960        .with_event_field(&owned_value_path!("message"), Kind::any(), None)
1961        .with_event_field(
1962            &owned_value_path!("thing"),
1963            Kind::object(Collection::from(BTreeMap::from([(
1964                "cabbage".into(),
1965                Kind::integer(),
1966            )])))
1967            .or_undefined(),
1968            None,
1969        );
1970
1971        assert_eq!(
1972            HashMap::from([(OutputId::from("in"), wanted)]),
1973            outputs1[0].schema_definitions(true),
1974        );
1975    }
1976
1977    #[test]
1978    fn check_remap_array_vector_namespace() {
1979        let event = {
1980            let mut event = LogEvent::from("input");
1981            // mark the event as a "Vector" namespaced log
1982            event
1983                .metadata_mut()
1984                .value_mut()
1985                .insert("vector", BTreeMap::new());
1986            Event::from(event)
1987        };
1988
1989        let conf = RemapConfig {
1990            source: Some(
1991                r". = [null]
1992"
1993                .to_string(),
1994            ),
1995            file: None,
1996            drop_on_error: true,
1997            drop_on_abort: false,
1998            ..Default::default()
1999        };
2000        let mut tform = remap(conf.clone()).unwrap();
2001        let result = transform_one(&mut tform, event).unwrap();
2002
2003        // Legacy namespace nests this under "message", Vector should set it as the root
2004        assert_eq!(result.as_log().get("."), Some(&Value::Null));
2005
2006        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
2007        let outputs1 = conf.outputs(
2008            enrichment_tables,
2009            &[(
2010                "in".into(),
2011                schema::Definition::new_with_default_metadata(
2012                    Kind::any_object(),
2013                    [LogNamespace::Vector],
2014                ),
2015            )],
2016            LogNamespace::Vector,
2017        );
2018
2019        let wanted =
2020            schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
2021
2022        assert_eq!(
2023            HashMap::from([(OutputId::from("in"), wanted)]),
2024            outputs1[0].schema_definitions(true),
2025        );
2026    }
2027
2028    fn assert_no_metrics(source: String) {
2029        vector_lib::metrics::init_test();
2030
2031        let config = RemapConfig {
2032            source: Some(source),
2033            drop_on_error: true,
2034            drop_on_abort: true,
2035            reroute_dropped: true,
2036            ..Default::default()
2037        };
2038        let mut ast_runner = remap(config).unwrap();
2039        let input_event =
2040            Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2041        let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2042        let dropped_log = dropped_event.as_log();
2043        assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2044
2045        let controller = Controller::get().expect("no controller");
2046        let metrics = controller
2047            .capture_metrics()
2048            .into_iter()
2049            .map(|metric| (metric.name().to_string(), metric))
2050            .collect::<BTreeMap<String, Metric>>();
2051        assert_eq!(metrics.get("component_discarded_events_total"), None);
2052        assert_eq!(metrics.get("component_errors_total"), None);
2053    }
2054    #[test]
2055    fn do_not_emit_metrics_when_dropped() {
2056        assert_no_metrics("abort".to_string());
2057    }
2058
2059    #[test]
2060    fn do_not_emit_metrics_when_errored() {
2061        assert_no_metrics("parse_key_value!(.message)".to_string());
2062    }
2063}