codecs/encoding/format/
cef.rs

1use std::{collections::HashMap, fmt::Write, num::ParseIntError};
2
3use bytes::BytesMut;
4use chrono::SecondsFormat;
5use lookup::lookup_v2::ConfigTargetPath;
6use snafu::Snafu;
7use tokio_util::codec::Encoder;
8use vector_config_macros::configurable_component;
9use vector_core::{
10    config::DataType,
11    event::{Event, LogEvent, Value},
12    schema,
13};
14
15use crate::encoding::BuildError;
16
/// Default `device_vendor` header value used by `CefSerializerOptions::default`.
const DEFAULT_DEVICE_VENDOR: &str = "Datadog";
/// Default `device_product` header value used by `CefSerializerOptions::default`.
const DEFAULT_DEVICE_PRODUCT: &str = "Vector";
/// Default `device_version` header value used by `CefSerializerOptions::default`.
// Major version of Vector.
// TODO: find a way to get the actual Vector version.
//  The version should be the actual Vector version, but it's not possible
//  to get it from the config.
const DEFAULT_DEVICE_VERSION: &str = "0";
/// Default `device_event_class_id` header value used by `CefSerializerOptions::default`.
const DEFAULT_EVENT_CLASS_ID: &str = "Telemetry Event";
/// Upper bound passed to `validate_length` for the `device_vendor` header field.
const DEVICE_VENDOR_MAX_LENGTH: usize = 63;
/// Upper bound passed to `validate_length` for the `device_product` header field.
const DEVICE_PRODUCT_MAX_LENGTH: usize = 63;
/// Upper bound passed to `validate_length` for the `device_version` header field.
const DEVICE_VERSION_MAX_LENGTH: usize = 31;
/// Upper bound passed to `validate_length` for the `device_event_class_id` header field.
const DEVICE_EVENT_CLASS_ID_MAX_LENGTH: usize = 1023;
/// Upper bound passed to `validate_length` for the per-event `name` header field.
const NAME_MAX_LENGTH: usize = 512;
/// Largest severity value accepted by the encoder (the valid range is `0..=SEVERITY_MAX`).
const SEVERITY_MAX: u8 = 10;
31
/// The device-identifying fields of a CEF header.
///
/// This is a plain data holder: the values are stored exactly as supplied.
#[derive(Debug, Clone)]
pub struct DeviceSettings {
    /// Device vendor header field.
    pub vendor: String,
    /// Device product header field.
    pub product: String,
    /// Device version header field.
    pub version: String,
    /// Device event class ID header field.
    pub event_class_id: String,
}

