codecs/encoding/format/
cef.rs

1use crate::encoding::BuildError;
2use bytes::BytesMut;
3use chrono::SecondsFormat;
4use lookup::lookup_v2::ConfigTargetPath;
5use snafu::Snafu;
6use std::num::ParseIntError;
7use std::{collections::HashMap, fmt::Write};
8use tokio_util::codec::Encoder;
9use vector_config_macros::configurable_component;
10use vector_core::{
11    config::DataType,
12    event::{Event, LogEvent, Value},
13    schema,
14};
15
// Default device vendor used by `CefSerializerOptions::default()`.
const DEFAULT_DEVICE_VENDOR: &str = "Datadog";
// Default device product used by `CefSerializerOptions::default()`.
const DEFAULT_DEVICE_PRODUCT: &str = "Vector";
// Default device version used by `CefSerializerOptions::default()`.
// TODO: report the actual Vector version here. It is hard-coded because the
//  real version cannot currently be obtained from the config.
const DEFAULT_DEVICE_VERSION: &str = "0";
// Default device event class id used by `CefSerializerOptions::default()`.
const DEFAULT_EVENT_CLASS_ID: &str = "Telemetry Event";
// Maximum accepted length of the escaped `device_vendor` header value.
const DEVICE_VENDOR_MAX_LENGTH: usize = 63;
// Maximum accepted length of the escaped `device_product` header value.
const DEVICE_PRODUCT_MAX_LENGTH: usize = 63;
// Maximum accepted length of the escaped `device_version` header value.
const DEVICE_VERSION_MAX_LENGTH: usize = 31;
// Maximum accepted length of the escaped `device_event_class_id` header value.
const DEVICE_EVENT_CLASS_ID_MAX_LENGTH: usize = 1023;
// Maximum accepted length of the escaped event name taken from the log event.
const NAME_MAX_LENGTH: usize = 512;
// Highest severity accepted by the encoder; larger values are rejected.
const SEVERITY_MAX: u8 = 10;
30
/// Device-identifying values that the serializer writes into the header of
/// every CEF message.
#[derive(Debug, Clone)]
pub struct DeviceSettings {
    /// Vendor of the device that emits the events.
    pub vendor: String,
    /// Product name of the device that emits the events.
    pub product: String,
    /// Version of the device that emits the events.
    pub version: String,
    /// Identifier of the kind of event being reported.
    pub event_class_id: String,
}

