vector/codecs/encoding/
transformer.rs

1#![deny(missing_docs)]
2
3use chrono::{DateTime, Utc};
4use core::fmt::Debug;
5use std::collections::BTreeMap;
6
7use ordered_float::NotNan;
8use serde::{Deserialize, Deserializer};
9use vector_lib::configurable::configurable_component;
10use vector_lib::event::{LogEvent, MaybeAsLogMut};
11use vector_lib::lookup::lookup_v2::ConfigValuePath;
12use vector_lib::lookup::{event_path, PathPrefix};
13use vector_lib::schema::meaning;
14use vrl::path::OwnedValuePath;
15use vrl::value::Value;
16
17use crate::{event::Event, serde::is_default};
18
/// Transformations to prepare an event for serialization.
// NOTE(review): `no_deser` presumably suppresses the generated `Deserialize` impl; this file
// supplies a hand-written one that routes through `Transformer::new`, so the
// `only_fields` / `except_fields` validation also runs on deserialized configuration.
#[configurable_component(no_deser)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Transformer {
    /// List of fields that are included in the encoded event.
    // Omitted from serialized output while unset (`skip_serializing_if = "is_default"`).
    #[serde(default, skip_serializing_if = "is_default")]
    only_fields: Option<Vec<ConfigValuePath>>,

    /// List of fields that are excluded from the encoded event.
    // Must not share any path with `only_fields`; `Transformer::new` rejects such input.
    #[serde(default, skip_serializing_if = "is_default")]
    except_fields: Option<Vec<ConfigValuePath>>,

    /// Format used for timestamp fields.
    // `None` leaves timestamp values untouched when the transformer is applied.
    #[serde(default, skip_serializing_if = "is_default")]
    timestamp_format: Option<TimestampFormat>,
}
35
36impl<'de> Deserialize<'de> for Transformer {
37    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
38    where
39        D: Deserializer<'de>,
40    {
41        #[derive(Deserialize)]
42        #[serde(deny_unknown_fields)]
43        struct TransformerInner {
44            #[serde(default)]
45            only_fields: Option<Vec<OwnedValuePath>>,
46            #[serde(default)]
47            except_fields: Option<Vec<OwnedValuePath>>,
48            #[serde(default)]
49            timestamp_format: Option<TimestampFormat>,
50        }
51
52        let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
53        Self::new(
54            inner
55                .only_fields
56                .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
57            inner
58                .except_fields
59                .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
60            inner.timestamp_format,
61        )
62        .map_err(serde::de::Error::custom)
63    }
64}
65
66impl Transformer {
67    /// Creates a new `Transformer`.
68    ///
69    /// Returns `Err` if `only_fields` and `except_fields` fail validation, i.e. are not mutually
70    /// exclusive.
71    pub fn new(
72        only_fields: Option<Vec<ConfigValuePath>>,
73        except_fields: Option<Vec<ConfigValuePath>>,
74        timestamp_format: Option<TimestampFormat>,
75    ) -> Result<Self, crate::Error> {
76        Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;
77
78        Ok(Self {
79            only_fields,
80            except_fields,
81            timestamp_format,
82        })
83    }
84
85    /// Get the `Transformer`'s `only_fields`.
86    #[cfg(test)]
87    pub const fn only_fields(&self) -> &Option<Vec<ConfigValuePath>> {
88        &self.only_fields
89    }
90
91    /// Get the `Transformer`'s `except_fields`.
92    pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
93        &self.except_fields
94    }
95
96    /// Get the `Transformer`'s `timestamp_format`.
97    pub const fn timestamp_format(&self) -> &Option<TimestampFormat> {
98        &self.timestamp_format
99    }
100
101    /// Check if `except_fields` and `only_fields` items are mutually exclusive.
102    ///
103    /// If an error is returned, the entire encoding configuration should be considered inoperable.
104    fn validate_fields(
105        only_fields: Option<&Vec<ConfigValuePath>>,
106        except_fields: Option<&Vec<ConfigValuePath>>,
107    ) -> crate::Result<()> {
108        if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) {
109            if except_fields
110                .iter()
111                .any(|f| only_fields.iter().any(|v| v == f))
112            {
113                return Err(
114                    "`except_fields` and `only_fields` should be mutually exclusive.".into(),
115                );
116            }
117        }
118        Ok(())
119    }
120
121    /// Prepare an event for serialization by the given transformation rules.
122    pub fn transform(&self, event: &mut Event) {
123        // Rules are currently applied to logs only.
124        if let Some(log) = event.maybe_as_log_mut() {
125            // Ordering in here should not matter.
126            self.apply_except_fields(log);
127            self.apply_only_fields(log);
128            self.apply_timestamp_format(log);
129        }
130    }
131
132    fn apply_only_fields(&self, log: &mut LogEvent) {
133        if let Some(only_fields) = self.only_fields.as_ref() {
134            let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
135
136            for field in only_fields {
137                if let Some(value) = old_value.remove(field, true) {
138                    log.insert((PathPrefix::Event, field), value);
139                }
140            }
141
142            // We may need the service field to apply tags to emitted metrics after the log message has been pruned. If there
143            // is a service meaning, we move this value to `dropped_fields` in the metadata.
144            // If the field is still in the new log message after pruning it will have been removed from `old_value` above.
145            let service_path = log
146                .metadata()
147                .schema_definition()
148                .meaning_path(meaning::SERVICE);
149            if let Some(service_path) = service_path {
150                let mut new_log = LogEvent::from(old_value);
151                if let Some(service) = new_log.remove(service_path) {
152                    log.metadata_mut()
153                        .add_dropped_field(meaning::SERVICE.into(), service);
154                }
155            }
156        }
157    }
158
159    fn apply_except_fields(&self, log: &mut LogEvent) {
160        if let Some(except_fields) = self.except_fields.as_ref() {
161            for field in except_fields {
162                let value_path = &field.0;
163                let value = log.remove((PathPrefix::Event, value_path));
164
165                let service_path = log
166                    .metadata()
167                    .schema_definition()
168                    .meaning_path(meaning::SERVICE);
169                // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
170                // refer to this later when emitting metrics.
171                if let (Some(v), Some(service_path)) = (value, service_path) {
172                    if service_path.path == *value_path {
173                        log.metadata_mut()
174                            .add_dropped_field(meaning::SERVICE.into(), v);
175                    }
176                }
177            }
178        }
179    }
180
181    fn format_timestamps<F, T>(&self, log: &mut LogEvent, extract: F)
182    where
183        F: Fn(&DateTime<Utc>) -> T,
184        T: Into<Value>,
185    {
186        if log.value().is_object() {
187            let mut unix_timestamps = Vec::new();
188            for (k, v) in log.all_event_fields().expect("must be an object") {
189                if let Value::Timestamp(ts) = v {
190                    unix_timestamps.push((k.clone(), extract(ts).into()));
191                }
192            }
193            for (k, v) in unix_timestamps {
194                log.parse_path_and_insert(k, v).unwrap();
195            }
196        } else {
197            // root is not an object
198            let timestamp = if let Value::Timestamp(ts) = log.value() {
199                Some(extract(ts))
200            } else {
201                None
202            };
203            if let Some(ts) = timestamp {
204                log.insert(event_path!(), ts.into());
205            }
206        }
207    }
208
209    fn apply_timestamp_format(&self, log: &mut LogEvent) {
210        if let Some(timestamp_format) = self.timestamp_format.as_ref() {
211            match timestamp_format {
212                TimestampFormat::Unix => self.format_timestamps(log, |ts| ts.timestamp()),
213                TimestampFormat::UnixMs => self.format_timestamps(log, |ts| ts.timestamp_millis()),
214                TimestampFormat::UnixUs => self.format_timestamps(log, |ts| ts.timestamp_micros()),
215                TimestampFormat::UnixNs => self.format_timestamps(log, |ts| {
216                    ts.timestamp_nanos_opt().expect("Timestamp out of range")
217                }),
218                TimestampFormat::UnixFloat => self.format_timestamps(log, |ts| {
219                    NotNan::new(ts.timestamp_micros() as f64 / 1e6).unwrap()
220                }),
221                // RFC3339 is the default serialization of a timestamp.
222                TimestampFormat::Rfc3339 => (),
223            }
224        }
225    }
226
227    /// Set the `except_fields` value.
228    ///
229    /// Returns `Err` if the new `except_fields` fail validation, i.e. are not mutually exclusive
230    /// with `only_fields`.
231    #[cfg(test)]
232    pub fn set_except_fields(
233        &mut self,
234        except_fields: Option<Vec<ConfigValuePath>>,
235    ) -> crate::Result<()> {
236        Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
237        self.except_fields = except_fields;
238        Ok(())
239    }
240}
241
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
/// The format in which a timestamp should be represented.
// With `rename_all = "snake_case"` the variants are configured as `unix`, `rfc3339`, `unix_ms`,
// `unix_us`, `unix_ns` and `unix_float`.
pub enum TimestampFormat {
    /// Represent the timestamp as a Unix timestamp.
    Unix,

