// codecs/encoding/transformer.rs

1#![deny(missing_docs)]
2
3use std::collections::BTreeMap;
4
5use chrono::{DateTime, Utc};
6use lookup::{PathPrefix, event_path, lookup_v2::ConfigValuePath};
7use ordered_float::NotNan;
8use serde::{Deserialize, Deserializer};
9use vector_config::configurable_component;
10use vector_core::{
11    event::{Event, LogEvent, MaybeAsLogMut},
12    schema::meaning,
13    serde::is_default,
14};
15use vrl::{path::OwnedValuePath, value::Value};
16
/// Transformations to prepare an event for serialization.
// `Deserialize` is implemented by hand for this type (the `impl Deserialize` that follows) so
// that `Transformer::new` can reject configurations listing the same path in both `only_fields`
// and `except_fields`; `no_deser` presumably stops the macro from deriving a conflicting impl.
#[configurable_component(no_deser)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Transformer {
    /// List of fields that are included in the encoded event.
    // Left out of the serialized form when unset (see the `serialize_empty` test).
    #[serde(default, skip_serializing_if = "is_default")]
    only_fields: Option<Vec<ConfigValuePath>>,

    /// List of fields that are excluded from the encoded event.
    // Left out of the serialized form when unset.
    #[serde(default, skip_serializing_if = "is_default")]
    except_fields: Option<Vec<ConfigValuePath>>,

    /// Format used for timestamp fields.
    // `None` leaves timestamps in their default serialization.
    #[serde(default, skip_serializing_if = "is_default")]
    timestamp_format: Option<TimestampFormat>,
}
33
34impl<'de> Deserialize<'de> for Transformer {
35    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
36    where
37        D: Deserializer<'de>,
38    {
39        #[derive(Deserialize)]
40        #[serde(deny_unknown_fields)]
41        struct TransformerInner {
42            #[serde(default)]
43            only_fields: Option<Vec<OwnedValuePath>>,
44            #[serde(default)]
45            except_fields: Option<Vec<OwnedValuePath>>,
46            #[serde(default)]
47            timestamp_format: Option<TimestampFormat>,
48        }
49
50        let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
51        Self::new(
52            inner
53                .only_fields
54                .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
55            inner
56                .except_fields
57                .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
58            inner.timestamp_format,
59        )
60        .map_err(serde::de::Error::custom)
61    }
62}
63
64impl Transformer {
65    /// Creates a new `Transformer`.
66    ///
67    /// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually
68    /// exclusive.
69    pub fn new(
70        only_fields: Option<Vec<ConfigValuePath>>,
71        except_fields: Option<Vec<ConfigValuePath>>,
72        timestamp_format: Option<TimestampFormat>,
73    ) -> vector_common::Result<Self> {
74        Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;
75
76        Ok(Self {
77            only_fields,
78            except_fields,
79            timestamp_format,
80        })
81    }
82
83    /// Get the `Transformer`'s `only_fields`.
84    #[cfg(any(test, feature = "test"))]
85    pub const fn only_fields(&self) -> &Option<Vec<ConfigValuePath>> {
86        &self.only_fields
87    }
88
89    /// Get the `Transformer`'s `except_fields`.
90    pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
91        &self.except_fields
92    }
93
94    /// Get the `Transformer`'s `timestamp_format`.
95    pub const fn timestamp_format(&self) -> &Option<TimestampFormat> {
96        &self.timestamp_format
97    }
98
99    /// Check if `except_fields` and `only_fields` items are mutually exclusive.
100    ///
101    /// If an error is returned, the entire encoding configuration should be considered inoperable.
102    fn validate_fields(
103        only_fields: Option<&Vec<ConfigValuePath>>,
104        except_fields: Option<&Vec<ConfigValuePath>>,
105    ) -> vector_common::Result<()> {
106        if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields)
107            && except_fields
108                .iter()
109                .any(|f| only_fields.iter().any(|v| v == f))
110        {
111            return Err("`except_fields` and `only_fields` should be mutually exclusive.".into());
112        }
113        Ok(())
114    }
115
116    /// Prepare an event for serialization by the given transformation rules.
117    pub fn transform(&self, event: &mut Event) {
118        // Rules are currently applied to logs only.
119        if let Some(log) = event.maybe_as_log_mut() {
120            // Ordering in here should not matter.
121            self.apply_except_fields(log);
122            self.apply_only_fields(log);
123            self.apply_timestamp_format(log);
124        }
125    }
126
127    fn apply_only_fields(&self, log: &mut LogEvent) {
128        if let Some(only_fields) = self.only_fields.as_ref() {
129            let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
130
131            for field in only_fields {
132                if let Some(value) = old_value.remove(field, true) {
133                    log.insert((PathPrefix::Event, field), value);
134                }
135            }
136
137            // We may need the service field to apply tags to emitted metrics after the log message has been pruned. If there
138            // is a service meaning, we move this value to `dropped_fields` in the metadata.
139            // If the field is still in the new log message after pruning it will have been removed from `old_value` above.
140            let service_path = log
141                .metadata()
142                .schema_definition()
143                .meaning_path(meaning::SERVICE);
144            if let Some(service_path) = service_path {
145                let mut new_log = LogEvent::from(old_value);
146                if let Some(service) = new_log.remove(service_path) {
147                    log.metadata_mut()
148                        .add_dropped_field(meaning::SERVICE.into(), service);
149                }
150            }
151        }
152    }
153
154    fn apply_except_fields(&self, log: &mut LogEvent) {
155        if let Some(except_fields) = self.except_fields.as_ref() {
156            for field in except_fields {
157                let value_path = &field.0;
158                let value = log.remove((PathPrefix::Event, value_path));
159
160                let service_path = log
161                    .metadata()
162                    .schema_definition()
163                    .meaning_path(meaning::SERVICE);
164                // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
165                // refer to this later when emitting metrics.
166                if let (Some(v), Some(service_path)) = (value, service_path)
167                    && service_path.path == *value_path
168                {
169                    log.metadata_mut()
170                        .add_dropped_field(meaning::SERVICE.into(), v);
171                }
172            }
173        }
174    }
175
176    fn format_timestamps<F, T>(&self, log: &mut LogEvent, extract: F)
177    where
178        F: Fn(&DateTime<Utc>) -> T,
179        T: Into<Value>,
180    {
181        if log.value().is_object() {
182            let mut unix_timestamps = Vec::new();
183            for (k, v) in log.all_event_fields().expect("must be an object") {
184                if let Value::Timestamp(ts) = v {
185                    unix_timestamps.push((k.clone(), extract(ts).into()));
186                }
187            }
188            for (k, v) in unix_timestamps {
189                log.parse_path_and_insert(k, v)
190                    .expect("timestamp fields must allow insertion");
191            }
192        } else {
193            // root is not an object
194            let timestamp = if let Value::Timestamp(ts) = log.value() {
195                Some(extract(ts))
196            } else {
197                None
198            };
199            if let Some(ts) = timestamp {
200                log.insert(event_path!(), ts.into());
201            }
202        }
203    }
204
205    fn apply_timestamp_format(&self, log: &mut LogEvent) {
206        if let Some(timestamp_format) = self.timestamp_format.as_ref() {
207            match timestamp_format {
208                TimestampFormat::Unix => self.format_timestamps(log, |ts| ts.timestamp()),
209                TimestampFormat::UnixMs => self.format_timestamps(log, |ts| ts.timestamp_millis()),
210                TimestampFormat::UnixUs => self.format_timestamps(log, |ts| ts.timestamp_micros()),
211                TimestampFormat::UnixNs => self.format_timestamps(log, |ts| {
212                    ts.timestamp_nanos_opt().expect("Timestamp out of range")
213                }),
214                TimestampFormat::UnixFloat => self.format_timestamps(log, |ts| {
215                    NotNan::new(ts.timestamp_micros() as f64 / 1e6)
216                        .expect("this division will never produce a NaN")
217                }),
218                // RFC3339 is the default serialization of a timestamp.
219                TimestampFormat::Rfc3339 => (),
220            }
221        }
222    }
223
224    /// Set the `except_fields` value.
225    ///
226    /// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive
227    /// with `only_fields`.
228    #[cfg(any(test, feature = "test"))]
229    pub fn set_except_fields(
230        &mut self,
231        except_fields: Option<Vec<ConfigValuePath>>,
232    ) -> vector_common::Result<()> {
233        Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
234        self.except_fields = except_fields;
235        Ok(())
236    }
237}
238
// Serialized in snake_case, e.g. `unix`, `unix_ms`, `unix_us`, `unix_ns`, `unix_float`.
// The conversion for each variant is performed in `Transformer::apply_timestamp_format`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
/// The format in which a timestamp should be represented.
pub enum TimestampFormat {
    /// Represent the timestamp as a Unix timestamp.
    // Whole seconds since the Unix epoch (`DateTime::timestamp`).
    Unix,

