1#![deny(missing_docs)]
2
3use chrono::{DateTime, Utc};
4use core::fmt::Debug;
5use std::collections::BTreeMap;
6
7use ordered_float::NotNan;
8use serde::{Deserialize, Deserializer};
9use vector_lib::configurable::configurable_component;
10use vector_lib::event::{LogEvent, MaybeAsLogMut};
11use vector_lib::lookup::lookup_v2::ConfigValuePath;
12use vector_lib::lookup::{event_path, PathPrefix};
13use vector_lib::schema::meaning;
14use vrl::path::OwnedValuePath;
15use vrl::value::Value;
16
17use crate::{event::Event, serde::is_default};
18
19#[configurable_component(no_deser)]
21#[derive(Clone, Debug, Default, PartialEq, Eq)]
22pub struct Transformer {
23 #[serde(default, skip_serializing_if = "is_default")]
25 only_fields: Option<Vec<ConfigValuePath>>,
26
27 #[serde(default, skip_serializing_if = "is_default")]
29 except_fields: Option<Vec<ConfigValuePath>>,
30
31 #[serde(default, skip_serializing_if = "is_default")]
33 timestamp_format: Option<TimestampFormat>,
34}
35
36impl<'de> Deserialize<'de> for Transformer {
37 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
38 where
39 D: Deserializer<'de>,
40 {
41 #[derive(Deserialize)]
42 #[serde(deny_unknown_fields)]
43 struct TransformerInner {
44 #[serde(default)]
45 only_fields: Option<Vec<OwnedValuePath>>,
46 #[serde(default)]
47 except_fields: Option<Vec<OwnedValuePath>>,
48 #[serde(default)]
49 timestamp_format: Option<TimestampFormat>,
50 }
51
52 let inner: TransformerInner = Deserialize::deserialize(deserializer)?;
53 Self::new(
54 inner
55 .only_fields
56 .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
57 inner
58 .except_fields
59 .map(|v| v.iter().map(|p| ConfigValuePath(p.clone())).collect()),
60 inner.timestamp_format,
61 )
62 .map_err(serde::de::Error::custom)
63 }
64}
65
66impl Transformer {
67 pub fn new(
72 only_fields: Option<Vec<ConfigValuePath>>,
73 except_fields: Option<Vec<ConfigValuePath>>,
74 timestamp_format: Option<TimestampFormat>,
75 ) -> Result<Self, crate::Error> {
76 Self::validate_fields(only_fields.as_ref(), except_fields.as_ref())?;
77
78 Ok(Self {
79 only_fields,
80 except_fields,
81 timestamp_format,
82 })
83 }
84
85 #[cfg(test)]
87 pub const fn only_fields(&self) -> &Option<Vec<ConfigValuePath>> {
88 &self.only_fields
89 }
90
91 pub const fn except_fields(&self) -> &Option<Vec<ConfigValuePath>> {
93 &self.except_fields
94 }
95
96 pub const fn timestamp_format(&self) -> &Option<TimestampFormat> {
98 &self.timestamp_format
99 }
100
101 fn validate_fields(
105 only_fields: Option<&Vec<ConfigValuePath>>,
106 except_fields: Option<&Vec<ConfigValuePath>>,
107 ) -> crate::Result<()> {
108 if let (Some(only_fields), Some(except_fields)) = (only_fields, except_fields) {
109 if except_fields
110 .iter()
111 .any(|f| only_fields.iter().any(|v| v == f))
112 {
113 return Err(
114 "`except_fields` and `only_fields` should be mutually exclusive.".into(),
115 );
116 }
117 }
118 Ok(())
119 }
120
121 pub fn transform(&self, event: &mut Event) {
123 if let Some(log) = event.maybe_as_log_mut() {
125 self.apply_except_fields(log);
127 self.apply_only_fields(log);
128 self.apply_timestamp_format(log);
129 }
130 }
131
132 fn apply_only_fields(&self, log: &mut LogEvent) {
133 if let Some(only_fields) = self.only_fields.as_ref() {
134 let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
135
136 for field in only_fields {
137 if let Some(value) = old_value.remove(field, true) {
138 log.insert((PathPrefix::Event, field), value);
139 }
140 }
141
142 let service_path = log
146 .metadata()
147 .schema_definition()
148 .meaning_path(meaning::SERVICE);
149 if let Some(service_path) = service_path {
150 let mut new_log = LogEvent::from(old_value);
151 if let Some(service) = new_log.remove(service_path) {
152 log.metadata_mut()
153 .add_dropped_field(meaning::SERVICE.into(), service);
154 }
155 }
156 }
157 }
158
159 fn apply_except_fields(&self, log: &mut LogEvent) {
160 if let Some(except_fields) = self.except_fields.as_ref() {
161 for field in except_fields {
162 let value_path = &field.0;
163 let value = log.remove((PathPrefix::Event, value_path));
164
165 let service_path = log
166 .metadata()
167 .schema_definition()
168 .meaning_path(meaning::SERVICE);
169 if let (Some(v), Some(service_path)) = (value, service_path) {
172 if service_path.path == *value_path {
173 log.metadata_mut()
174 .add_dropped_field(meaning::SERVICE.into(), v);
175 }
176 }
177 }
178 }
179 }
180
181 fn format_timestamps<F, T>(&self, log: &mut LogEvent, extract: F)
182 where
183 F: Fn(&DateTime<Utc>) -> T,
184 T: Into<Value>,
185 {
186 if log.value().is_object() {
187 let mut unix_timestamps = Vec::new();
188 for (k, v) in log.all_event_fields().expect("must be an object") {
189 if let Value::Timestamp(ts) = v {
190 unix_timestamps.push((k.clone(), extract(ts).into()));
191 }
192 }
193 for (k, v) in unix_timestamps {
194 log.parse_path_and_insert(k, v).unwrap();
195 }
196 } else {
197 let timestamp = if let Value::Timestamp(ts) = log.value() {
199 Some(extract(ts))
200 } else {
201 None
202 };
203 if let Some(ts) = timestamp {
204 log.insert(event_path!(), ts.into());
205 }
206 }
207 }
208
209 fn apply_timestamp_format(&self, log: &mut LogEvent) {
210 if let Some(timestamp_format) = self.timestamp_format.as_ref() {
211 match timestamp_format {
212 TimestampFormat::Unix => self.format_timestamps(log, |ts| ts.timestamp()),
213 TimestampFormat::UnixMs => self.format_timestamps(log, |ts| ts.timestamp_millis()),
214 TimestampFormat::UnixUs => self.format_timestamps(log, |ts| ts.timestamp_micros()),
215 TimestampFormat::UnixNs => self.format_timestamps(log, |ts| {
216 ts.timestamp_nanos_opt().expect("Timestamp out of range")
217 }),
218 TimestampFormat::UnixFloat => self.format_timestamps(log, |ts| {
219 NotNan::new(ts.timestamp_micros() as f64 / 1e6).unwrap()
220 }),
221 TimestampFormat::Rfc3339 => (),
223 }
224 }
225 }
226
227 #[cfg(test)]
232 pub fn set_except_fields(
233 &mut self,
234 except_fields: Option<Vec<ConfigValuePath>>,
235 ) -> crate::Result<()> {
236 Self::validate_fields(self.only_fields.as_ref(), except_fields.as_ref())?;
237 self.except_fields = except_fields;
238 Ok(())
239 }
240}
241
242#[configurable_component]
243#[derive(Clone, Copy, Debug, Eq, PartialEq)]
244#[serde(rename_all = "snake_case")]
245pub enum TimestampFormat {
247 Unix,
249
250 Rfc3339,
252
253 UnixMs,
255
256 UnixUs,
258
259 UnixNs,
261
262 UnixFloat,
264}
265
// Unit tests for `Transformer`: serde round-trips, field filtering, timestamp formatting,
// config validation, and preservation of the `service` meaning when its field is dropped.
#[cfg(test)]
mod tests {
    use indoc::indoc;
    use vector_lib::btreemap;
    use vector_lib::config::{log_schema, LogNamespace};
    use vector_lib::lookup::path::parse_target_path;
    use vrl::value::Kind;

