1#![deny(missing_docs)]
2
3use std::collections::BTreeMap;
4
5use chrono::{DateTime, Utc};
6use lookup::{PathPrefix, event_path, lookup_v2::ConfigValuePath};
7use ordered_float::NotNan;
8use serde::{Deserialize, Deserializer};
9use vector_config::configurable_component;
10use vector_core::{
11 event::{Event, LogEvent, MaybeAsLogMut},
12 schema::meaning,
13 serde::is_default,
14};
15use vrl::{path::OwnedValuePath, value::Value};
16
17#[configurable_component(no_deser)]
19#[derive(Clone, Debug, Default, PartialEq, Eq)]
20pub struct Transformer {
21 #[serde(default, skip_serializing_if = "is_default")]
23 only_fields: Option<Vec<ConfigValuePath>>,
24
25 #[serde(default, skip_serializing_if = "is_default")]
27 except_fields: Option<Vec<ConfigValuePath>>,
28
29 #[serde(default, skip_serializing_if = "is_default")]
31 timestamp_format: Option<TimestampFormat>,
32}
33
34impl<'de> Deserialize<'de> for Transformer {
35 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
36 where
37 D: Deserializer<'de>,
38 {
39 #[derive(Deserialize)]
40 #[serde(deny_unknown_fields)]
41 struct TransformerInner {
42 #[serde(default)]
43 only_fields: Option<Vec<OwnedValuePath>>,
44 #[serde(default)]
45 except_fields: Option<Vec<OwnedValuePath>>,
46 #[serde(default)]
47 timestamp_format: Option<TimestampFormat>,
48 }
49
50 let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
51 Self::new(
52 inner
53 .only_fields
54 .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
55 inner
56 .except_fields
57 .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
58 inner.timestamp_format,
59 )
60 .map_err(serde::de::Error::custom)
61 }
62}
63
64impl Transformer {
65 pub fn new(
70 only_fields: Option<Vec<ConfigValuePath>>,
71 except_fields: Option<Vec<ConfigValuePath>>,
72 timestamp_format: Option<TimestampFormat>,
73 ) -> vector_common::Result<Self> {
74 Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;
75
76 Ok(Self {
77 only_fields,
78 except_fields,
79 timestamp_format,
80 })
81 }
82
83 #[cfg(any(test, feature = "test"))]
85 pub const fn only_fields(&self) -> &Option<Vec<ConfigValuePath>> {
86 &self.only_fields
87 }
88
89 pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
91 &self.except_fields
92 }
93
94 pub const fn timestamp_format(&self) -> &Option<TimestampFormat> {
96 &self.timestamp_format
97 }
98
99 fn validate_fields(
103 only_fields: Option<&Vec<ConfigValuePath>>,
104 except_fields: Option<&Vec<ConfigValuePath>>,
105 ) -> vector_common::Result<()> {
106 if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields)
107 && except_fields
108 .iter()
109 .any(|f| only_fields.iter().any(|v| v == f))
110 {
111 return Err("`except_fields` and `only_fields` should be mutually exclusive.".into());
112 }
113 Ok(())
114 }
115
116 pub fn transform(&self, event: &mut Event) {
118 if let Some(log) = event.maybe_as_log_mut() {
120 self.apply_except_fields(log);
122 self.apply_only_fields(log);
123 self.apply_timestamp_format(log);
124 }
125 }
126
127 fn apply_only_fields(&self, log: &mut LogEvent) {
128 if let Some(only_fields) = self.only_fields.as_ref() {
129 let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
130
131 for field in only_fields {
132 if let Some(value) = old_value.remove(field, true) {
133 log.insert((PathPrefix::Event, field), value);
134 }
135 }
136
137 let service_path = log
141 .metadata()
142 .schema_definition()
143 .meaning_path(meaning::SERVICE);
144 if let Some(service_path) = service_path {
145 let mut new_log = LogEvent::from(old_value);
146 if let Some(service) = new_log.remove(service_path) {
147 log.metadata_mut()
148 .add_dropped_field(meaning::SERVICE.into(), service);
149 }
150 }
151 }
152 }
153
154 fn apply_except_fields(&self, log: &mut LogEvent) {
155 if let Some(except_fields) = self.except_fields.as_ref() {
156 for field in except_fields {
157 let value_path = &field.0;
158 let value = log.remove((PathPrefix::Event, value_path));
159
160 let service_path = log
161 .metadata()
162 .schema_definition()
163 .meaning_path(meaning::SERVICE);
164 if let (Some(v), Some(service_path)) = (value, service_path)
167 && service_path.path == *value_path
168 {
169 log.metadata_mut()
170 .add_dropped_field(meaning::SERVICE.into(), v);
171 }
172 }
173 }
174 }
175
176 fn format_timestamps<F, T>(&self, log: &mut LogEvent, extract: F)
177 where
178 F: Fn(&DateTime<Utc>) -> T,
179 T: Into<Value>,
180 {
181 if log.value().is_object() {
182 let mut unix_timestamps = Vec::new();
183 for (k, v) in log.all_event_fields().expect("must be an object") {
184 if let Value::Timestamp(ts) = v {
185 unix_timestamps.push((k.clone(), extract(ts).into()));
186 }
187 }
188 for (k, v) in unix_timestamps {
189 log.parse_path_and_insert(k, v)
190 .expect("timestamp fields must allow insertion");
191 }
192 } else {
193 let timestamp = if let Value::Timestamp(ts) = log.value() {
195 Some(extract(ts))
196 } else {
197 None
198 };
199 if let Some(ts) = timestamp {
200 log.insert(event_path!(), ts.into());
201 }
202 }
203 }
204
205 fn apply_timestamp_format(&self, log: &mut LogEvent) {
206 if let Some(timestamp_format) = self.timestamp_format.as_ref() {
207 match timestamp_format {
208 TimestampFormat::Unix => self.format_timestamps(log, |ts| ts.timestamp()),
209 TimestampFormat::UnixMs => self.format_timestamps(log, |ts| ts.timestamp_millis()),
210 TimestampFormat::UnixUs => self.format_timestamps(log, |ts| ts.timestamp_micros()),
211 TimestampFormat::UnixNs => self.format_timestamps(log, |ts| {
212 ts.timestamp_nanos_opt().expect("Timestamp out of range")
213 }),
214 TimestampFormat::UnixFloat => self.format_timestamps(log, |ts| {
215 NotNan::new(ts.timestamp_micros() as f64 / 1e6)
216 .expect("this division will never produce a NaN")
217 }),
218 TimestampFormat::Rfc3339 => (),
220 }
221 }
222 }
223
224 #[cfg(any(test, feature = "test"))]
229 pub fn set_except_fields(
230 &mut self,
231 except_fields: Option<Vec<ConfigValuePath>>,
232 ) -> vector_common::Result<()> {
233 Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
234 self.except_fields = except_fields;
235 Ok(())
236 }
237}
238
239#[configurable_component]
240#[derive(Clone, Copy, Debug, Eq, PartialEq)]
241#[serde(rename_all = "snake_case")]
242pub enum TimestampFormat {
244 Unix,
246
247 Rfc3339,
249
250 UnixMs,
252
253 UnixUs,
255
256 UnixNs,
258
259 UnixFloat,
261}
262
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, sync::Arc};

    use indoc::indoc;
    use lookup::path::parse_target_path;
    use vector_core::{
        config::{LogNamespace, log_schema},
        schema,
    };
    use vrl::{btreemap, value::Kind};

    use super::*;

    /// Deserializes `json` into a `Transformer` and serializes it back.
    fn json_round_trip(json: &str) -> String {
        let transformer = serde_json::from_str::<Transformer>(json).unwrap();
        serde_json::to_string(&transformer).unwrap()
    }

    /// Builds a log with `message` and `thing.service` fields whose schema
    /// assigns the `service` meaning to `thing.service`.
    fn event_with_service_meaning() -> Event {
        let mut log = LogEvent::default();
        log.insert("message", 1);
        log.insert("thing.service", "carrot");

        let definition = schema::Definition::new_with_default_metadata(
            Kind::object(btreemap! {
                "thing" => Kind::object(btreemap! {
                    "service" => Kind::bytes(),
                }),
            }),
            [LogNamespace::Vector],
        )
        .with_meaning(parse_target_path("thing.service").unwrap(), "service");

        let mut event = Event::from(log);
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(definition));
        event
    }

    /// Asserts that `message` survived, `thing.service` was removed from the
    /// event, and the service value is still reachable through its meaning.
    fn assert_service_dropped_but_tracked(event: &mut Event) {
        assert!(event.as_mut_log().contains("message"));

        // The field itself is gone from the event...
        assert!(!event.as_mut_log().contains("thing.service"));

        // ...but its value can still be looked up by meaning.
        assert_eq!(
            &Value::from("carrot"),
            event.as_log().get_by_meaning("service").unwrap()
        );
    }

    #[test]
    fn serialize() {
        let string =
            r#"{"only_fields":["a.b[0]"],"except_fields":["ignore_me"],"timestamp_format":"unix"}"#;

        assert_eq!(string, json_round_trip(string));
    }

    #[test]
    fn serialize_empty() {
        let string = "{}";

        assert_eq!(string, json_round_trip(string));
    }

    #[test]
    fn deserialize_and_transform_except() {
        let transformer: Transformer =
            toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();

        let mut log = LogEvent::default();
        for path in [
            "a", "a.b", "a.b.c", "a.b.d", "b[0]", "b[1].x", "c[0].x", "c[0].y", "d.z", "e.a",
            "e.b",
        ] {
            log.insert(path, 1);
        }

        let mut event = Event::from(log);
        transformer.transform(&mut event);
        let log = event.as_mut_log();

        for path in ["a.b.c", "b", "b[1].x", "c[0].y", "d.z", "e.a"] {
            assert!(!log.contains(path), "`{path}` should have been removed");
        }
        for path in ["a.b.d", "c[0].x"] {
            assert!(log.contains(path), "`{path}` should have been kept");
        }
    }

    #[test]
    fn deserialize_and_transform_only() {
        let transformer: Transformer =
            toml::from_str(r#"only_fields = ["a.b.c", "b", "c[0].y", "\"g.z\""]"#).unwrap();

        let mut log = LogEvent::default();
        for path in [
            "a",
            "a.b",
            "a.b.c",
            "a.b.d",
            "b[0]",
            "b[1].x",
            "c[0].x",
            "c[0].y",
            "d.y",
            "d.z",
            "e[0]",
            "e[1]",
            "\"f.z\"",
            "\"g.z\"",
        ] {
            log.insert(path, 1);
        }
        // Empty containers must be pruned as well.
        log.insert("h", BTreeMap::new());
        log.insert("i", Vec::<Value>::new());

        let mut event = Event::from(log);
        transformer.transform(&mut event);
        let log = event.as_mut_log();

        for path in ["a.b.c", "b", "b[1].x", "c[0].y", "\"g.z\""] {
            assert!(log.contains(path), "`{path}` should have been kept");
        }
        for path in ["a.b.d", "c[0].x", "d", "e", "f", "h", "i"] {
            assert!(!log.contains(path), "`{path}` should have been removed");
        }
    }

    #[test]
    fn deserialize_and_transform_timestamp() {
        let timestamp_key = log_schema().timestamp_key().unwrap();

        let mut base = Event::Log(LogEvent::from("Demo"));
        let timestamp = *base
            .as_mut_log()
            .get((PathPrefix::Event, timestamp_key))
            .unwrap()
            .as_timestamp()
            .unwrap();
        // A second timestamp field, to check that every timestamp is converted.
        base.as_mut_log()
            .insert("another", Value::Timestamp(timestamp));

        let cases = [
            ("unix", Value::from(timestamp.timestamp())),
            ("unix_ms", Value::from(timestamp.timestamp_millis())),
            ("unix_us", Value::from(timestamp.timestamp_micros())),
            (
                "unix_ns",
                Value::from(timestamp.timestamp_nanos_opt().unwrap()),
            ),
            (
                "unix_float",
                Value::from(timestamp.timestamp_micros() as f64 / 1e6),
            ),
        ];

        for (fmt, expected) in cases {
            let config: String = format!(r#"timestamp_format = "{fmt}""#);
            let transformer: Transformer = toml::from_str(&config).unwrap();

            let mut event = base.clone();
            transformer.transform(&mut event);
            let log = event.as_mut_log();

            let converted = [
                // The schema-defined timestamp field.
                log.get((PathPrefix::Event, timestamp_key)).unwrap(),
                // The extra timestamp field added above.
                log.get("another").unwrap(),
            ];
            for actual in converted {
                // Compare the kind first so a type mismatch is reported clearly.
                assert_eq!(expected.kind_str(), actual.kind_str());
                assert_eq!(&expected, actual);
            }
        }
    }

    #[test]
    fn exclusivity_violation() {
        let config: std::result::Result<Transformer, _> = toml::from_str(indoc! {r#"
            except_fields = ["Doop"]
            only_fields = ["Doop"]
        "#});
        assert!(config.is_err())
    }

    #[test]
    fn deny_unknown_fields() {
        // The hand-written deserializer goes through an intermediate struct, so
        // verify that unknown keys are still rejected for `Transformer` itself.
        let config: std::result::Result<Transformer, _> = toml::from_str(indoc! {r#"
            onlyfields = ["Doop"]
        "#});
        assert!(config.is_err())
    }

    #[test]
    fn only_fields_with_service() {
        let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap();

        let mut event = event_with_service_meaning();
        transformer.transform(&mut event);

        assert_service_dropped_but_tracked(&mut event);
    }

    #[test]
    fn except_fields_with_service() {
        let transformer: Transformer =
            toml::from_str(r#"except_fields = ["thing.service"]"#).unwrap();

        let mut event = event_with_service_meaning();
        transformer.transform(&mut event);

        assert_service_dropped_but_tracked(&mut event);
    }
}