impl DeviceSettings {
    /// Bundles the four device header fields into a `DeviceSettings`.
    ///
    /// No validation or escaping is performed here; callers are expected to
    /// pass values that are already fit for the header.
    pub const fn new(
        vendor: String,
        product: String,
        version: String,
        event_class_id: String,
    ) -> Self {
        DeviceSettings {
            vendor,
            product,
            version,
            event_class_id,
        }
    }
}
57
/// Errors that can occur during CEF serialization.
///
/// Every variant carries an explicit `snafu(display(...))` message; the doc
/// comments below are for readers of the code only.
#[derive(Debug, Snafu)]
pub enum CefSerializerError {
    /// A value is longer than the maximum length allowed for its CEF field.
    #[snafu(display(
        r#"LogEvent field "{}" with the value "{}" exceed {} characters limit: actual {}"#,
        field_name,
        field,
        max_length,
        actual_length
    ))]
    ExceededLength {
        /// The value that failed the length check.
        field: String,
        /// Name of the field the value belongs to.
        field_name: String,
        /// Maximum length allowed for the field.
        max_length: usize,
        /// Length that was measured for the value.
        actual_length: usize,
    },
    /// The severity parsed as a number but is greater than the allowed maximum.
    #[snafu(display(
        r#"LogEvent CEF severity must be a number from 0 to {}: actual {}"#,
        max_value,
        actual_value
    ))]
    SeverityMaxValue { max_value: u8, actual_value: u8 },
    /// The severity value could not be parsed as an integer.
    #[snafu(display(r#"LogEvent CEF severity must be a number: {}"#, error))]
    SeverityNumberType { error: ParseIntError },
    /// One or more configured extension keys contain something other than ASCII letters.
    /// `key` holds the offending key(s), already joined into a single string by the caller.
    #[snafu(display(r#"LogEvent extension keys can only contain ascii alphabetical characters: invalid key "{}""#, key))]
    ExtensionNonASCIIKey { key: String },
}
85
86/// Config used to build a `CefSerializer`.
87#[configurable_component]
88#[derive(Debug, Clone)]
89pub struct CefSerializerConfig {
90    /// The CEF Serializer Options.
91    pub cef: CefSerializerOptions,
92}
93
94impl CefSerializerConfig {
95    /// Creates a new `CefSerializerConfig`.
96    pub const fn new(cef: CefSerializerOptions) -> Self {
97        Self { cef }
98    }
99
100    /// Build the `CefSerializer` from this configuration.
101    pub fn build(&self) -> Result<CefSerializer, BuildError> {
102        let device_vendor = validate_length(
103            &self.cef.device_vendor,
104            "device_vendor",
105            DEVICE_VENDOR_MAX_LENGTH,
106        )?;
107        let device_product = validate_length(
108            &self.cef.device_product,
109            "device_product",
110            DEVICE_PRODUCT_MAX_LENGTH,
111        )?;
112        let device_version = validate_length(
113            &self.cef.device_version,
114            "device_version",
115            DEVICE_VERSION_MAX_LENGTH,
116        )?;
117        let device_event_class_id = validate_length(
118            &self.cef.device_event_class_id,
119            "device_event_class_id",
120            DEVICE_EVENT_CLASS_ID_MAX_LENGTH,
121        )?;
122
123        let invalid_keys: Vec<String> = self
124            .cef
125            .extensions
126            .keys()
127            .filter(|key| !key.chars().all(|c| c.is_ascii_alphabetic()))
128            .cloned()
129            .collect();
130
131        if !invalid_keys.is_empty() {
132            return ExtensionNonASCIIKeySnafu {
133                key: invalid_keys.join(", "),
134            }
135            .fail()
136            .map_err(|e| e.to_string().into());
137        }
138
139        let device = DeviceSettings::new(
140            device_vendor,
141            device_product,
142            device_version,
143            device_event_class_id,
144        );
145
146        Ok(CefSerializer::new(
147            self.cef.version.clone(),
148            device,
149            self.cef.severity.clone(),
150            self.cef.name.clone(),
151            self.cef.extensions.clone(),
152        ))
153    }
154
155    /// The data type of events that are accepted by `CefSerializer`.
156    pub fn input_type(&self) -> DataType {
157        DataType::Log
158    }
159
160    /// The schema required by the serializer.
161    pub fn schema_requirement(&self) -> schema::Requirement {
162        // While technically we support `Value` variants that can't be losslessly serialized to
163        // CEF, we don't want to enforce that limitation to users yet.
164        schema::Requirement::empty()
165    }
166}
167
168/// CEF version.
169#[configurable_component]
170#[derive(Debug, Default, Clone)]
171pub enum Version {
172    #[default]
173    /// CEF specification version 0.1.
174    V0,
175    /// CEF specification version 1.x.
176    V1,
177}
178
179impl Version {
180    fn as_str(&self) -> &'static str {
181        match self {
182            Version::V0 => "0",
183            Version::V1 => "1",
184        }
185    }
186}
187
188impl std::fmt::Display for Version {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        write!(f, "{}", self.as_str())
191    }
192}
193
/// Options used to configure a `CefSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CefSerializerOptions {
    /// CEF Version. Can be either 0 or 1.
    /// Set to "0" by default.
    pub version: Version,

    /// Identifies the vendor of the product.
    /// The part of a unique device identifier. No two products can use the same combination of device vendor and device product.
    /// The value length must be less than or equal to 63.
    pub device_vendor: String,

    /// Identifies the product of a vendor.
    /// The part of a unique device identifier. No two products can use the same combination of device vendor and device product.
    /// The value length must be less than or equal to 63.
    pub device_product: String,

    /// Identifies the version of the product. The combination of the device product, vendor and this value make up the unique id of the device that sends messages.
    /// The value length must be less than or equal to 31.
    pub device_version: String,

    /// Unique identifier for each event type. Identifies the type of event reported.
    /// The value length must be less than or equal to 1023.
    pub device_event_class_id: String,

    /// This is a path that points to the field of a log event that reflects the importance of the event.
    ///
    /// It must point to a number from 0 to 10.
    /// 0 = lowest importance, 10 = highest importance.
    /// Set to "cef.severity" by default.
    pub severity: ConfigTargetPath,

    /// This is a path that points to the human-readable description of a log event.
    /// The value length must be less than or equal to 512.
    /// Set to "cef.name" by default.
    pub name: ConfigTargetPath,

    /// The collection of key-value pairs. Keys are the keys of the extensions, and values are paths that point to the extension values of a log event.
    /// The event can have any number of key-value pairs in any order.
    #[configurable(metadata(
        docs::additional_props_description = "This is a path that points to the extension value of a log event."
    ))]
    pub extensions: HashMap<String, ConfigTargetPath>,
    // TODO: use Template instead of ConfigTargetPath.
    //   Templates live in the src/ package, while codecs live in lib/codecs.
    //   Template would have to move to the lib/ package first to avoid a circular dependency.
}
243
244impl Default for CefSerializerOptions {
245    fn default() -> Self {
246        Self {
247            version: Version::default(),
248            device_vendor: String::from(DEFAULT_DEVICE_VENDOR),
249            device_product: String::from(DEFAULT_DEVICE_PRODUCT),
250            device_version: String::from(DEFAULT_DEVICE_VERSION),
251            device_event_class_id: String::from(DEFAULT_EVENT_CLASS_ID),
252            severity: ConfigTargetPath::try_from("cef.severity".to_string())
253                .expect("could not parse path"),
254            name: ConfigTargetPath::try_from("cef.name".to_string()).expect("could not parse path"),
255            extensions: HashMap::new(),
256        }
257    }
258}
259
260/// Serializer that converts an `Event` to the bytes using the CEF format.
261/// CEF:{version}|{device_vendor}|{device_product}|{device_version>|{device_event_class}|{name}|{severity}|{encoded_fields}
262#[derive(Debug, Clone)]
263pub struct CefSerializer {
264    version: Version,
265    device: DeviceSettings,
266    severity: ConfigTargetPath,
267    name: ConfigTargetPath,
268    extensions: HashMap<String, ConfigTargetPath>,
269}
270
271impl CefSerializer {
272    /// Creates a new `CefSerializer`.
273    pub const fn new(
274        version: Version,
275        device: DeviceSettings,
276        severity: ConfigTargetPath,
277        name: ConfigTargetPath,
278        extensions: HashMap<String, ConfigTargetPath>,
279    ) -> Self {
280        Self {
281            version,
282            device,
283            severity,
284            name,
285            extensions,
286        }
287    }
288}
289
290impl Encoder<Event> for CefSerializer {
291    type Error = vector_common::Error;
292
293    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
294        let log = event.into_log();
295
296        let severity: u8 = match get_log_event_value(&log, &self.severity).parse() {
297            Err(err) => {
298                return SeverityNumberTypeSnafu { error: err }
299                    .fail()
300                    .map_err(|e| e.to_string().into());
301            }
302            Ok(severity) => {
303                if severity > SEVERITY_MAX {
304                    return SeverityMaxValueSnafu {
305                        max_value: SEVERITY_MAX,
306                        actual_value: severity,
307                    }
308                    .fail()
309                    .map_err(|e| e.to_string().into());
310                };
311                severity
312            }
313        };
314
315        let name: String = get_log_event_value(&log, &self.name);
316        let name = validate_length(&name, "name", NAME_MAX_LENGTH)?;
317
318        let mut formatted_extensions = Vec::with_capacity(self.extensions.len());
319        for (extension, field) in &self.extensions {
320            let value = get_log_event_value(&log, field);
321            if value.is_empty() {
322                continue;
323            }
324            let value = escape_extension(&value);
325            formatted_extensions.push(format!("{extension}={value}"));
326        }
327
328        buffer.write_fmt(format_args!(
329            "CEF:{}|{}|{}|{}|{}|{}|{}",
330            &self.version,
331            &self.device.vendor,
332            &self.device.product,
333            &self.device.version,
334            &self.device.event_class_id,
335            name,
336            severity,
337        ))?;
338        if !formatted_extensions.is_empty() {
339            formatted_extensions.sort();
340
341            buffer.write_char('|')?;
342            buffer.write_str(formatted_extensions.join(" ").as_str())?;
343        }
344
345        Ok(())
346    }
347}
348
349fn get_log_event_value(log: &LogEvent, field: &ConfigTargetPath) -> String {
350    match log.get(field) {
351        Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).to_string(),
352        Some(Value::Integer(int)) => int.to_string(),
353        Some(Value::Float(float)) => float.to_string(),
354        Some(Value::Boolean(bool)) => bool.to_string(),
355        Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true),
356        Some(Value::Null) => String::from(""),
357        // Other value types: Array, Regex, Object are not supported by the CEF format.
358        Some(_) => String::from(""),
359        None => String::from(""),
360    }
361}
362
363fn escape_header(s: &str) -> String {
364    escape_special_chars(s, '|')
365}
366fn escape_extension(s: &str) -> String {
367    escape_special_chars(s, '=')
368}
369
/// Returns `s` with every backslash doubled and every `extra_char` prefixed
/// with a backslash.
///
/// The backslash pass runs first so that the backslashes inserted in front of
/// `extra_char` are not doubled again.
fn escape_special_chars(s: &str, extra_char: char) -> String {
    let backslashes_doubled = s.replace('\\', "\\\\");

    let mut escaped_extra = String::from('\\');
    escaped_extra.push(extra_char);

    backslashes_doubled.replace(extra_char, &escaped_extra)
}
374
375fn validate_length(field: &str, field_name: &str, max_length: usize) -> Result<String, BuildError> {
376    let escaped = escape_header(field);
377    if escaped.len() > max_length {
378        ExceededLengthSnafu {
379            field: escaped.clone(),
380            field_name,
381            max_length,
382            actual_length: escaped.len(),
383        }
384        .fail()?;
385    }
386    Ok(escaped)
387}
388
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{Event, LogEvent, Value};

    use super::*;

    /// Parses a literal path; the literals used in these tests are always valid.
    fn target_path(path: &str) -> ConfigTargetPath {
        ConfigTargetPath::try_from(path.to_string()).unwrap()
    }

    /// Default options with only the extensions (and optionally the vendor) overridden.
    fn options_with(
        extensions: HashMap<String, ConfigTargetPath>,
        device_vendor: Option<String>,
    ) -> CefSerializerOptions {
        let defaults = CefSerializerOptions::default();
        CefSerializerOptions {
            device_vendor: device_vendor.unwrap_or_else(|| defaults.device_vendor.clone()),
            extensions,
            ..defaults
        }
    }

    // An extension key containing a non-letter must make `build` fail.
    #[test]
    fn build_error_on_invalid_extension() {
        let extensions = HashMap::from([(String::from("foo.test"), target_path("foo"))]);
        let config = CefSerializerConfig::new(options_with(extensions, None));

        let message = config.build().unwrap_err().to_string();

        assert_eq!(
            message,
            "LogEvent extension keys can only contain ascii alphabetical characters: invalid key \"foo.test\""
        );
    }

    // The header length check runs before the extension key check, so it is the one reported.
    #[test]
    fn build_error_max_length() {
        let extensions = HashMap::from([(String::from("foo-test"), target_path("foo"))]);
        // 66 characters: more than the max length.
        let too_long_vendor = "Repeat".repeat(11);
        let config = CefSerializerConfig::new(options_with(extensions, Some(too_long_vendor)));

        let message = config.build().unwrap_err().to_string();

        assert_eq!(
            message,
            "LogEvent field \"device_vendor\" with the value \"RepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeat\" exceed 63 characters limit: actual 66"
        );
    }

    #[test]
    fn try_escape_header() {
        let cases = [
            (r#"Test | test"#, r#"Test \| test"#),
            (r#"Test \ test"#, r#"Test \\ test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\\| \\\| test"#),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(escape_header(input), *expected);
        }
    }

    #[test]
    fn try_escape_extension() {
        let cases = [
            (r#"Test=test"#, r#"Test\=test"#),
            (r#"Test = test"#, r#"Test \= test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\| \\| test"#),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(escape_extension(input), *expected);
        }
    }

    #[test]
    fn serialize_extensions() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "cef" => Value::from(btreemap! {
                "severity" => Value::from(1),
                "name" => Value::from("Event name"),
            }),
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));

        // Each extension key reads the event field of the same name. "missing" has no such
        // field and must be skipped; "other" is in the event but not configured.
        let extensions: HashMap<String, ConfigTargetPath> = [
            "foo", "int", "comma", "float", "missing", "space", "time", "quote", "bool",
        ]
        .iter()
        .map(|&key| (key.to_string(), target_path(key)))
        .collect();

        let config = CefSerializerConfig::new(options_with(extensions, None));
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();
        let expected = b"CEF:0|Datadog|Vector|0|Telemetry Event|Event name|1|bool=true comma=abc,bcd float=3.1415925 foo=bar int=123 quote=the \"quote\" should be escaped space=sp ace time=2023-02-27T07:04:49.363Z";

        assert_eq!(bytes.as_ref(), expected);
    }
}
543}