    /// Represent the timestamp as an RFC 3339 timestamp.
    Rfc3339,

    /// Represent the timestamp as a Unix timestamp in milliseconds.
    UnixMs,

    /// Represent the timestamp as a Unix timestamp in microseconds.
    UnixUs,

    /// Represent the timestamp as a Unix timestamp in nanoseconds.
    UnixNs,

    /// Represent the timestamp as a Unix timestamp in floating point.
    // `Transformer` computes this as the microsecond timestamp divided by 1e6.
    UnixFloat,
}
265
#[cfg(test)]
mod tests {
    use indoc::indoc;
    use vector_lib::btreemap;
    use vector_lib::config::{log_schema, LogNamespace};
    use vector_lib::lookup::path::parse_target_path;
    use vrl::value::Kind;

    use crate::config::schema;

    use super::*;
    use std::{collections::BTreeMap, sync::Arc};

    /// Asserts that `json` deserializes into a `Transformer` that serializes back to the very
    /// same string.
    fn assert_round_trips(json: &str) {
        let transformer = serde_json::from_str::<Transformer>(json).unwrap();
        let serialized = serde_json::to_string(&transformer).unwrap();
        assert_eq!(json, serialized);
    }

    /// Runs the transformer described by `config` over an event whose `thing.service` field
    /// carries the `service` meaning, and asserts that the field is removed from the event while
    /// its value stays reachable by meaning.
    fn assert_service_kept_by_meaning(config: &str) {
        let transformer: Transformer = toml::from_str(config).unwrap();

        let mut log = LogEvent::default();
        log.insert("message", 1);
        log.insert("thing.service", "carrot");

        let schema = schema::Definition::new_with_default_metadata(
            Kind::object(btreemap! {
                "thing" => Kind::object(btreemap! {
                    "service" => Kind::bytes(),
                })
            }),
            [LogNamespace::Vector],
        )
        .with_meaning(parse_target_path("thing.service").unwrap(), "service");

        let mut event = Event::from(log);
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));

        transformer.transform(&mut event);
        let log = event.as_log();
        assert!(log.contains("message"));

        // Event no longer contains the service field.
        assert!(!log.contains("thing.service"));

        // But we can still get the service by meaning.
        assert_eq!(
            &Value::from("carrot"),
            log.get_by_meaning("service").unwrap()
        );
    }

    #[test]
    fn serialize() {
        assert_round_trips(
            r#"{"only_fields":["a.b[0]"],"except_fields":["ignore_me"],"timestamp_format":"unix"}"#,
        );
    }

    #[test]
    fn serialize_empty() {
        assert_round_trips("{}");
    }

    #[test]
    fn deserialize_and_transform_except() {
        let transformer: Transformer =
            toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();

        let mut log = LogEvent::default();
        // Insertion order matters: later, deeper paths overwrite earlier scalar values.
        for path in [
            "a", "a.b", "a.b.c", "a.b.d", "b[0]", "b[1].x", "c[0].x", "c[0].y", "d.z", "e.a",
            "e.b",
        ] {
            log.insert(path, 1);
        }

        let mut event = Event::from(log);
        transformer.transform(&mut event);
        let log = event.as_log();

        for path in ["a.b.c", "b", "b[1].x", "c[0].y", "d.z", "e.a"] {
            assert!(!log.contains(path), "`{path}` should have been removed");
        }
        for path in ["a.b.d", "c[0].x"] {
            assert!(log.contains(path), "`{path}` should have been kept");
        }
    }

    #[test]
    fn deserialize_and_transform_only() {
        let transformer: Transformer =
            toml::from_str(r#"only_fields = ["a.b.c", "b", "c[0].y", "\"g.z\""]"#).unwrap();

        let mut log = LogEvent::default();
        // Insertion order matters: later, deeper paths overwrite earlier scalar values.
        for path in [
            "a",
            "a.b",
            "a.b.c",
            "a.b.d",
            "b[0]",
            "b[1].x",
            "c[0].x",
            "c[0].y",
            "d.y",
            "d.z",
            "e[0]",
            "e[1]",
            "\"f.z\"",
            "\"g.z\"",
        ] {
            log.insert(path, 1);
        }
        log.insert("h", BTreeMap::new());
        log.insert("i", Vec::<Value>::new());

        let mut event = Event::from(log);
        transformer.transform(&mut event);
        let log = event.as_log();

        for path in ["a.b.c", "b", "b[1].x", "c[0].y", "\"g.z\""] {
            assert!(log.contains(path), "`{path}` should have been kept");
        }
        for path in ["a.b.d", "c[0].x", "d", "e", "f", "h", "i"] {
            assert!(!log.contains(path), "`{path}` should have been removed");
        }
    }

    #[test]
    fn deserialize_and_transform_timestamp() {
        let mut base = Event::Log(LogEvent::from("Demo"));
        let timestamp = *base
            .as_log()
            .get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
            .unwrap()
            .as_timestamp()
            .unwrap();
        // A second timestamp field, to check that every timestamp is converted and not only
        // the one under the schema's timestamp key.
        base.as_mut_log()
            .insert("another", Value::Timestamp(timestamp));

        let cases = [
            ("unix", Value::from(timestamp.timestamp())),
            ("unix_ms", Value::from(timestamp.timestamp_millis())),
            ("unix_us", Value::from(timestamp.timestamp_micros())),
            (
                "unix_ns",
                Value::from(timestamp.timestamp_nanos_opt().unwrap()),
            ),
            (
                "unix_float",
                Value::from(timestamp.timestamp_micros() as f64 / 1e6),
            ),
        ];
        for (fmt, expected) in cases {
            let config: String = format!(r#"timestamp_format = "{fmt}""#);
            let transformer: Transformer = toml::from_str(&config).unwrap();
            let mut event = base.clone();
            transformer.transform(&mut event);
            let log = event.as_log();

            for actual in [
                // original key
                log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
                    .unwrap(),
                // second key
                log.get("another").unwrap(),
            ] {
                // type matches
                assert_eq!(expected.kind_str(), actual.kind_str());
                // value matches
                assert_eq!(&expected, actual);
            }
        }
    }

    #[test]
    fn exclusivity_violation() {
        let config: std::result::Result<Transformer, _> = toml::from_str(indoc! {r#"
            except_fields = ["Doop"]
            only_fields = ["Doop"]
        "#});
        assert!(config.is_err());
    }

    #[test]
    fn deny_unknown_fields() {
        // We're only checking this explicitly because of our custom deserializer arrangement to
        // make it possible to throw the exclusivity error during deserialization, to ensure that we
        // enforce this on the top-level `Transformer` type even though it has to be applied at the
        // intermediate deserialization stage, on `TransformerInner`.
        let config: std::result::Result<Transformer, _> = toml::from_str(indoc! {r#"
            onlyfields = ["Doop"]
        "#});
        assert!(config.is_err());
    }

    #[test]
    fn only_fields_with_service() {
        assert_service_kept_by_meaning(r#"only_fields = ["message"]"#);
    }

    #[test]
    fn except_fields_with_service() {
        assert_service_kept_by_meaning(r#"except_fields = ["thing.service"]"#);
    }
}