impl DeviceSettings {
    /// Bundles the four device header values into a `DeviceSettings`.
    ///
    /// The values are stored exactly as given; this constructor performs no
    /// validation or escaping.
    pub const fn new(
        vendor: String,
        product: String,
        version: String,
        event_class_id: String,
    ) -> Self {
        Self {
            vendor,
            product,
            version,
            event_class_id,
        }
    }
}
56
/// Errors that can occur while building a `CefSerializer` or while encoding
/// an event with it.
#[derive(Debug, Snafu)]
pub enum CefSerializerError {
    /// A header value is longer than the limit allowed for its field.
    #[snafu(display(
        r#"LogEvent field "{}" with the value "{}" exceed {} characters limit: actual {}"#,
        field_name,
        field,
        max_length,
        actual_length
    ))]
    ExceededLength {
        /// The offending value, as reported by the length check.
        field: String,
        /// Name of the header field the value belongs to.
        field_name: String,
        /// Largest length accepted for this field.
        max_length: usize,
        /// Length that was actually measured.
        actual_length: usize,
    },
    /// The severity was parsed as a number but is above the allowed maximum.
    #[snafu(display(
        r#"LogEvent CEF severity must be a number from 0 to {}: actual {}"#,
        max_value,
        actual_value
    ))]
    SeverityMaxValue { max_value: u8, actual_value: u8 },
    /// The severity value could not be parsed as an unsigned 8-bit integer.
    #[snafu(display(r#"LogEvent CEF severity must be a number: {}"#, error))]
    SeverityNumberType { error: ParseIntError },
    /// One or more configured extension keys contain characters other than
    /// ASCII letters. `key` carries the offending key(s).
    #[snafu(display(r#"LogEvent extension keys can only contain ascii alphabetical characters: invalid key "{}""#, key))]
    ExtensionNonASCIIKey { key: String },
}
84
/// Config used to build a `CefSerializer`.
///
/// It wraps the serializer options in its `cef` field.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CefSerializerConfig {
    /// The CEF Serializer Options.
    pub cef: CefSerializerOptions,
}
92
93impl CefSerializerConfig {
94    /// Creates a new `CefSerializerConfig`.
95    pub const fn new(cef: CefSerializerOptions) -> Self {
96        Self { cef }
97    }
98
99    /// Build the `CefSerializer` from this configuration.
100    pub fn build(&self) -> Result<CefSerializer, BuildError> {
101        let device_vendor = validate_length(
102            &self.cef.device_vendor,
103            "device_vendor",
104            DEVICE_VENDOR_MAX_LENGTH,
105        )?;
106        let device_product = validate_length(
107            &self.cef.device_product,
108            "device_product",
109            DEVICE_PRODUCT_MAX_LENGTH,
110        )?;
111        let device_version = validate_length(
112            &self.cef.device_version,
113            "device_version",
114            DEVICE_VERSION_MAX_LENGTH,
115        )?;
116        let device_event_class_id = validate_length(
117            &self.cef.device_event_class_id,
118            "device_event_class_id",
119            DEVICE_EVENT_CLASS_ID_MAX_LENGTH,
120        )?;
121
122        let invalid_keys: Vec<String> = self
123            .cef
124            .extensions
125            .keys()
126            .filter(|key| !key.chars().all(|c| c.is_ascii_alphabetic()))
127            .cloned()
128            .collect();
129
130        if !invalid_keys.is_empty() {
131            return ExtensionNonASCIIKeySnafu {
132                key: invalid_keys.join(", "),
133            }
134            .fail()
135            .map_err(|e| e.to_string().into());
136        }
137
138        let device = DeviceSettings::new(
139            device_vendor,
140            device_product,
141            device_version,
142            device_event_class_id,
143        );
144
145        Ok(CefSerializer::new(
146            self.cef.version.clone(),
147            device,
148            self.cef.severity.clone(),
149            self.cef.name.clone(),
150            self.cef.extensions.clone(),
151        ))
152    }
153
154    /// The data type of events that are accepted by `CefSerializer`.
155    pub fn input_type(&self) -> DataType {
156        DataType::Log
157    }
158
159    /// The schema required by the serializer.
160    pub fn schema_requirement(&self) -> schema::Requirement {
161        // While technically we support `Value` variants that can't be losslessly serialized to
162        // CEF, we don't want to enforce that limitation to users yet.
163        schema::Requirement::empty()
164    }
165}
166
/// CEF version.
///
/// The serializer writes it as the first header value, right after the
/// `CEF:` prefix.
#[configurable_component]
#[derive(Debug, Default, Clone)]
pub enum Version {
    #[default]
    /// CEF specification version 0.1.
    V0,
    /// CEF specification version 1.x.
    V1,
}
177
178impl Version {
179    fn as_str(&self) -> &'static str {
180        match self {
181            Version::V0 => "0",
182            Version::V1 => "1",
183        }
184    }
185}
186
187impl std::fmt::Display for Version {
188    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189        write!(f, "{}", self.as_str())
190    }
191}
192
/// Options that control how a `CefSerializer` formats events.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CefSerializerOptions {
    /// CEF Version. Can be either 0 or 1.
    /// Set to "0" by default.
    pub version: Version,

    /// Identifies the vendor of the product.
    /// This is part of the unique device identifier. No two products can use the same combination of device vendor and device product.
    /// The value length must be less than or equal to 63.
    pub device_vendor: String,

    /// Identifies the product of a vendor.
    /// This is part of the unique device identifier. No two products can use the same combination of device vendor and device product.
    /// The value length must be less than or equal to 63.
    pub device_product: String,

    /// Identifies the version of the product. The combination of the device product, vendor and this value make up the unique id of the device that sends messages.
    /// The value length must be less than or equal to 31.
    pub device_version: String,

    /// Unique identifier for each event type. Identifies the type of event reported.
    /// The value length must be less than or equal to 1023.
    pub device_event_class_id: String,

    /// This is a path that points to the field of a log event that reflects the importance of the event.
    ///
    /// It must point to a number from 0 to 10.
    /// 0 = lowest_importance, 10 = highest_importance.
    /// Set to "cef.severity" by default.
    pub severity: ConfigTargetPath,

    /// This is a path that points to the human-readable description of a log event.
    /// The value length must be less than or equal to 512.
    /// Equals "cef.name" by default.
    pub name: ConfigTargetPath,

    /// The collection of key-value pairs. Keys are the keys of the extensions, and values are paths that point to the extension values of a log event.
    /// The event can have any number of key-value pairs in any order.
    #[configurable(metadata(
        docs::additional_props_description = "This is a path that points to the extension value of a log event."
    ))]
    pub extensions: HashMap<String, ConfigTargetPath>,
    // TODO: use Template instead of ConfigTargetPath.
    //   Templates live in the src/ package while codecs live in lib/codecs, so
    //   Template would have to move into lib/ first to avoid a circular dependency.
}
242
243impl Default for CefSerializerOptions {
244    fn default() -> Self {
245        Self {
246            version: Version::default(),
247            device_vendor: String::from(DEFAULT_DEVICE_VENDOR),
248            device_product: String::from(DEFAULT_DEVICE_PRODUCT),
249            device_version: String::from(DEFAULT_DEVICE_VERSION),
250            device_event_class_id: String::from(DEFAULT_EVENT_CLASS_ID),
251            severity: ConfigTargetPath::try_from("cef.severity".to_string())
252                .expect("could not parse path"),
253            name: ConfigTargetPath::try_from("cef.name".to_string()).expect("could not parse path"),
254            extensions: HashMap::new(),
255        }
256    }
257}
258
/// Serializer that converts an `Event` to bytes using the CEF format:
///
/// `CEF:{version}|{device_vendor}|{device_product}|{device_version}|{device_event_class_id}|{name}|{severity}|{extensions}`
///
/// The trailing `|{extensions}` part is only written when at least one
/// extension has a non-empty value.
#[derive(Debug, Clone)]
pub struct CefSerializer {
    // CEF version written right after the `CEF:` prefix.
    version: Version,
    // Device header values written after the version.
    device: DeviceSettings,
    // Path of the log event field that holds the severity.
    severity: ConfigTargetPath,
    // Path of the log event field that holds the event name.
    name: ConfigTargetPath,
    // Extension key -> path of the log event field that holds its value.
    extensions: HashMap<String, ConfigTargetPath>,
}