    use crate::config::schema;

    use super::*;
    use std::{collections::BTreeMap, sync::Arc};

    // Deserializing and re-serializing a fully populated config must reproduce the input.
    #[test]
    fn serialize() {
        let string =
            r#"{"only_fields":["a.b[0]"],"except_fields":["ignore_me"],"timestamp_format":"unix"}"#;

        let transformer = serde_json::from_str::<Transformer>(string).unwrap();

        let serialized = serde_json::to_string(&transformer).unwrap();

        assert_eq!(string, serialized);
    }

    // Unset options are skipped on serialization (`skip_serializing_if = "is_default"`).
    #[test]
    fn serialize_empty() {
        let string = "{}";

        let transformer = serde_json::from_str::<Transformer>(string).unwrap();

        let serialized = serde_json::to_string(&transformer).unwrap();

        assert_eq!(string, serialized);
    }

    #[test]
    fn deserialize_and_transform_except() {
        let transformer: Transformer =
            toml::from_str(r#"except_fields = ["a.b.c", "b", "c[0].y", "d.z", "e"]"#).unwrap();
        let mut log = LogEvent::default();
        {
            // Insertion order matters: later nested inserts turn `a` and `a.b` into objects,
            // which the `a.b.d` assertion below relies on.
            log.insert("a", 1);
            log.insert("a.b", 1);
            log.insert("a.b.c", 1);
            log.insert("a.b.d", 1);
            log.insert("b[0]", 1);
            log.insert("b[1].x", 1);
            log.insert("c[0].x", 1);
            log.insert("c[0].y", 1);
            log.insert("d.z", 1);
            log.insert("e.a", 1);
            log.insert("e.b", 1);
        }
        let mut event = Event::from(log);
        transformer.transform(&mut event);
        // Removed paths, including everything nested under removed parents (`b`, `e`).
        assert!(!event.as_mut_log().contains("a.b.c"));
        assert!(!event.as_mut_log().contains("b"));
        assert!(!event.as_mut_log().contains("b[1].x"));
        assert!(!event.as_mut_log().contains("c[0].y"));
        assert!(!event.as_mut_log().contains("d.z"));
        assert!(!event.as_mut_log().contains("e.a"));

        // Siblings of removed paths survive.
        assert!(event.as_mut_log().contains("a.b.d"));
        assert!(event.as_mut_log().contains("c[0].x"));
    }

    #[test]
    fn deserialize_and_transform_only() {
        // `"\"g.z\""` is a single quoted segment, i.e. a top-level key literally named `g.z`.
        let transformer: Transformer =
            toml::from_str(r#"only_fields = ["a.b.c", "b", "c[0].y", "\"g.z\""]"#).unwrap();
        let mut log = LogEvent::default();
        {
            log.insert("a", 1);
            log.insert("a.b", 1);
            log.insert("a.b.c", 1);
            log.insert("a.b.d", 1);
            log.insert("b[0]", 1);
            log.insert("b[1].x", 1);
            log.insert("c[0].x", 1);
            log.insert("c[0].y", 1);
            log.insert("d.y", 1);
            log.insert("d.z", 1);
            log.insert("e[0]", 1);
            log.insert("e[1]", 1);
            log.insert("\"f.z\"", 1);
            log.insert("\"g.z\"", 1);
            // Empty containers must also be dropped when not listed.
            log.insert("h", BTreeMap::new());
            log.insert("i", Vec::<Value>::new());
        }
        let mut event = Event::from(log);
        transformer.transform(&mut event);
        // Listed paths (and everything nested under a listed parent) are kept.
        assert!(event.as_mut_log().contains("a.b.c"));
        assert!(event.as_mut_log().contains("b"));
        assert!(event.as_mut_log().contains("b[1].x"));
        assert!(event.as_mut_log().contains("c[0].y"));
        assert!(event.as_mut_log().contains("\"g.z\""));

        // Everything else is gone.
        assert!(!event.as_mut_log().contains("a.b.d"));
        assert!(!event.as_mut_log().contains("c[0].x"));
        assert!(!event.as_mut_log().contains("d"));
        assert!(!event.as_mut_log().contains("e"));
        assert!(!event.as_mut_log().contains("f"));
        assert!(!event.as_mut_log().contains("h"));
        assert!(!event.as_mut_log().contains("i"));
    }

    #[test]
    fn deserialize_and_transform_timestamp() {
        // Start from an event that has the schema timestamp field, plus a second timestamp
        // field (`another`) to check that every timestamp field is converted, not just one.
        let mut base = Event::Log(LogEvent::from("Demo"));
        let timestamp = base
            .as_mut_log()
            .get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
            .unwrap()
            .clone();
        let timestamp = timestamp.as_timestamp().unwrap();
        base.as_mut_log()
            .insert("another", Value::Timestamp(*timestamp));

        // Expected output for each numeric `timestamp_format` value.
        let cases = [
            ("unix", Value::from(timestamp.timestamp())),
            ("unix_ms", Value::from(timestamp.timestamp_millis())),
            ("unix_us", Value::from(timestamp.timestamp_micros())),
            (
                "unix_ns",
                Value::from(timestamp.timestamp_nanos_opt().unwrap()),
            ),
            (
                "unix_float",
                Value::from(timestamp.timestamp_micros() as f64 / 1e6),
            ),
        ];
        for (fmt, expected) in cases {
            let config: String = format!(r#"timestamp_format = "{fmt}""#);
            let transformer: Transformer = toml::from_str(&config).unwrap();
            let mut event = base.clone();
            transformer.transform(&mut event);
            let log = event.as_mut_log();

            for actual in [
                // The schema's timestamp field.
                log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
                    .unwrap(),
                // The extra timestamp field.
                log.get("another").unwrap(),
            ] {
                // Compare kinds first so an integer/float mismatch reports a clearer failure.
                assert_eq!(expected.kind_str(), actual.kind_str());
                assert_eq!(&expected, actual);
            }
        }
    }

    // The same path in both lists must be rejected at deserialization time.
    #[test]
    fn exclusivity_violation() {
        let config: std::result::Result<Transformer, _> = toml::from_str(indoc! {r#"
            except_fields = ["Doop"]
            only_fields = ["Doop"]
            "#});
        assert!(config.is_err())
    }

    // `onlyfields` (missing underscore) must be rejected. This is checked explicitly because
    // the custom deserializer sets `deny_unknown_fields` on its inner struct by hand.
    #[test]
    fn deny_unknown_fields() {
        let config: std::result::Result<Transformer, _> = toml::from_str(indoc! {r#"
            onlyfields = ["Doop"]
            "#});
        assert!(config.is_err())
    }

    // Dropping the field that carries the `service` meaning via `only_fields` must keep the
    // value retrievable by meaning.
    #[test]
    fn only_fields_with_service() {
        let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap();
        let mut log = LogEvent::default();
        {
            log.insert("message", 1);
            log.insert("thing.service", "carrot");
        }

        let schema = schema::Definition::new_with_default_metadata(
            Kind::object(btreemap! {
                "thing" => Kind::object(btreemap! {
                    "service" => Kind::bytes(),
                })
            }),
            [LogNamespace::Vector],
        );

        let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");

        let mut event = Event::from(log);

        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));

        transformer.transform(&mut event);
        assert!(event.as_mut_log().contains("message"));

        // The field itself is removed from the event...
        assert!(!event.as_mut_log().contains("thing.service"));

        assert_eq!(
            // ...but its value is still reachable through the `service` meaning.
            &Value::from("carrot"),
            event.as_log().get_by_meaning("service").unwrap()
        );
    }

    // Same as above, but the service field is dropped via `except_fields`.
    #[test]
    fn except_fields_with_service() {
        let transformer: Transformer =
            toml::from_str(r#"except_fields = ["thing.service"]"#).unwrap();
        let mut log = LogEvent::default();
        {
            log.insert("message", 1);
            log.insert("thing.service", "carrot");
        }

        let schema = schema::Definition::new_with_default_metadata(
            Kind::object(btreemap! {
                "thing" => Kind::object(btreemap! {
                    "service" => Kind::bytes(),
                })
            }),
            [LogNamespace::Vector],
        );

        let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");

        let mut event = Event::from(log);

        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));

        transformer.transform(&mut event);
        assert!(event.as_mut_log().contains("message"));

        // The field itself is removed from the event...
        assert!(!event.as_mut_log().contains("thing.service"));

        assert_eq!(
            // ...but its value is still reachable through the `service` meaning.
            &Value::from("carrot"),
            event.as_log().get_by_meaning("service").unwrap()
        );
    }
}