// vector/transforms/remap.rs

1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::{
4    collections::BTreeMap,
5    fs::File,
6    io::{self, Read},
7    path::PathBuf,
8};
9
10use snafu::{ResultExt, Snafu};
11use vector_lib::codecs::MetricTagValues;
12use vector_lib::compile_vrl;
13use vector_lib::config::LogNamespace;
14use vector_lib::configurable::configurable_component;
15use vector_lib::enrichment::TableRegistry;
16use vector_lib::lookup::{metadata_path, owned_value_path, PathPrefix};
17use vector_lib::schema::Definition;
18use vector_lib::TimeZone;
19use vector_vrl_functions::set_semantic_meaning::MeaningList;
20use vrl::compiler::runtime::{Runtime, Terminate};
21use vrl::compiler::state::ExternalEnv;
22use vrl::compiler::{CompileConfig, ExpressionError, Program, TypeState, VrlRuntime};
23use vrl::diagnostic::{DiagnosticMessage, Formatter, Note};
24use vrl::path;
25use vrl::path::ValuePath;
26use vrl::value::{Kind, Value};
27
28use crate::config::OutputId;
29use crate::{
30    config::{
31        log_schema, ComponentKey, DataType, Input, TransformConfig, TransformContext,
32        TransformOutput,
33    },
34    event::{Event, TargetEvents, VrlTarget},
35    internal_events::{RemapMappingAbort, RemapMappingError},
36    schema,
37    transforms::{SyncTransform, Transform, TransformOutputsBuf},
38    Result,
39};
40
41const DROPPED: &str = "dropped";
42type CacheKey = (TableRegistry, schema::Definition);
43type CacheValue = (Program, String, MeaningList);
44
45/// Configuration for the `remap` transform.
46#[configurable_component(transform(
47    "remap",
48    "Modify your observability data as it passes through your topology using Vector Remap Language (VRL)."
49))]
50#[derive(Derivative)]
51#[serde(deny_unknown_fields)]
52#[derivative(Default, Debug)]
53pub struct RemapConfig {
54    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
55    ///
56    /// Required if `file` is missing.
57    ///
58    /// [vrl]: https://vector.dev/docs/reference/vrl
59    #[configurable(metadata(
60        docs::examples = ". = parse_json!(.message)\n.new_field = \"new value\"\n.status = to_int!(.status)\n.duration = parse_duration!(.duration, \"s\")\n.new_name = del(.old_name)",
61        docs::syntax_override = "remap_program"
62    ))]
63    pub source: Option<String>,
64
65    /// File path to the [Vector Remap Language][vrl] (VRL) program to execute for each event.
66    ///
67    /// If a relative path is provided, its root is the current working directory.
68    ///
69    /// Required if `source` is missing.
70    ///
71    /// [vrl]: https://vector.dev/docs/reference/vrl
72    #[configurable(metadata(docs::examples = "./my/program.vrl"))]
73    pub file: Option<PathBuf>,
74
75    /// File paths to the [Vector Remap Language][vrl] (VRL) programs to execute for each event.
76    ///
77    /// If a relative path is provided, its root is the current working directory.
78    ///
79    /// Required if `source` or `file` are missing.
80    ///
81    /// [vrl]: https://vector.dev/docs/reference/vrl
82    #[configurable(metadata(docs::examples = "['./my/program.vrl', './my/program2.vrl']"))]
83    pub files: Option<Vec<PathBuf>>,
84
85    /// When set to `single`, metric tag values are exposed as single strings, the
86    /// same as they were before this config option. Tags with multiple values show the last assigned value, and null values
87    /// are ignored.
88    ///
89    /// When set to `full`, all metric tags are exposed as arrays of either string or null
90    /// values.
91    #[serde(default)]
92    pub metric_tag_values: MetricTagValues,
93
94    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
95    /// time zone.
96    ///
97    /// This overrides the [global `timezone`][global_timezone] option. The time zone name may be
98    /// any name in the [TZ database][tz_database], or `local` to indicate system local time.
99    ///
100    /// [global_timezone]: https://vector.dev/docs/reference/configuration//global-options#timezone
101    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
102    #[serde(default)]
103    #[configurable(metadata(docs::advanced))]
104    pub timezone: Option<TimeZone>,
105
106    /// Drops any event that encounters an error during processing.
107    ///
108    /// Normally, if a VRL program encounters an error when processing an event, the original,
109    /// unmodified event is sent downstream. In some cases, you may not want to send the event
110    /// any further, such as if certain transformation or enrichment is strictly required. Setting
111    /// `drop_on_error` to `true` allows you to ensure these events do not get processed any
112    /// further.
113    ///
114    /// Additionally, dropped events can potentially be diverted to a specially named output for
115    /// further logging and analysis by setting `reroute_dropped`.
116    #[serde(default = "crate::serde::default_false")]
117    #[configurable(metadata(docs::human_name = "Drop Event on Error"))]
118    pub drop_on_error: bool,
119
120    /// Drops any event that is manually aborted during processing.
121    ///
122    /// If a VRL program is manually aborted (using [`abort`][vrl_docs_abort]) when
123    /// processing an event, this option controls whether the original, unmodified event is sent
124    /// downstream without any modifications or if it is dropped.
125    ///
126    /// Additionally, dropped events can potentially be diverted to a specially-named output for
127    /// further logging and analysis by setting `reroute_dropped`.
128    ///
129    /// [vrl_docs_abort]: https://vector.dev/docs/reference/vrl/expressions/#abort
130    #[serde(default = "crate::serde::default_true")]
131    #[configurable(metadata(docs::human_name = "Drop Event on Abort"))]
132    pub drop_on_abort: bool,
133
134    /// Reroutes dropped events to a named output instead of halting processing on them.
135    ///
136    /// When using `drop_on_error` or `drop_on_abort`, events that are "dropped" are processed no
137    /// further. In some cases, it may be desirable to keep the events around for further analysis,
138    /// debugging, or retrying.
139    ///
140    /// In these cases, `reroute_dropped` can be set to `true` which forwards the original event
141    /// to a specially-named output, `dropped`. The original event is annotated with additional
142    /// fields describing why the event was dropped.
143    #[serde(default = "crate::serde::default_false")]
144    #[configurable(metadata(docs::human_name = "Reroute Dropped Events"))]
145    pub reroute_dropped: bool,
146
147    #[configurable(derived, metadata(docs::hidden))]
148    #[serde(default)]
149    pub runtime: VrlRuntime,
150
151    #[configurable(derived, metadata(docs::hidden))]
152    #[serde(skip)]
153    #[derivative(Debug = "ignore")]
154    /// Cache can't be `BTreeMap` or `HashMap` because of `TableRegistry`, which doesn't allow us to inspect tables inside it.
155    /// And even if we allowed the inspection, the tables can be huge, resulting in a long comparison or hash computation
156    /// while using `Vec` allows us to use just a shallow comparison
157    pub cache: Mutex<Vec<(CacheKey, std::result::Result<CacheValue, String>)>>,
158}
159
160impl Clone for RemapConfig {
161    fn clone(&self) -> Self {
162        Self {
163            source: self.source.clone(),
164            file: self.file.clone(),
165            files: self.files.clone(),
166            metric_tag_values: self.metric_tag_values,
167            timezone: self.timezone,
168            drop_on_error: self.drop_on_error,
169            drop_on_abort: self.drop_on_abort,
170            reroute_dropped: self.reroute_dropped,
171            runtime: self.runtime,
172            cache: Mutex::new(Default::default()),
173        }
174    }
175}
176
177impl RemapConfig {
178    fn compile_vrl_program(
179        &self,
180        enrichment_tables: TableRegistry,
181        merged_schema_definition: schema::Definition,
182    ) -> Result<(Program, String, MeaningList)> {
183        if let Some((_, res)) = self
184            .cache
185            .lock()
186            .expect("Data poisoned")
187            .iter()
188            .find(|v| v.0 .0 == enrichment_tables && v.0 .1 == merged_schema_definition)
189        {
190            return res.clone().map_err(Into::into);
191        }
192
193        let source = match (&self.source, &self.file, &self.files) {
194            (Some(source), None, None) => source.to_owned(),
195            (None, Some(path), None) => Self::read_file(path)?,
196            (None, None, Some(paths)) => {
197                let mut combined_source = String::new();
198                for path in paths {
199                    let content = Self::read_file(path)?;
200                    combined_source.push_str(&content);
201                    combined_source.push('\n');
202                }
203                combined_source
204            }
205            _ => return Err(Box::new(BuildError::SourceAndOrFileOrFiles)),
206        };
207
208        let mut functions = vrl::stdlib::all();
209        functions.append(&mut vector_lib::enrichment::vrl_functions());
210        #[cfg(feature = "sources-dnstap")]
211        functions.append(&mut dnstap_parser::vrl_functions());
212        functions.append(&mut vector_vrl_functions::all());
213
214        let state = TypeState {
215            local: Default::default(),
216            external: ExternalEnv::new_with_kind(
217                merged_schema_definition.event_kind().clone(),
218                merged_schema_definition.metadata_kind().clone(),
219            ),
220        };
221        let mut config = CompileConfig::default();
222
223        config.set_custom(enrichment_tables.clone());
224        config.set_custom(MeaningList::default());
225
226        let res = compile_vrl(&source, &functions, &state, config)
227            .map_err(|diagnostics| Formatter::new(&source, diagnostics).colored().to_string())
228            .map(|result| {
229                (
230                    result.program,
231                    Formatter::new(&source, result.warnings).to_string(),
232                    result.config.get_custom::<MeaningList>().unwrap().clone(),
233                )
234            });
235
236        self.cache
237            .lock()
238            .expect("Data poisoned")
239            .push(((enrichment_tables, merged_schema_definition), res.clone()));
240
241        res.map_err(Into::into)
242    }
243
244    fn read_file(path: &PathBuf) -> Result<String> {
245        let mut buffer = String::new();
246        File::open(path)
247            .with_context(|_| FileOpenFailedSnafu { path })?
248            .read_to_string(&mut buffer)
249            .with_context(|_| FileReadFailedSnafu { path })?;
250        Ok(buffer)
251    }
252}
253
254impl_generate_config_from_default!(RemapConfig);
255
256#[async_trait::async_trait]
257#[typetag::serde(name = "remap")]
258impl TransformConfig for RemapConfig {
259    async fn build(&self, context: &TransformContext) -> Result<Transform> {
260        let (transform, warnings) = match self.runtime {
261            VrlRuntime::Ast => {
262                let (remap, warnings) = Remap::new_ast(self.clone(), context)?;
263                (Transform::synchronous(remap), warnings)
264            }
265        };
266
267        // TODO: We could improve on this by adding support for non-fatal error
268        // messages in the topology. This would make the topology responsible
269        // for printing warnings (including potentially emitting metrics),
270        // instead of individual transforms.
271        if !warnings.is_empty() {
272            warn!(message = "VRL compilation warning.", %warnings);
273        }
274
275        Ok(transform)
276    }
277
278    fn input(&self) -> Input {
279        Input::all()
280    }
281
282    fn outputs(
283        &self,
284        enrichment_tables: vector_lib::enrichment::TableRegistry,
285        input_definitions: &[(OutputId, schema::Definition)],
286        _: LogNamespace,
287    ) -> Vec<TransformOutput> {
288        let merged_definition: Definition = input_definitions
289            .iter()
290            .map(|(_output, definition)| definition.clone())
291            .reduce(Definition::merge)
292            .unwrap_or_else(Definition::any);
293
294        // We need to compile the VRL program in order to know the schema definition output of this
295        // transform. We ignore any compilation errors, as those are caught by the transform build
296        // step.
297        let compiled = self
298            .compile_vrl_program(enrichment_tables, merged_definition)
299            .map(|(program, _, meaning_list)| (program.final_type_info().state, meaning_list.0))
300            .map_err(|_| ());
301
302        let mut dropped_definitions = HashMap::new();
303        let mut default_definitions = HashMap::new();
304
305        for (output_id, input_definition) in input_definitions {
306            let default_definition = compiled
307                .clone()
308                .map(|(state, meaning)| {
309                    let mut new_type_def = Definition::new(
310                        state.external.target_kind().clone(),
311                        state.external.metadata_kind().clone(),
312                        input_definition.log_namespaces().clone(),
313                    );
314
315                    for (id, path) in input_definition.meanings() {
316                        // Attempt to copy over the meanings from the input definition.
317                        // The function will fail if the meaning that now points to a field that no longer exists,
318                        // this is fine since we will no longer want that meaning in the output definition.
319                        let _ = new_type_def.try_with_meaning(path.clone(), id);
320                    }
321
322                    // Apply any semantic meanings set in the VRL program
323                    for (id, path) in meaning {
324                        // currently only event paths are supported
325                        new_type_def = new_type_def.with_meaning(path, &id);
326                    }
327                    new_type_def
328                })
329                .unwrap_or_else(|_| {
330                    Definition::new_with_default_metadata(
331                        // The program failed to compile, so it can "never" return a value
332                        Kind::never(),
333                        input_definition.log_namespaces().clone(),
334                    )
335                });
336
337            // When a message is dropped and re-routed, we keep the original event, but also annotate
338            // it with additional metadata.
339            let dropped_definition = Definition::combine_log_namespaces(
340                input_definition.log_namespaces(),
341                input_definition.clone().with_event_field(
342                    log_schema().metadata_key().expect("valid metadata key"),
343                    Kind::object(BTreeMap::from([
344                        ("reason".into(), Kind::bytes()),
345                        ("message".into(), Kind::bytes()),
346                        ("component_id".into(), Kind::bytes()),
347                        ("component_type".into(), Kind::bytes()),
348                        ("component_kind".into(), Kind::bytes()),
349                    ])),
350                    Some("metadata"),
351                ),
352                input_definition
353                    .clone()
354                    .with_metadata_field(&owned_value_path!("reason"), Kind::bytes(), None)
355                    .with_metadata_field(&owned_value_path!("message"), Kind::bytes(), None)
356                    .with_metadata_field(&owned_value_path!("component_id"), Kind::bytes(), None)
357                    .with_metadata_field(&owned_value_path!("component_type"), Kind::bytes(), None)
358                    .with_metadata_field(&owned_value_path!("component_kind"), Kind::bytes(), None),
359            );
360
361            default_definitions.insert(
362                output_id.clone(),
363                VrlTarget::modify_schema_definition_for_into_events(default_definition),
364            );
365            dropped_definitions.insert(
366                output_id.clone(),
367                VrlTarget::modify_schema_definition_for_into_events(dropped_definition),
368            );
369        }
370
371        let default_output = TransformOutput::new(DataType::all_bits(), default_definitions);
372
373        if self.reroute_dropped {
374            vec![
375                default_output,
376                TransformOutput::new(DataType::all_bits(), dropped_definitions).with_port(DROPPED),
377            ]
378        } else {
379            vec![default_output]
380        }
381    }
382
383    fn enable_concurrency(&self) -> bool {
384        true
385    }
386
387    fn files_to_watch(&self) -> Vec<&PathBuf> {
388        self.file
389            .iter()
390            .chain(self.files.iter().flatten())
391            .collect()
392    }
393}
394
395#[derive(Debug, Clone)]
396pub struct Remap<Runner>
397where
398    Runner: VrlRunner,
399{
400    component_key: Option<ComponentKey>,
401    program: Program,
402    timezone: TimeZone,
403    drop_on_error: bool,
404    drop_on_abort: bool,
405    reroute_dropped: bool,
406    runner: Runner,
407    metric_tag_values: MetricTagValues,
408}
409
410pub trait VrlRunner {
411    fn run(
412        &mut self,
413        target: &mut VrlTarget,
414        program: &Program,
415        timezone: &TimeZone,
416    ) -> std::result::Result<Value, Terminate>;
417}
418
419#[derive(Debug)]
420pub struct AstRunner {
421    pub runtime: Runtime,
422}
423
424impl Clone for AstRunner {
425    fn clone(&self) -> Self {
426        Self {
427            runtime: Runtime::default(),
428        }
429    }
430}
431
432impl VrlRunner for AstRunner {
433    fn run(
434        &mut self,
435        target: &mut VrlTarget,
436        program: &Program,
437        timezone: &TimeZone,
438    ) -> std::result::Result<Value, Terminate> {
439        let result = self.runtime.resolve(target, program, timezone);
440        self.runtime.clear();
441        result
442    }
443}
444
445impl Remap<AstRunner> {
446    pub fn new_ast(
447        config: RemapConfig,
448        context: &TransformContext,
449    ) -> crate::Result<(Self, String)> {
450        let (program, warnings, _) = config.compile_vrl_program(
451            context.enrichment_tables.clone(),
452            context.merged_schema_definition.clone(),
453        )?;
454
455        let runtime = Runtime::default();
456        let runner = AstRunner { runtime };
457
458        Self::new(config, context, program, runner).map(|remap| (remap, warnings))
459    }
460}
461
462impl<Runner> Remap<Runner>
463where
464    Runner: VrlRunner,
465{
466    fn new(
467        config: RemapConfig,
468        context: &TransformContext,
469        program: Program,
470        runner: Runner,
471    ) -> crate::Result<Self> {
472        Ok(Remap {
473            component_key: context.key.clone(),
474            program,
475            timezone: config
476                .timezone
477                .unwrap_or_else(|| context.globals.timezone()),
478            drop_on_error: config.drop_on_error,
479            drop_on_abort: config.drop_on_abort,
480            reroute_dropped: config.reroute_dropped,
481            runner,
482            metric_tag_values: config.metric_tag_values,
483        })
484    }
485
486    #[cfg(test)]
487    const fn runner(&self) -> &Runner {
488        &self.runner
489    }
490
491    fn dropped_data(&self, reason: &str, error: ExpressionError) -> serde_json::Value {
492        let message = error
493            .notes()
494            .iter()
495            .filter(|note| matches!(note, Note::UserErrorMessage(_)))
496            .next_back()
497            .map(|note| note.to_string())
498            .unwrap_or_else(|| error.to_string());
499        serde_json::json!({
500                "reason": reason,
501                "message": message,
502                "component_id": self.component_key,
503                "component_type": "remap",
504                "component_kind": "transform",
505        })
506    }
507
508    fn annotate_dropped(&self, event: &mut Event, reason: &str, error: ExpressionError) {
509        match event {
510            Event::Log(log) => match log.namespace() {
511                LogNamespace::Legacy => {
512                    if let Some(metadata_key) = log_schema().metadata_key() {
513                        log.insert(
514                            (PathPrefix::Event, metadata_key.concat(path!("dropped"))),
515                            self.dropped_data(reason, error),
516                        );
517                    }
518                }
519                LogNamespace::Vector => {
520                    log.insert(
521                        metadata_path!("vector", "dropped"),
522                        self.dropped_data(reason, error),
523                    );
524                }
525            },
526            Event::Metric(metric) => {
527                if let Some(metadata_key) = log_schema().metadata_key() {
528                    metric.replace_tag(format!("{metadata_key}.dropped.reason"), reason.into());
529                    metric.replace_tag(
530                        format!("{metadata_key}.dropped.component_id"),
531                        self.component_key
532                            .as_ref()
533                            .map(ToString::to_string)
534                            .unwrap_or_default(),
535                    );
536                    metric.replace_tag(
537                        format!("{metadata_key}.dropped.component_type"),
538                        "remap".into(),
539                    );
540                    metric.replace_tag(
541                        format!("{metadata_key}.dropped.component_kind"),
542                        "transform".into(),
543                    );
544                }
545            }
546            Event::Trace(trace) => {
547                trace.maybe_insert(log_schema().metadata_key_target_path(), || {
548                    self.dropped_data(reason, error).into()
549                });
550            }
551        }
552    }
553
554    fn run_vrl(&mut self, target: &mut VrlTarget) -> std::result::Result<Value, Terminate> {
555        self.runner.run(target, &self.program, &self.timezone)
556    }
557}
558
559impl<Runner> SyncTransform for Remap<Runner>
560where
561    Runner: VrlRunner + Clone + Send + Sync,
562{
563    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
564        // If a program can fail or abort at runtime and we know that we will still need to forward
565        // the event in that case (either to the main output or `dropped`, depending on the
566        // config), we need to clone the original event and keep it around, to allow us to discard
567        // any mutations made to the event while the VRL program runs, before it failed or aborted.
568        //
569        // The `drop_on_{error, abort}` transform config allows operators to remove events from the
570        // main output if they're failed or aborted, in which case we can skip the cloning, since
571        // any mutations made by VRL will be ignored regardless. If they hav configured
572        // `reroute_dropped`, however, we still need to do the clone to ensure that we can forward
573        // the event to the `dropped` output.
574        let forward_on_error = !self.drop_on_error || self.reroute_dropped;
575        let forward_on_abort = !self.drop_on_abort || self.reroute_dropped;
576        let original_event = if (self.program.info().fallible && forward_on_error)
577            || (self.program.info().abortable && forward_on_abort)
578        {
579            Some(event.clone())
580        } else {
581            None
582        };
583
584        let log_namespace = event
585            .maybe_as_log()
586            .map(|log| log.namespace())
587            .unwrap_or(LogNamespace::Legacy);
588
589        let mut target = VrlTarget::new(
590            event,
591            self.program.info(),
592            match self.metric_tag_values {
593                MetricTagValues::Single => false,
594                MetricTagValues::Full => true,
595            },
596        );
597        let result = self.run_vrl(&mut target);
598
599        match result {
600            Ok(_) => match target.into_events(log_namespace) {
601                TargetEvents::One(event) => push_default(event, output),
602                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
603                TargetEvents::Traces(events) => {
604                    events.for_each(|event| push_default(event, output))
605                }
606            },
607            Err(reason) => {
608                let (reason, error, drop) = match reason {
609                    Terminate::Abort(error) => {
610                        if !self.reroute_dropped {
611                            emit!(RemapMappingAbort {
612                                event_dropped: self.drop_on_abort,
613                            });
614                        }
615                        ("abort", error, self.drop_on_abort)
616                    }
617                    Terminate::Error(error) => {
618                        if !self.reroute_dropped {
619                            emit!(RemapMappingError {
620                                error: error.to_string(),
621                                event_dropped: self.drop_on_error,
622                            });
623                        }
624                        ("error", error, self.drop_on_error)
625                    }
626                };
627
628                if !drop {
629                    let event = original_event.expect("event will be set");
630
631                    push_default(event, output);
632                } else if self.reroute_dropped {
633                    let mut event = original_event.expect("event will be set");
634
635                    self.annotate_dropped(&mut event, reason, error);
636                    push_dropped(event, output);
637                }
638            }
639        }
640    }
641}
642
643#[inline]
644fn push_default(event: Event, output: &mut TransformOutputsBuf) {
645    output.push(None, event)
646}
647
648#[inline]
649fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
650    output.push(Some(DROPPED), event);
651}
652
653#[derive(Debug, Snafu)]
654pub enum BuildError {
655    #[snafu(display("must provide exactly one of `source` or `file` or `files` configuration"))]
656    SourceAndOrFileOrFiles,
657
658    #[snafu(display("Could not open vrl program {:?}: {}", path, source))]
659    FileOpenFailed { path: PathBuf, source: io::Error },
660    #[snafu(display("Could not read vrl program {:?}: {}", path, source))]
661    FileReadFailed { path: PathBuf, source: io::Error },
662}
663
664#[cfg(test)]
665mod tests {
666    use std::collections::{HashMap, HashSet};
667    use std::sync::Arc;
668
669    use indoc::{formatdoc, indoc};
670    use vector_lib::{config::GlobalOptions, event::EventMetadata, metric_tags};
671    use vrl::value::kind::Collection;
672    use vrl::{btreemap, event_path};
673
674    use super::*;
675    use crate::metrics::Controller;
676    use crate::{
677        config::{build_unit_tests, ConfigBuilder},
678        event::{
679            metric::{MetricKind, MetricValue},
680            LogEvent, Metric, Value,
681        },
682        schema,
683        test_util::components::{
684            assert_transform_compliance, init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS,
685        },
686        transforms::test::create_topology,
687        transforms::OutputBuffer,
688    };
689    use chrono::DateTime;
690    use tokio::sync::mpsc;
691    use tokio_stream::wrappers::ReceiverStream;
692    use vector_lib::enrichment::TableRegistry;
693
694    fn test_default_schema_definition() -> schema::Definition {
695        schema::Definition::empty_legacy_namespace().with_event_field(
696            &owned_value_path!("a default field"),
697            Kind::integer().or_bytes(),
698            Some("default"),
699        )
700    }
701
702    fn test_dropped_schema_definition() -> schema::Definition {
703        schema::Definition::empty_legacy_namespace().with_event_field(
704            &owned_value_path!("a dropped field"),
705            Kind::boolean().or_null(),
706            Some("dropped"),
707        )
708    }
709
710    fn remap(config: RemapConfig) -> Result<Remap<AstRunner>> {
711        let schema_definitions = HashMap::from([
712            (
713                None,
714                [("source".into(), test_default_schema_definition())].into(),
715            ),
716            (
717                Some(DROPPED.to_owned()),
718                [("source".into(), test_dropped_schema_definition())].into(),
719            ),
720        ]);
721
722        Remap::new_ast(config, &TransformContext::new_test(schema_definitions))
723            .map(|(remap, _)| remap)
724    }
725
726    #[test]
727    fn generate_config() {
728        crate::test_util::test_generate_config::<RemapConfig>();
729    }
730
731    #[test]
732    fn config_missing_source_and_file() {
733        let config = RemapConfig {
734            source: None,
735            file: None,
736            ..Default::default()
737        };
738
739        let err = remap(config).unwrap_err().to_string();
740        assert_eq!(
741            &err,
742            "must provide exactly one of `source` or `file` or `files` configuration"
743        )
744    }
745
746    #[test]
747    fn config_both_source_and_file() {
748        let config = RemapConfig {
749            source: Some("".to_owned()),
750            file: Some("".into()),
751            ..Default::default()
752        };
753
754        let err = remap(config).unwrap_err().to_string();
755        assert_eq!(
756            &err,
757            "must provide exactly one of `source` or `file` or `files` configuration"
758        )
759    }
760
761    fn get_field_string(event: &Event, field: &str) -> String {
762        event
763            .as_log()
764            .get(field)
765            .unwrap()
766            .to_string_lossy()
767            .into_owned()
768    }
769
770    #[test]
771    fn check_remap_doesnt_share_state_between_events() {
772        let conf = RemapConfig {
773            source: Some(".foo = .sentinel".to_string()),
774            file: None,
775            drop_on_error: true,
776            drop_on_abort: false,
777            ..Default::default()
778        };
779        let mut tform = remap(conf).unwrap();
780        assert!(tform.runner().runtime.is_empty());
781
782        let event1 = {
783            let mut event1 = LogEvent::from("event1");
784            event1.insert("sentinel", "bar");
785            Event::from(event1)
786        };
787        let result1 = transform_one(&mut tform, event1).unwrap();
788        assert_eq!(get_field_string(&result1, "message"), "event1");
789        assert_eq!(get_field_string(&result1, "foo"), "bar");
790        assert!(tform.runner().runtime.is_empty());
791
792        let event2 = {
793            let event2 = LogEvent::from("event2");
794            Event::from(event2)
795        };
796        let result2 = transform_one(&mut tform, event2).unwrap();
797        assert_eq!(get_field_string(&result2, "message"), "event2");
798        assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
799        assert!(tform.runner().runtime.is_empty());
800    }
801
802    #[test]
803    fn remap_return_raw_string_vector_namespace() {
804        let initial_definition = Definition::default_for_namespace(&[LogNamespace::Vector].into());
805
806        let event = {
807            let mut metadata = EventMetadata::default()
808                .with_schema_definition(&Arc::new(initial_definition.clone()));
809            // the Vector metadata field is required for an event to correctly detect the namespace at runtime
810            metadata
811                .value_mut()
812                .insert(&owned_value_path!("vector"), BTreeMap::new());
813
814            let mut event = LogEvent::new_with_metadata(metadata);
815            event.insert("copy_from", "buz");
816            Event::from(event)
817        };
818
819        let conf = RemapConfig {
820            source: Some(r#"  . = "root string";"#.to_string()),
821            file: None,
822            drop_on_error: true,
823            drop_on_abort: false,
824            ..Default::default()
825        };
826        let mut tform = remap(conf.clone()).unwrap();
827        let result = transform_one(&mut tform, event).unwrap();
828        assert_eq!(get_field_string(&result, "."), "root string");
829
830        let mut outputs = conf.outputs(
831            TableRegistry::default(),
832            &[(OutputId::dummy(), initial_definition)],
833            LogNamespace::Vector,
834        );
835
836        assert_eq!(outputs.len(), 1);
837        let output = outputs.pop().unwrap();
838        assert_eq!(output.port, None);
839        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
840        let expected_schema =
841            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
842        assert_eq!(actual_schema_def, expected_schema);
843    }
844
845    #[test]
846    fn check_remap_adds() {
847        let event = {
848            let mut event = LogEvent::from("augment me");
849            event.insert("copy_from", "buz");
850            Event::from(event)
851        };
852
853        let conf = RemapConfig {
854            source: Some(
855                r#"  .foo = "bar"
856  .bar = "baz"
857  .copy = .copy_from
858"#
859                .to_string(),
860            ),
861            file: None,
862            drop_on_error: true,
863            drop_on_abort: false,
864            ..Default::default()
865        };
866        let mut tform = remap(conf).unwrap();
867        let result = transform_one(&mut tform, event).unwrap();
868        assert_eq!(get_field_string(&result, "message"), "augment me");
869        assert_eq!(get_field_string(&result, "copy_from"), "buz");
870        assert_eq!(get_field_string(&result, "foo"), "bar");
871        assert_eq!(get_field_string(&result, "bar"), "baz");
872        assert_eq!(get_field_string(&result, "copy"), "buz");
873    }
874
875    #[test]
876    fn check_remap_emits_multiple() {
877        let event = {
878            let mut event = LogEvent::from("augment me");
879            event.insert(
880                "events",
881                vec![btreemap!("message" => "foo"), btreemap!("message" => "bar")],
882            );
883            Event::from(event)
884        };
885
886        let conf = RemapConfig {
887            source: Some(
888                indoc! {r"
889                . = .events
890            "}
891                .to_owned(),
892            ),
893            file: None,
894            drop_on_error: true,
895            drop_on_abort: false,
896            ..Default::default()
897        };
898        let mut tform = remap(conf).unwrap();
899
900        let out = collect_outputs(&mut tform, event);
901        assert_eq!(2, out.primary.len());
902        let mut result = out.primary.into_events();
903
904        let r = result.next().unwrap();
905        assert_eq!(get_field_string(&r, "message"), "foo");
906        let r = result.next().unwrap();
907        assert_eq!(get_field_string(&r, "message"), "bar");
908    }
909
910    #[test]
911    fn check_remap_error() {
912        let event = {
913            let mut event = Event::Log(LogEvent::from("augment me"));
914            event.as_mut_log().insert("bar", "is a string");
915            event
916        };
917
918        let conf = RemapConfig {
919            source: Some(formatdoc! {r#"
920                .foo = "foo"
921                .not_an_int = int!(.bar)
922                .baz = 12
923            "#}),
924            file: None,
925            drop_on_error: false,
926            drop_on_abort: false,
927            ..Default::default()
928        };
929        let mut tform = remap(conf).unwrap();
930
931        let event = transform_one(&mut tform, event).unwrap();
932
933        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
934        assert!(event.as_log().get("foo").is_none());
935        assert!(event.as_log().get("baz").is_none());
936    }
937
938    #[test]
939    fn check_remap_error_drop() {
940        let event = {
941            let mut event = Event::Log(LogEvent::from("augment me"));
942            event.as_mut_log().insert("bar", "is a string");
943            event
944        };
945
946        let conf = RemapConfig {
947            source: Some(formatdoc! {r#"
948                .foo = "foo"
949                .not_an_int = int!(.bar)
950                .baz = 12
951            "#}),
952            file: None,
953            drop_on_error: true,
954            drop_on_abort: false,
955            ..Default::default()
956        };
957        let mut tform = remap(conf).unwrap();
958
959        assert!(transform_one(&mut tform, event).is_none())
960    }
961
962    #[test]
963    fn check_remap_error_infallible() {
964        let event = {
965            let mut event = Event::Log(LogEvent::from("augment me"));
966            event.as_mut_log().insert("bar", "is a string");
967            event
968        };
969
970        let conf = RemapConfig {
971            source: Some(formatdoc! {r#"
972                .foo = "foo"
973                .baz = 12
974            "#}),
975            file: None,
976            drop_on_error: false,
977            drop_on_abort: false,
978            ..Default::default()
979        };
980        let mut tform = remap(conf).unwrap();
981
982        let event = transform_one(&mut tform, event).unwrap();
983
984        assert_eq!(event.as_log().get("foo"), Some(&Value::from("foo")));
985        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
986        assert_eq!(event.as_log().get("baz"), Some(&Value::from(12)));
987    }
988
989    #[test]
990    fn check_remap_abort() {
991        let event = {
992            let mut event = Event::Log(LogEvent::from("augment me"));
993            event.as_mut_log().insert("bar", "is a string");
994            event
995        };
996
997        let conf = RemapConfig {
998            source: Some(formatdoc! {r#"
999                .foo = "foo"
1000                abort
1001                .baz = 12
1002            "#}),
1003            file: None,
1004            drop_on_error: false,
1005            drop_on_abort: false,
1006            ..Default::default()
1007        };
1008        let mut tform = remap(conf).unwrap();
1009
1010        let event = transform_one(&mut tform, event).unwrap();
1011
1012        assert_eq!(event.as_log().get("bar"), Some(&Value::from("is a string")));
1013        assert!(event.as_log().get("foo").is_none());
1014        assert!(event.as_log().get("baz").is_none());
1015    }
1016
1017    #[test]
1018    fn check_remap_abort_drop() {
1019        let event = {
1020            let mut event = Event::Log(LogEvent::from("augment me"));
1021            event.as_mut_log().insert("bar", "is a string");
1022            event
1023        };
1024
1025        let conf = RemapConfig {
1026            source: Some(formatdoc! {r#"
1027                .foo = "foo"
1028                abort
1029                .baz = 12
1030            "#}),
1031            file: None,
1032            drop_on_error: false,
1033            drop_on_abort: true,
1034            ..Default::default()
1035        };
1036        let mut tform = remap(conf).unwrap();
1037
1038        assert!(transform_one(&mut tform, event).is_none())
1039    }
1040
1041    #[test]
1042    fn check_remap_metric() {
1043        let metric = Event::Metric(Metric::new(
1044            "counter",
1045            MetricKind::Absolute,
1046            MetricValue::Counter { value: 1.0 },
1047        ));
1048        let metadata = metric.metadata().clone();
1049
1050        let conf = RemapConfig {
1051            source: Some(
1052                r#".tags.host = "zoobub"
1053                       .name = "zork"
1054                       .namespace = "zerk"
1055                       .kind = "incremental""#
1056                    .to_string(),
1057            ),
1058            file: None,
1059            drop_on_error: true,
1060            drop_on_abort: false,
1061            ..Default::default()
1062        };
1063        let mut tform = remap(conf).unwrap();
1064
1065        let result = transform_one(&mut tform, metric).unwrap();
1066        assert_eq!(
1067            result,
1068            Event::Metric(
1069                Metric::new_with_metadata(
1070                    "zork",
1071                    MetricKind::Incremental,
1072                    MetricValue::Counter { value: 1.0 },
1073                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1074                    // to the actual value to skip the assertion here
1075                    metadata
1076                )
1077                .with_namespace(Some("zerk"))
1078                .with_tags(Some(metric_tags! {
1079                    "host" => "zoobub",
1080                }))
1081            )
1082        );
1083    }
1084
1085    #[test]
1086    fn remap_timezone_fallback() {
1087        let error = Event::from_json_value(
1088            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1089            LogNamespace::Legacy,
1090        )
1091        .unwrap();
1092        let conf = RemapConfig {
1093            source: Some(formatdoc! {r#"
1094                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1095            "#}),
1096            drop_on_error: true,
1097            drop_on_abort: true,
1098            reroute_dropped: true,
1099            ..Default::default()
1100        };
1101        let context = TransformContext {
1102            key: Some(ComponentKey::from("remapper")),
1103            globals: GlobalOptions {
1104                timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1105                ..Default::default()
1106            },
1107            ..Default::default()
1108        };
1109        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1110
1111        let output = transform_one_fallible(&mut tform, error).unwrap();
1112        let log = output.as_log();
1113        assert_eq!(
1114            log["timestamp"],
1115            DateTime::<chrono::Utc>::from(
1116                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1117            )
1118            .into()
1119        );
1120    }
1121
1122    #[test]
1123    fn remap_timezone_override() {
1124        let error = Event::from_json_value(
1125            serde_json::json!({"timestamp": "2022-12-27 00:00:00"}),
1126            LogNamespace::Legacy,
1127        )
1128        .unwrap();
1129        let conf = RemapConfig {
1130            source: Some(formatdoc! {r#"
1131                .timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
1132            "#}),
1133            drop_on_error: true,
1134            drop_on_abort: true,
1135            reroute_dropped: true,
1136            timezone: Some(TimeZone::parse("America/Los_Angeles").unwrap()),
1137            ..Default::default()
1138        };
1139        let context = TransformContext {
1140            key: Some(ComponentKey::from("remapper")),
1141            globals: GlobalOptions {
1142                timezone: Some(TimeZone::parse("Etc/UTC").unwrap()),
1143                ..Default::default()
1144            },
1145            ..Default::default()
1146        };
1147        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1148
1149        let output = transform_one_fallible(&mut tform, error).unwrap();
1150        let log = output.as_log();
1151        assert_eq!(
1152            log["timestamp"],
1153            DateTime::<chrono::Utc>::from(
1154                DateTime::parse_from_rfc3339("2022-12-27T00:00:00-08:00").unwrap()
1155            )
1156            .into()
1157        );
1158    }
1159
1160    #[test]
1161    fn check_remap_branching() {
1162        let happy =
1163            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1164                .unwrap();
1165        let abort = Event::from_json_value(
1166            serde_json::json!({"hello": "goodbye"}),
1167            LogNamespace::Legacy,
1168        )
1169        .unwrap();
1170        let error =
1171            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1172
1173        let happy_metric = {
1174            let mut metric = Metric::new(
1175                "counter",
1176                MetricKind::Absolute,
1177                MetricValue::Counter { value: 1.0 },
1178            );
1179            metric.replace_tag("hello".into(), "world".into());
1180            Event::Metric(metric)
1181        };
1182
1183        let abort_metric = {
1184            let mut metric = Metric::new(
1185                "counter",
1186                MetricKind::Absolute,
1187                MetricValue::Counter { value: 1.0 },
1188            );
1189            metric.replace_tag("hello".into(), "goodbye".into());
1190            Event::Metric(metric)
1191        };
1192
1193        let error_metric = {
1194            let mut metric = Metric::new(
1195                "counter",
1196                MetricKind::Absolute,
1197                MetricValue::Counter { value: 1.0 },
1198            );
1199            metric.replace_tag("not_hello".into(), "oops".into());
1200            Event::Metric(metric)
1201        };
1202
1203        let conf = RemapConfig {
1204            source: Some(formatdoc! {r#"
1205                if exists(.tags) {{
1206                    # metrics
1207                    .tags.foo = "bar"
1208                    if string!(.tags.hello) == "goodbye" {{
1209                      abort
1210                    }}
1211                }} else {{
1212                    # logs
1213                    .foo = "bar"
1214                    if string(.hello) == "goodbye" {{
1215                      abort
1216                    }}
1217                }}
1218            "#}),
1219            drop_on_error: true,
1220            drop_on_abort: true,
1221            reroute_dropped: true,
1222            ..Default::default()
1223        };
1224        let schema_definitions = HashMap::from([
1225            (
1226                None,
1227                [("source".into(), test_default_schema_definition())].into(),
1228            ),
1229            (
1230                Some(DROPPED.to_owned()),
1231                [("source".into(), test_dropped_schema_definition())].into(),
1232            ),
1233        ]);
1234        let context = TransformContext {
1235            key: Some(ComponentKey::from("remapper")),
1236            schema_definitions,
1237            merged_schema_definition: schema::Definition::new_with_default_metadata(
1238                Kind::any_object(),
1239                [LogNamespace::Legacy],
1240            )
1241            .with_event_field(&owned_value_path!("hello"), Kind::bytes(), None),
1242            ..Default::default()
1243        };
1244        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1245
1246        let output = transform_one_fallible(&mut tform, happy).unwrap();
1247        let log = output.as_log();
1248        assert_eq!(log["hello"], "world".into());
1249        assert_eq!(log["foo"], "bar".into());
1250        assert!(!log.contains(event_path!("metadata")));
1251
1252        let output = transform_one_fallible(&mut tform, abort).unwrap_err();
1253        let log = output.as_log();
1254        assert_eq!(log["hello"], "goodbye".into());
1255        assert!(!log.contains(event_path!("foo")));
1256        assert_eq!(
1257            log["metadata"],
1258            serde_json::json!({
1259                "dropped": {
1260                    "reason": "abort",
1261                    "message": "aborted",
1262                    "component_id": "remapper",
1263                    "component_type": "remap",
1264                    "component_kind": "transform",
1265                }
1266            })
1267            .try_into()
1268            .unwrap()
1269        );
1270
1271        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1272        let log = output.as_log();
1273        assert_eq!(log["hello"], 42.into());
1274        assert!(!log.contains(event_path!("foo")));
1275        assert_eq!(
1276            log["metadata"],
1277            serde_json::json!({
1278                "dropped": {
1279                    "reason": "error",
1280                    "message": "function call error for \"string\" at (160:174): expected string, got integer",
1281                    "component_id": "remapper",
1282                    "component_type": "remap",
1283                    "component_kind": "transform",
1284                }
1285            })
1286            .try_into()
1287            .unwrap()
1288        );
1289
1290        let output = transform_one_fallible(&mut tform, happy_metric).unwrap();
1291        similar_asserts::assert_eq!(
1292            output,
1293            Event::Metric(
1294                Metric::new_with_metadata(
1295                    "counter",
1296                    MetricKind::Absolute,
1297                    MetricValue::Counter { value: 1.0 },
1298                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1299                    // to the actual value to skip the assertion here
1300                    EventMetadata::default()
1301                        .with_schema_definition(output.metadata().schema_definition()),
1302                )
1303                .with_tags(Some(metric_tags! {
1304                    "hello" => "world",
1305                    "foo" => "bar",
1306                }))
1307            )
1308        );
1309
1310        let output = transform_one_fallible(&mut tform, abort_metric).unwrap_err();
1311        similar_asserts::assert_eq!(
1312            output,
1313            Event::Metric(
1314                Metric::new_with_metadata(
1315                    "counter",
1316                    MetricKind::Absolute,
1317                    MetricValue::Counter { value: 1.0 },
1318                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1319                    // to the actual value to skip the assertion here
1320                    EventMetadata::default()
1321                        .with_schema_definition(output.metadata().schema_definition()),
1322                )
1323                .with_tags(Some(metric_tags! {
1324                    "hello" => "goodbye",
1325                    "metadata.dropped.reason" => "abort",
1326                    "metadata.dropped.component_id" => "remapper",
1327                    "metadata.dropped.component_type" => "remap",
1328                    "metadata.dropped.component_kind" => "transform",
1329                }))
1330            )
1331        );
1332
1333        let output = transform_one_fallible(&mut tform, error_metric).unwrap_err();
1334        similar_asserts::assert_eq!(
1335            output,
1336            Event::Metric(
1337                Metric::new_with_metadata(
1338                    "counter",
1339                    MetricKind::Absolute,
1340                    MetricValue::Counter { value: 1.0 },
1341                    // The schema definition is set in the topology, which isn't used in this test. Setting the definition
1342                    // to the actual value to skip the assertion here
1343                    EventMetadata::default()
1344                        .with_schema_definition(output.metadata().schema_definition()),
1345                )
1346                .with_tags(Some(metric_tags! {
1347                    "not_hello" => "oops",
1348                    "metadata.dropped.reason" => "error",
1349                    "metadata.dropped.component_id" => "remapper",
1350                    "metadata.dropped.component_type" => "remap",
1351                    "metadata.dropped.component_kind" => "transform",
1352                }))
1353            )
1354        );
1355    }
1356
1357    #[test]
1358    fn check_remap_branching_assert_with_message() {
1359        let error_trigger_assert_custom_message =
1360            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1361        let error_trigger_default_assert_message =
1362            Event::from_json_value(serde_json::json!({"hello": 0}), LogNamespace::Legacy).unwrap();
1363        let conf = RemapConfig {
1364            source: Some(formatdoc! {r#"
1365                assert_eq!(.hello, 0, "custom message here")
1366                assert_eq!(.hello, 1)
1367            "#}),
1368            drop_on_error: true,
1369            drop_on_abort: true,
1370            reroute_dropped: true,
1371            ..Default::default()
1372        };
1373        let context = TransformContext {
1374            key: Some(ComponentKey::from("remapper")),
1375            ..Default::default()
1376        };
1377        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1378
1379        let output =
1380            transform_one_fallible(&mut tform, error_trigger_assert_custom_message).unwrap_err();
1381        let log = output.as_log();
1382        assert_eq!(log["hello"], 42.into());
1383        assert!(!log.contains(event_path!("foo")));
1384        assert_eq!(
1385            log["metadata"],
1386            serde_json::json!({
1387                "dropped": {
1388                    "reason": "error",
1389                    "message": "custom message here",
1390                    "component_id": "remapper",
1391                    "component_type": "remap",
1392                    "component_kind": "transform",
1393                }
1394            })
1395            .try_into()
1396            .unwrap()
1397        );
1398
1399        let output =
1400            transform_one_fallible(&mut tform, error_trigger_default_assert_message).unwrap_err();
1401        let log = output.as_log();
1402        assert_eq!(log["hello"], 0.into());
1403        assert!(!log.contains(event_path!("foo")));
1404        assert_eq!(
1405            log["metadata"],
1406            serde_json::json!({
1407                "dropped": {
1408                    "reason": "error",
1409                    "message": "function call error for \"assert_eq\" at (45:66): assertion failed: 0 == 1",
1410                    "component_id": "remapper",
1411                    "component_type": "remap",
1412                    "component_kind": "transform",
1413                }
1414            })
1415            .try_into()
1416            .unwrap()
1417        );
1418    }
1419
1420    #[test]
1421    fn check_remap_branching_abort_with_message() {
1422        let error =
1423            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1424        let conf = RemapConfig {
1425            source: Some(formatdoc! {r#"
1426                abort "custom message here"
1427            "#}),
1428            drop_on_error: true,
1429            drop_on_abort: true,
1430            reroute_dropped: true,
1431            ..Default::default()
1432        };
1433        let context = TransformContext {
1434            key: Some(ComponentKey::from("remapper")),
1435            ..Default::default()
1436        };
1437        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1438
1439        let output = transform_one_fallible(&mut tform, error).unwrap_err();
1440        let log = output.as_log();
1441        assert_eq!(log["hello"], 42.into());
1442        assert!(!log.contains(event_path!("foo")));
1443        assert_eq!(
1444            log["metadata"],
1445            serde_json::json!({
1446                "dropped": {
1447                    "reason": "abort",
1448                    "message": "custom message here",
1449                    "component_id": "remapper",
1450                    "component_type": "remap",
1451                    "component_kind": "transform",
1452                }
1453            })
1454            .try_into()
1455            .unwrap()
1456        );
1457    }
1458
1459    #[test]
1460    fn check_remap_branching_disabled() {
1461        let happy =
1462            Event::from_json_value(serde_json::json!({"hello": "world"}), LogNamespace::Legacy)
1463                .unwrap();
1464        let abort = Event::from_json_value(
1465            serde_json::json!({"hello": "goodbye"}),
1466            LogNamespace::Legacy,
1467        )
1468        .unwrap();
1469        let error =
1470            Event::from_json_value(serde_json::json!({"hello": 42}), LogNamespace::Legacy).unwrap();
1471
1472        let conf = RemapConfig {
1473            source: Some(formatdoc! {r#"
1474                if exists(.tags) {{
1475                    # metrics
1476                    .tags.foo = "bar"
1477                    if string!(.tags.hello) == "goodbye" {{
1478                      abort
1479                    }}
1480                }} else {{
1481                    # logs
1482                    .foo = "bar"
1483                    if string!(.hello) == "goodbye" {{
1484                      abort
1485                    }}
1486                }}
1487            "#}),
1488            drop_on_error: true,
1489            drop_on_abort: true,
1490            reroute_dropped: false,
1491            ..Default::default()
1492        };
1493
1494        let schema_definition = schema::Definition::new_with_default_metadata(
1495            Kind::any_object(),
1496            [LogNamespace::Legacy],
1497        )
1498        .with_event_field(&owned_value_path!("foo"), Kind::any(), None)
1499        .with_event_field(&owned_value_path!("tags"), Kind::any(), None);
1500
1501        assert_eq!(
1502            conf.outputs(
1503                vector_lib::enrichment::TableRegistry::default(),
1504                &[(
1505                    "test".into(),
1506                    schema::Definition::new_with_default_metadata(
1507                        Kind::any_object(),
1508                        [LogNamespace::Legacy]
1509                    )
1510                )],
1511                LogNamespace::Legacy
1512            ),
1513            vec![TransformOutput::new(
1514                DataType::all_bits(),
1515                [("test".into(), schema_definition)].into()
1516            )]
1517        );
1518
1519        let context = TransformContext {
1520            key: Some(ComponentKey::from("remapper")),
1521            ..Default::default()
1522        };
1523        let mut tform = Remap::new_ast(conf, &context).unwrap().0;
1524
1525        let output = transform_one_fallible(&mut tform, happy).unwrap();
1526        let log = output.as_log();
1527        assert_eq!(log["hello"], "world".into());
1528        assert_eq!(log["foo"], "bar".into());
1529        assert!(!log.contains(event_path!("metadata")));
1530
1531        let out = collect_outputs(&mut tform, abort);
1532        assert!(out.primary.is_empty());
1533        assert!(out.named[DROPPED].is_empty());
1534
1535        let out = collect_outputs(&mut tform, error);
1536        assert!(out.primary.is_empty());
1537        assert!(out.named[DROPPED].is_empty());
1538    }
1539
1540    #[tokio::test]
1541    async fn check_remap_branching_metrics_with_output() {
1542        init_test();
1543
1544        let config: ConfigBuilder = toml::from_str(indoc! {r#"
1545            [transforms.foo]
1546            inputs = []
1547            type = "remap"
1548            drop_on_abort = true
1549            reroute_dropped = true
1550            source = "abort"
1551
1552            [[tests]]
1553            name = "metric output"
1554
1555            [tests.input]
1556                insert_at = "foo"
1557                value = "none"
1558
1559            [[tests.outputs]]
1560                extract_from = "foo.dropped"
1561                [[tests.outputs.conditions]]
1562                type = "vrl"
1563                source = "true"
1564        "#})
1565        .unwrap();
1566
1567        let mut tests = build_unit_tests(config).await.unwrap();
1568        assert!(tests.remove(0).run().await.errors.is_empty());
1569        // Check that metrics were emitted with output tag
1570        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
1571    }
1572
1573    struct CollectedOuput {
1574        primary: OutputBuffer,
1575        named: HashMap<String, OutputBuffer>,
1576    }
1577
1578    fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput {
1579        let mut outputs = TransformOutputsBuf::new_with_capacity(
1580            vec![
1581                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1582                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1583            ],
1584            1,
1585        );
1586
1587        ft.transform(event, &mut outputs);
1588
1589        CollectedOuput {
1590            primary: outputs.take_primary(),
1591            named: outputs.take_all_named(),
1592        }
1593    }
1594
1595    fn transform_one(ft: &mut dyn SyncTransform, event: Event) -> Option<Event> {
1596        let out = collect_outputs(ft, event);
1597        assert_eq!(0, out.named.values().map(|v| v.len()).sum::<usize>());
1598        assert!(out.primary.len() <= 1);
1599        out.primary.into_events().next()
1600    }
1601
1602    fn transform_one_fallible(
1603        ft: &mut dyn SyncTransform,
1604        event: Event,
1605    ) -> std::result::Result<Event, Event> {
1606        let mut outputs = TransformOutputsBuf::new_with_capacity(
1607            vec![
1608                TransformOutput::new(DataType::all_bits(), HashMap::new()),
1609                TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(DROPPED),
1610            ],
1611            1,
1612        );
1613
1614        ft.transform(event, &mut outputs);
1615
1616        let mut buf = outputs.drain().collect::<Vec<_>>();
1617        let mut err_buf = outputs.drain_named(DROPPED).collect::<Vec<_>>();
1618
1619        assert!(buf.len() < 2);
1620        assert!(err_buf.len() < 2);
1621        match (buf.pop(), err_buf.pop()) {
1622            (Some(good), None) => Ok(good),
1623            (None, Some(bad)) => Err(bad),
1624            (a, b) => panic!("expected output xor error output, got {a:?} and {b:?}"),
1625        }
1626    }
1627
1628    #[tokio::test]
1629    async fn emits_internal_events() {
1630        assert_transform_compliance(async move {
1631            let config = RemapConfig {
1632                source: Some("abort".to_owned()),
1633                drop_on_abort: true,
1634                ..Default::default()
1635            };
1636
1637            let (tx, rx) = mpsc::channel(1);
1638            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1639
1640            let log = LogEvent::from("hello world");
1641            tx.send(log.into()).await.unwrap();
1642
1643            drop(tx);
1644            topology.stop().await;
1645            assert_eq!(out.recv().await, None);
1646        })
1647        .await
1648    }
1649
1650    #[test]
1651    fn test_combined_transforms_simple() {
1652        // Make sure that when getting the definitions from one transform and
1653        // passing them to another the correct definition is still produced.
1654
1655        // Transform 1 sets a simple value.
1656        let transform1 = RemapConfig {
1657            source: Some(r#".thing = "potato""#.to_string()),
1658            ..Default::default()
1659        };
1660
1661        let transform2 = RemapConfig {
1662            source: Some(".thang = .thing".to_string()),
1663            ..Default::default()
1664        };
1665
1666        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1667
1668        let outputs1 = transform1.outputs(
1669            enrichment_tables.clone(),
1670            &[("in".into(), schema::Definition::default_legacy_namespace())],
1671            LogNamespace::Legacy,
1672        );
1673
1674        assert_eq!(
1675            vec![TransformOutput::new(
1676                DataType::all_bits(),
1677                // The `never` definition should have been passed on to the end.
1678                [(
1679                    "in".into(),
1680                    Definition::default_legacy_namespace().with_event_field(
1681                        &owned_value_path!("thing"),
1682                        Kind::bytes(),
1683                        None
1684                    ),
1685                )]
1686                .into()
1687            )],
1688            outputs1
1689        );
1690
1691        let outputs2 = transform2.outputs(
1692            enrichment_tables,
1693            &[(
1694                "in1".into(),
1695                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1696            )],
1697            LogNamespace::Legacy,
1698        );
1699
1700        assert_eq!(
1701            vec![TransformOutput::new(
1702                DataType::all_bits(),
1703                [(
1704                    "in1".into(),
1705                    Definition::default_legacy_namespace()
1706                        .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None)
1707                        .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None),
1708                )]
1709                .into(),
1710            )],
1711            outputs2
1712        );
1713    }
1714
1715    #[test]
1716    fn test_combined_transforms_unnest() {
1717        // Make sure that when getting the definitions from one transform and
1718        // passing them to another the correct definition is still produced.
1719
1720        // Transform 1 sets a simple value.
1721        let transform1 = RemapConfig {
1722            source: Some(
1723                indoc! {
1724                r#"
1725                .thing = [{"cabbage": 32}, {"parsnips": 45}]
1726                . = unnest(.thing)
1727                "#
1728                }
1729                .to_string(),
1730            ),
1731            ..Default::default()
1732        };
1733
1734        let transform2 = RemapConfig {
1735            source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()),
1736            ..Default::default()
1737        };
1738
1739        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1740
1741        let outputs1 = transform1.outputs(
1742            enrichment_tables.clone(),
1743            &[(
1744                "in".into(),
1745                schema::Definition::new_with_default_metadata(
1746                    Kind::any_object(),
1747                    [LogNamespace::Legacy],
1748                ),
1749            )],
1750            LogNamespace::Legacy,
1751        );
1752
1753        assert_eq!(
1754            vec![TransformOutput::new(
1755                DataType::all_bits(),
1756                [(
1757                    "in".into(),
1758                    Definition::new_with_default_metadata(
1759                        Kind::any_object(),
1760                        [LogNamespace::Legacy]
1761                    )
1762                    .with_event_field(
1763                        &owned_value_path!("thing"),
1764                        Kind::object(Collection::from(BTreeMap::from([
1765                            ("cabbage".into(), Kind::integer().or_undefined(),),
1766                            ("parsnips".into(), Kind::integer().or_undefined(),)
1767                        ]))),
1768                        None
1769                    ),
1770                )]
1771                .into(),
1772            )],
1773            outputs1
1774        );
1775
1776        let outputs2 = transform2.outputs(
1777            enrichment_tables,
1778            &[(
1779                "in1".into(),
1780                outputs1[0].schema_definitions(true)[&"in".into()].clone(),
1781            )],
1782            LogNamespace::Legacy,
1783        );
1784
1785        assert_eq!(
1786            vec![TransformOutput::new(
1787                DataType::all_bits(),
1788                [(
1789                    "in1".into(),
1790                    Definition::default_legacy_namespace()
1791                        .with_event_field(
1792                            &owned_value_path!("thing"),
1793                            Kind::object(Collection::from(BTreeMap::from([
1794                                ("cabbage".into(), Kind::integer().or_undefined(),),
1795                                ("parsnips".into(), Kind::integer().or_undefined(),)
1796                            ]))),
1797                            None
1798                        )
1799                        .with_event_field(
1800                            &owned_value_path!("thang"),
1801                            Kind::integer().or_null(),
1802                            None
1803                        ),
1804                )]
1805                .into(),
1806            )],
1807            outputs2
1808        );
1809    }
1810
1811    #[test]
1812    fn test_transform_abort() {
1813        // An abort should not change the typedef.
1814
1815        let transform1 = RemapConfig {
1816            source: Some(r"abort".to_string()),
1817            ..Default::default()
1818        };
1819
1820        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1821
1822        let outputs1 = transform1.outputs(
1823            enrichment_tables,
1824            &[(
1825                "in".into(),
1826                schema::Definition::new_with_default_metadata(
1827                    Kind::any_object(),
1828                    [LogNamespace::Legacy],
1829                ),
1830            )],
1831            LogNamespace::Legacy,
1832        );
1833
1834        assert_eq!(
1835            vec![TransformOutput::new(
1836                DataType::all_bits(),
1837                [(
1838                    "in".into(),
1839                    Definition::new_with_default_metadata(
1840                        Kind::any_object(),
1841                        [LogNamespace::Legacy]
1842                    ),
1843                )]
1844                .into(),
1845            )],
1846            outputs1
1847        );
1848    }
1849
1850    #[test]
1851    fn test_error_outputs() {
1852        // Even if we fail to compile the VRL it should still output
1853        // the correct ports. This may change if we separate the
1854        // `outputs` function into one returning outputs and a separate
1855        // returning schema definitions.
1856        let transform1 = RemapConfig {
1857            // This enrichment table does not exist.
1858            source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()),
1859            reroute_dropped: true,
1860            ..Default::default()
1861        };
1862
1863        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1864
1865        let outputs1 = transform1.outputs(
1866            enrichment_tables,
1867            &[(
1868                "in".into(),
1869                schema::Definition::new_with_default_metadata(
1870                    Kind::any_object(),
1871                    [LogNamespace::Legacy],
1872                ),
1873            )],
1874            LogNamespace::Legacy,
1875        );
1876
1877        assert_eq!(
1878            HashSet::from([None, Some("dropped".to_string())]),
1879            outputs1
1880                .into_iter()
1881                .map(|output| output.port)
1882                .collect::<HashSet<_>>()
1883        );
1884    }
1885
1886    #[test]
1887    fn test_non_object_events() {
1888        let transform1 = RemapConfig {
1889            // This enrichment table does not exist.
1890            source: Some(r#". = "fish" "#.to_string()),
1891            ..Default::default()
1892        };
1893
1894        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1895
1896        let outputs1 = transform1.outputs(
1897            enrichment_tables,
1898            &[(
1899                "in".into(),
1900                schema::Definition::new_with_default_metadata(
1901                    Kind::any_object(),
1902                    [LogNamespace::Legacy],
1903                ),
1904            )],
1905            LogNamespace::Legacy,
1906        );
1907
1908        let wanted = schema::Definition::new_with_default_metadata(
1909            Kind::object(Collection::from_unknown(Kind::undefined())),
1910            [LogNamespace::Legacy],
1911        )
1912        .with_event_field(&owned_value_path!("message"), Kind::bytes(), None);
1913
1914        assert_eq!(
1915            HashMap::from([(OutputId::from("in"), wanted)]),
1916            outputs1[0].schema_definitions(true),
1917        );
1918    }
1919
1920    #[test]
1921    fn test_array_and_non_object_events() {
1922        let transform1 = RemapConfig {
1923            source: Some(
1924                indoc! {r#"
1925                    if .lizard == true {
1926                        .thing = [{"cabbage": 42}];
1927                        . = unnest(.thing)
1928                    } else {
1929                      . = "fish"
1930                    }
1931                    "#}
1932                .to_string(),
1933            ),
1934            ..Default::default()
1935        };
1936
1937        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
1938
1939        let outputs1 = transform1.outputs(
1940            enrichment_tables,
1941            &[(
1942                "in".into(),
1943                schema::Definition::new_with_default_metadata(
1944                    Kind::any_object(),
1945                    [LogNamespace::Legacy],
1946                ),
1947            )],
1948            LogNamespace::Legacy,
1949        );
1950
1951        let wanted = schema::Definition::new_with_default_metadata(
1952            Kind::any_object(),
1953            [LogNamespace::Legacy],
1954        )
1955        .with_event_field(&owned_value_path!("message"), Kind::any(), None)
1956        .with_event_field(
1957            &owned_value_path!("thing"),
1958            Kind::object(Collection::from(BTreeMap::from([(
1959                "cabbage".into(),
1960                Kind::integer(),
1961            )])))
1962            .or_undefined(),
1963            None,
1964        );
1965
1966        assert_eq!(
1967            HashMap::from([(OutputId::from("in"), wanted)]),
1968            outputs1[0].schema_definitions(true),
1969        );
1970    }
1971
1972    #[test]
1973    fn check_remap_array_vector_namespace() {
1974        let event = {
1975            let mut event = LogEvent::from("input");
1976            // mark the event as a "Vector" namespaced log
1977            event
1978                .metadata_mut()
1979                .value_mut()
1980                .insert("vector", BTreeMap::new());
1981            Event::from(event)
1982        };
1983
1984        let conf = RemapConfig {
1985            source: Some(
1986                r". = [null]
1987"
1988                .to_string(),
1989            ),
1990            file: None,
1991            drop_on_error: true,
1992            drop_on_abort: false,
1993            ..Default::default()
1994        };
1995        let mut tform = remap(conf.clone()).unwrap();
1996        let result = transform_one(&mut tform, event).unwrap();
1997
1998        // Legacy namespace nests this under "message", Vector should set it as the root
1999        assert_eq!(result.as_log().get("."), Some(&Value::Null));
2000
2001        let enrichment_tables = vector_lib::enrichment::TableRegistry::default();
2002        let outputs1 = conf.outputs(
2003            enrichment_tables,
2004            &[(
2005                "in".into(),
2006                schema::Definition::new_with_default_metadata(
2007                    Kind::any_object(),
2008                    [LogNamespace::Vector],
2009                ),
2010            )],
2011            LogNamespace::Vector,
2012        );
2013
2014        let wanted =
2015            schema::Definition::new_with_default_metadata(Kind::null(), [LogNamespace::Vector]);
2016
2017        assert_eq!(
2018            HashMap::from([(OutputId::from("in"), wanted)]),
2019            outputs1[0].schema_definitions(true),
2020        );
2021    }
2022
2023    fn assert_no_metrics(source: String) {
2024        vector_lib::metrics::init_test();
2025
2026        let config = RemapConfig {
2027            source: Some(source),
2028            drop_on_error: true,
2029            drop_on_abort: true,
2030            reroute_dropped: true,
2031            ..Default::default()
2032        };
2033        let mut ast_runner = remap(config).unwrap();
2034        let input_event =
2035            Event::from_json_value(serde_json::json!({"a": 42}), LogNamespace::Vector).unwrap();
2036        let dropped_event = transform_one_fallible(&mut ast_runner, input_event).unwrap_err();
2037        let dropped_log = dropped_event.as_log();
2038        assert_eq!(dropped_log.get(event_path!("a")), Some(&Value::from(42)));
2039
2040        let controller = Controller::get().expect("no controller");
2041        let metrics = controller
2042            .capture_metrics()
2043            .into_iter()
2044            .map(|metric| (metric.name().to_string(), metric))
2045            .collect::<BTreeMap<String, Metric>>();
2046        assert_eq!(metrics.get("component_discarded_events_total"), None);
2047        assert_eq!(metrics.get("component_errors_total"), None);
2048    }
2049    #[test]
2050    fn do_not_emit_metrics_when_dropped() {
2051        assert_no_metrics("abort".to_string());
2052    }
2053
2054    #[test]
2055    fn do_not_emit_metrics_when_errored() {
2056        assert_no_metrics("parse_key_value!(.message)".to_string());
2057    }
2058}