impl CefSerializer {
    /// Creates a new `CefSerializer`.
    ///
    /// The arguments are stored as given; nothing is validated here.
    pub const fn new(
        version: Version,
        device: DeviceSettings,
        severity: ConfigTargetPath,
        name: ConfigTargetPath,
        extensions: HashMap<String, ConfigTargetPath>,
    ) -> Self {
        Self {
            version,
            device,
            severity,
            name,
            extensions,
        }
    }
}
288
289impl Encoder<Event> for CefSerializer {
290    type Error = vector_common::Error;
291
292    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
293        let log = event.into_log();
294
295        let severity: u8 = match get_log_event_value(&log, &self.severity).parse() {
296            Err(err) => {
297                return SeverityNumberTypeSnafu { error: err }
298                    .fail()
299                    .map_err(|e| e.to_string().into());
300            }
301            Ok(severity) => {
302                if severity > SEVERITY_MAX {
303                    return SeverityMaxValueSnafu {
304                        max_value: SEVERITY_MAX,
305                        actual_value: severity,
306                    }
307                    .fail()
308                    .map_err(|e| e.to_string().into());
309                };
310                severity
311            }
312        };
313
314        let name: String = get_log_event_value(&log, &self.name);
315        let name = validate_length(&name, "name", NAME_MAX_LENGTH)?;
316
317        let mut formatted_extensions = Vec::with_capacity(self.extensions.len());
318        for (extension, field) in &self.extensions {
319            let value = get_log_event_value(&log, field);
320            if value.is_empty() {
321                continue;
322            }
323            let value = escape_extension(&value);
324            formatted_extensions.push(format!("{extension}={value}"));
325        }
326
327        buffer.write_fmt(format_args!(
328            "CEF:{}|{}|{}|{}|{}|{}|{}",
329            &self.version,
330            &self.device.vendor,
331            &self.device.product,
332            &self.device.version,
333            &self.device.event_class_id,
334            name,
335            severity,
336        ))?;
337        if !formatted_extensions.is_empty() {
338            formatted_extensions.sort();
339
340            buffer.write_char('|')?;
341            buffer.write_str(formatted_extensions.join(" ").as_str())?;
342        }
343
344        Ok(())
345    }
346}
347
348fn get_log_event_value(log: &LogEvent, field: &ConfigTargetPath) -> String {
349    match log.get(field) {
350        Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).to_string(),
351        Some(Value::Integer(int)) => int.to_string(),
352        Some(Value::Float(float)) => float.to_string(),
353        Some(Value::Boolean(bool)) => bool.to_string(),
354        Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true),
355        Some(Value::Null) => String::from(""),
356        // Other value types: Array, Regex, Object are not supported by the CEF format.
357        Some(_) => String::from(""),
358        None => String::from(""),
359    }
360}
361
/// Escapes a value for use in the CEF header: backslashes and pipes are
/// prefixed with a backslash.
fn escape_header(s: &str) -> String {
    escape_special_chars(s, '|')
}