    /// Represent the timestamp as a RFC 3339 timestamp.
    // No conversion is performed: RFC 3339 is how timestamps serialize by default.
    Rfc3339,

    /// Represent the timestamp as a Unix timestamp in milliseconds.
    // Via `DateTime::timestamp_millis`.
    UnixMs,

    /// Represent the timestamp as a Unix timestamp in microseconds.
    // Via `DateTime::timestamp_micros`.
    UnixUs,

    /// Represent the timestamp as a Unix timestamp in nanoseconds.
    // Via `DateTime::timestamp_nanos_opt`, which only covers a limited date range
    // (roughly 1677..=2262 per chrono's documentation).
    UnixNs,

    /// Represent the timestamp as a Unix timestamp in floating point.
    // Seconds as a float with microsecond precision: `timestamp_micros() as f64 / 1e6`.
    UnixFloat,
}
262
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, sync::Arc};

    use indoc::indoc;
    use lookup::path::parse_target_path;
    use vector_core::{
        config::{LogNamespace, log_schema},
        schema,
    };
    use vrl::{btreemap, value::Kind};

    use super::*;

    /// Parses `json` into a `Transformer` and checks that serializing it reproduces `json`.
    fn assert_json_round_trip(json: &str) {
        let transformer: Transformer = serde_json::from_str(json).unwrap();
        let reserialized = serde_json::to_string(&transformer).unwrap();
        assert_eq!(json, reserialized);
    }

    /// Runs an event whose `thing.service` field carries the `service` meaning through a
    /// transformer built from the TOML `config`, then checks that the field was pruned from the
    /// event while its value is still reachable through `get_by_meaning`.
    fn assert_service_pruned_but_reachable(config: &str) {
        let transformer: Transformer = toml::from_str(config).unwrap();

        let mut log = LogEvent::default();
        log.insert("message", 1);
        log.insert("thing.service", "carrot");

        let definition = schema::Definition::new_with_default_metadata(
            Kind::object(btreemap! {
                "thing" => Kind::object(btreemap! {
                    "service" => Kind::bytes(),
                }),
            }),
            [LogNamespace::Vector],
        )
        .with_meaning(parse_target_path("thing.service").unwrap(), "service");

        let mut event = Event::from(log);
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(definition));

        transformer.transform(&mut event);

        let log = event.as_log();
        assert!(log.contains("message"));
        // The service field is gone from the event itself...
        assert!(!log.contains("thing.service"));
        // ...but its value is still available by meaning.
        assert_eq!(&Value::from("carrot"), log.get_by_meaning("service").unwrap());
    }

    #[test]
    fn serialize() {
        assert_json_round_trip(
            r#"{"only_fields":["a.b[0]"],"except_fields":["ignore_me"],"timestamp_format":"unix"}"#,
        );
    }

    #[test]
    fn serialize_empty() {
        assert_json_round_trip("{}");
    }

    #[test]
    fn deserialize_and_transform_except() {
        let transformer: Transformer =
            toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();

        let mut log = LogEvent::default();
        for path in [
            "a", "a.b", "a.b.c", "a.b.d", "b[0]", "b[1].x", "c[0].x", "c[0].y", "d.z", "e.a",
            "e.b",
        ] {
            log.insert(path, 1);
        }

        let mut event = Event::from(log);
        transformer.transform(&mut event);
        let log = event.as_log();

        for removed in ["a.b.c", "b", "b[1].x", "c[0].y", "d.z", "e.a"] {
            assert!(!log.contains(removed), "`{removed}` should have been removed");
        }
        for kept in ["a.b.d", "c[0].x"] {
            assert!(log.contains(kept), "`{kept}` should have been kept");
        }
    }

    #[test]
    fn deserialize_and_transform_only() {
        let transformer: Transformer =
            toml::from_str(r#"only_fields = ["a.b.c", "b", "c[0].y", "\"g.z\""]"#).unwrap();

        let mut log = LogEvent::default();
        for path in [
            "a", "a.b", "a.b.c", "a.b.d", "b[0]", "b[1].x", "c[0].x", "c[0].y", "d.y", "d.z",
            "e[0]", "e[1]", "\"f.z\"", "\"g.z\"",
        ] {
            log.insert(path, 1);
        }
        // Empty containers that are not listed must not survive either.
        log.insert("h", BTreeMap::new());
        log.insert("i", Vec::<Value>::new());

        let mut event = Event::from(log);
        transformer.transform(&mut event);
        let log = event.as_log();

        for kept in ["a.b.c", "b", "b[1].x", "c[0].y", "\"g.z\""] {
            assert!(log.contains(kept), "`{kept}` should have been kept");
        }
        for removed in ["a.b.d", "c[0].x", "d", "e", "f", "h", "i"] {
            assert!(!log.contains(removed), "`{removed}` should have been removed");
        }
    }

    #[test]
    fn deserialize_and_transform_timestamp() {
        let timestamp_key = log_schema().timestamp_key().unwrap();

        let mut base = Event::Log(LogEvent::from("Demo"));
        let timestamp = *base
            .as_log()
            .get((PathPrefix::Event, timestamp_key))
            .unwrap()
            .as_timestamp()
            .unwrap();
        base.as_mut_log()
            .insert("another", Value::Timestamp(timestamp));

        let cases = [
            ("unix", Value::from(timestamp.timestamp())),
            ("unix_ms", Value::from(timestamp.timestamp_millis())),
            ("unix_us", Value::from(timestamp.timestamp_micros())),
            ("unix_ns", Value::from(timestamp.timestamp_nanos_opt().unwrap())),
            ("unix_float", Value::from(timestamp.timestamp_micros() as f64 / 1e6)),
        ];

        for (name, expected) in cases {
            let transformer: Transformer =
                toml::from_str(&format!(r#"timestamp_format = "{name}""#)).unwrap();
            let mut event = base.clone();
            transformer.transform(&mut event);
            let log = event.as_log();

            // Both the schema's timestamp field and the extra field must be converted.
            let converted = [
                log.get((PathPrefix::Event, timestamp_key)).unwrap(),
                log.get("another").unwrap(),
            ];
            for actual in converted {
                // Same type and same value.
                assert_eq!(expected.kind_str(), actual.kind_str());
                assert_eq!(&expected, actual);
            }
        }
    }

    #[test]
    fn exclusivity_violation() {
        let parsed = toml::from_str::<Transformer>(indoc! {r#"
            except_fields = ["Doop"]
            only_fields = ["Doop"]
        "#});
        assert!(parsed.is_err());
    }

    #[test]
    fn deny_unknown_fields() {
        // We're only checking this explicitly because of our custom deserializer arrangement to
        // make it possible to throw the exclusivity error during deserialization, to ensure that we
        // enforce this on the top-level `Transformer` type even though it has to be applied at the
        // intermediate deserialization stage, on `TransformerInner`.
        let parsed = toml::from_str::<Transformer>(indoc! {r#"
            onlyfields = ["Doop"]
        "#});
        assert!(parsed.is_err());
    }

    #[test]
    fn only_fields_with_service() {
        assert_service_pruned_but_reachable(r#"only_fields = ["message"]"#);
    }

    #[test]
    fn except_fields_with_service() {
        assert_service_pruned_but_reachable(r#"except_fields = ["thing.service"]"#);
    }
}