/// Escapes a value for use in the CEF extension part.
///
/// Backslashes and equal signs are prefixed with a backslash. Line feeds and
/// carriage returns are encoded as the two-character sequences `\n` and `\r`
/// so that a multi-line value cannot break the single-line message framing.
fn escape_extension(s: &str) -> String {
    // Backslashes must be escaped first (inside `escape_special_chars`), so the
    // backslashes introduced for the newline encoding are not doubled.
    escape_special_chars(s, '=')
        .replace('\n', r#"\n"#)
        .replace('\r', r#"\r"#)
}

/// Prefixes every backslash and every `extra_char` in `s` with a backslash.
fn escape_special_chars(s: &str, extra_char: char) -> String {
    // The backslash replacement has to run first; otherwise the backslashes
    // added in front of `extra_char` would be escaped a second time.
    s.replace('\\', r#"\\"#)
        .replace(extra_char, &format!(r#"\{extra_char}"#))
}
373
374fn validate_length(field: &str, field_name: &str, max_length: usize) -> Result<String, BuildError> {
375    let escaped = escape_header(field);
376    if escaped.len() > max_length {
377        ExceededLengthSnafu {
378            field: escaped.clone(),
379            field_name,
380            max_length,
381            actual_length: escaped.len(),
382        }
383        .fail()?;
384    }
385    Ok(escaped)
386}
387
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{Event, LogEvent, Value};

    use super::*;

    /// Builds one `extensions` entry mapping `key` to the event path `path`.
    fn extension(key: &str, path: &str) -> (String, ConfigTargetPath) {
        (
            String::from(key),
            ConfigTargetPath::try_from(path.to_string()).unwrap(),
        )
    }

    /// An extension key containing a dot must be rejected at build time.
    #[test]
    fn build_error_on_invalid_extension() {
        let opts = CefSerializerOptions {
            extensions: HashMap::from([extension("foo.test", "foo")]),
            ..CefSerializerOptions::default()
        };

        let err = CefSerializerConfig::new(opts).build().unwrap_err();

        assert_eq!(
            err.to_string(),
            "LogEvent extension keys can only contain ascii alphabetical characters: invalid key \"foo.test\""
        );
    }

    /// The header length check is reported before the extension keys are validated.
    #[test]
    fn build_error_max_length() {
        let opts = CefSerializerOptions {
            device_vendor: "Repeat".repeat(11), // more than max length
            extensions: HashMap::from([extension("foo-test", "foo")]),
            ..CefSerializerOptions::default()
        };

        let err = CefSerializerConfig::new(opts).build().unwrap_err();

        assert_eq!(
            err.to_string(),
            "LogEvent field \"device_vendor\" with the value \"RepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeat\" exceed 63 characters limit: actual 66"
        );
    }

    /// Header escaping prefixes backslashes and pipes with a backslash.
    #[test]
    fn try_escape_header() {
        let cases = [
            (r#"Test | test"#, r#"Test \| test"#),
            (r#"Test \ test"#, r#"Test \\ test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\\| \\\| test"#),
        ];

        for (input, expected) in cases {
            assert_eq!(escape_header(input), expected);
        }
    }

    /// Extension escaping prefixes backslashes and equal signs with a backslash.
    #[test]
    fn try_escape_extension() {
        let cases = [
            (r#"Test=test"#, r#"Test\=test"#),
            (r#"Test = test"#, r#"Test \= test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\| \\| test"#),
        ];

        for (input, expected) in cases {
            assert_eq!(escape_extension(input), expected);
        }
    }

    /// Full round trip: header values, sorted extensions, skipped missing field.
    #[test]
    fn serialize_extensions() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "cef" => Value::from(btreemap! {
                "severity" => Value::from(1),
                "name" => Value::from("Event name"),
            }),
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));

        // Every extension key points at the event field of the same name;
        // "missing" has no matching field and must not appear in the output.
        let extensions: HashMap<String, ConfigTargetPath> = [
            "foo", "int", "comma", "float", "missing", "space", "time", "quote", "bool",
        ]
        .into_iter()
        .map(|key| extension(key, key))
        .collect();

        let opts = CefSerializerOptions {
            extensions,
            ..CefSerializerOptions::default()
        };
        let mut serializer = CefSerializerConfig::new(opts).build().unwrap();

        let mut bytes = BytesMut::new();
        serializer.encode(event, &mut bytes).unwrap();

        let expected = b"CEF:0|Datadog|Vector|0|Telemetry Event|Event name|1|bool=true comma=abc,bcd float=3.1415925 foo=bar int=123 quote=the \"quote\" should be escaped space=sp ace time=2023-02-27T07:04:49.363Z";
        assert_eq!(bytes.as_ref(), expected);
    }
}