1use std::{collections::HashMap, fmt::Write, num::ParseIntError};
2
3use bytes::BytesMut;
4use chrono::SecondsFormat;
5use lookup::lookup_v2::ConfigTargetPath;
6use snafu::Snafu;
7use tokio_util::codec::Encoder;
8use vector_config_macros::configurable_component;
9use vector_core::{
10 config::DataType,
11 event::{Event, LogEvent, Value},
12 schema,
13};
14
15use crate::encoding::BuildError;
16
/// Default `device_vendor` used by `CefSerializerOptions::default()`.
const DEFAULT_DEVICE_VENDOR: &str = "Datadog";
/// Default `device_product` used by `CefSerializerOptions::default()`.
const DEFAULT_DEVICE_PRODUCT: &str = "Vector";
/// Default `device_version` used by `CefSerializerOptions::default()`.
const DEFAULT_DEVICE_VERSION: &str = "0";
/// Default `device_event_class_id` used by `CefSerializerOptions::default()`.
const DEFAULT_EVENT_CLASS_ID: &str = "Telemetry Event";
/// Maximum accepted length of the escaped `device_vendor` header field.
const DEVICE_VENDOR_MAX_LENGTH: usize = 63;
/// Maximum accepted length of the escaped `device_product` header field.
const DEVICE_PRODUCT_MAX_LENGTH: usize = 63;
/// Maximum accepted length of the escaped `device_version` header field.
const DEVICE_VERSION_MAX_LENGTH: usize = 31;
/// Maximum accepted length of the escaped `device_event_class_id` header field.
const DEVICE_EVENT_CLASS_ID_MAX_LENGTH: usize = 1023;
/// Maximum accepted length of the escaped event name header field.
const NAME_MAX_LENGTH: usize = 512;
/// Highest severity value the encoder accepts; larger values are rejected.
const SEVERITY_MAX: u8 = 10;
31
/// Device-identifying header fields of a CEF message.
///
/// The serializer writes these four values, in this order, between the CEF
/// version and the event name.
#[derive(Debug, Clone)]
pub struct DeviceSettings {
    /// Device vendor header field.
    pub vendor: String,
    /// Device product header field.
    pub product: String,
    /// Device version header field.
    pub version: String,
    /// Device event class ID header field.
    pub event_class_id: String,
}

impl DeviceSettings {
    /// Bundles the four device header values into a `DeviceSettings`.
    ///
    /// The values are stored as given; no validation or escaping happens here.
    pub const fn new(
        vendor: String,
        product: String,
        version: String,
        event_class_id: String,
    ) -> Self {
        DeviceSettings {
            event_class_id,
            version,
            product,
            vendor,
        }
    }
}
57
/// Errors raised while building a `CefSerializer` or while encoding an event with it.
#[derive(Debug, Snafu)]
pub enum CefSerializerError {
    /// A header field is longer than its limit once escaped.
    #[snafu(display(
        r#"LogEvent field "{}" with the value "{}" exceed {} characters limit: actual {}"#,
        field_name,
        field,
        max_length,
        actual_length
    ))]
    ExceededLength {
        /// The offending value, in its escaped form.
        field: String,
        /// Name of the field the value belongs to.
        field_name: String,
        /// Limit that was exceeded.
        max_length: usize,
        /// Measured length of the escaped value.
        actual_length: usize,
    },
    /// The severity parsed as a number but is above the allowed maximum.
    #[snafu(display(
        r#"LogEvent CEF severity must be a number from 0 to {}: actual {}"#,
        max_value,
        actual_value
    ))]
    SeverityMaxValue { max_value: u8, actual_value: u8 },
    /// The severity value could not be parsed as a `u8`.
    #[snafu(display(r#"LogEvent CEF severity must be a number: {}"#, error))]
    SeverityNumberType { error: ParseIntError },
    /// One or more configured extension keys contain something other than
    /// ASCII letters. `key` may hold several keys joined with `", "`.
    #[snafu(display(r#"LogEvent extension keys can only contain ascii alphabetical characters: invalid key "{}""#, key))]
    ExtensionNonASCIIKey { key: String },
}
85
/// Config used to build a `CefSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CefSerializerConfig {
    /// The CEF serializer options.
    pub cef: CefSerializerOptions,
}
93
94impl CefSerializerConfig {
95 pub const fn new(cef: CefSerializerOptions) -> Self {
97 Self { cef }
98 }
99
100 pub fn build(&self) -> Result<CefSerializer, BuildError> {
102 let device_vendor = validate_length(
103 &self.cef.device_vendor,
104 "device_vendor",
105 DEVICE_VENDOR_MAX_LENGTH,
106 )?;
107 let device_product = validate_length(
108 &self.cef.device_product,
109 "device_product",
110 DEVICE_PRODUCT_MAX_LENGTH,
111 )?;
112 let device_version = validate_length(
113 &self.cef.device_version,
114 "device_version",
115 DEVICE_VERSION_MAX_LENGTH,
116 )?;
117 let device_event_class_id = validate_length(
118 &self.cef.device_event_class_id,
119 "device_event_class_id",
120 DEVICE_EVENT_CLASS_ID_MAX_LENGTH,
121 )?;
122
123 let invalid_keys: Vec<String> = self
124 .cef
125 .extensions
126 .keys()
127 .filter(|key| !key.chars().all(|c| c.is_ascii_alphabetic()))
128 .cloned()
129 .collect();
130
131 if !invalid_keys.is_empty() {
132 return ExtensionNonASCIIKeySnafu {
133 key: invalid_keys.join(", "),
134 }
135 .fail()
136 .map_err(|e| e.to_string().into());
137 }
138
139 let device = DeviceSettings::new(
140 device_vendor,
141 device_product,
142 device_version,
143 device_event_class_id,
144 );
145
146 Ok(CefSerializer::new(
147 self.cef.version.clone(),
148 device,
149 self.cef.severity.clone(),
150 self.cef.name.clone(),
151 self.cef.extensions.clone(),
152 ))
153 }
154
155 pub fn input_type(&self) -> DataType {
157 DataType::Log
158 }
159
160 pub fn schema_requirement(&self) -> schema::Requirement {
162 schema::Requirement::empty()
165 }
166}
167
/// CEF version.
#[configurable_component]
#[derive(Debug, Default, Clone)]
pub enum Version {
    #[default]
    /// Version rendered as `0` in the message; this is the default.
    V0,
    /// Version rendered as `1` in the message.
    V1,
}
178
179impl Version {
180 fn as_str(&self) -> &'static str {
181 match self {
182 Version::V0 => "0",
183 Version::V1 => "1",
184 }
185 }
186}
187
188impl std::fmt::Display for Version {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 write!(f, "{}", self.as_str())
191 }
192}
193
/// Config used to build a `CefSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CefSerializerOptions {
    /// CEF version written right after the `CEF:` prefix.
    ///
    /// Defaults to `V0`.
    pub version: Version,

    /// Device vendor header field.
    ///
    /// The escaped value must not be longer than 63. Defaults to `Datadog`.
    pub device_vendor: String,

    /// Device product header field.
    ///
    /// The escaped value must not be longer than 63. Defaults to `Vector`.
    pub device_product: String,

    /// Device version header field.
    ///
    /// The escaped value must not be longer than 31. Defaults to `0`.
    pub device_version: String,

    /// Device event class ID header field.
    ///
    /// The escaped value must not be longer than 1023. Defaults to
    /// `Telemetry Event`.
    pub device_event_class_id: String,

    /// Path to the log event field that holds the severity.
    ///
    /// The field must hold a number from 0 to 10, otherwise encoding fails.
    /// Defaults to `cef.severity`.
    pub severity: ConfigTargetPath,

    /// Path to the log event field that holds the event name.
    ///
    /// The escaped value must not be longer than 512. Defaults to `cef.name`.
    pub name: ConfigTargetPath,

    /// The collection of extension key-value pairs.
    ///
    /// Keys are the extension keys and may only contain ASCII letters; values
    /// are paths that point to the extension values of a log event. Extensions
    /// whose value is missing or empty are left out of the message.
    #[configurable(metadata(
        docs::additional_props_description = "This is a path that points to the extension value of a log event."
    ))]
    pub extensions: HashMap<String, ConfigTargetPath>,
}
243
244impl Default for CefSerializerOptions {
245 fn default() -> Self {
246 Self {
247 version: Version::default(),
248 device_vendor: String::from(DEFAULT_DEVICE_VENDOR),
249 device_product: String::from(DEFAULT_DEVICE_PRODUCT),
250 device_version: String::from(DEFAULT_DEVICE_VERSION),
251 device_event_class_id: String::from(DEFAULT_EVENT_CLASS_ID),
252 severity: ConfigTargetPath::try_from("cef.severity".to_string())
253 .expect("could not parse path"),
254 name: ConfigTargetPath::try_from("cef.name".to_string()).expect("could not parse path"),
255 extensions: HashMap::new(),
256 }
257 }
258}
259
260#[derive(Debug, Clone)]
263pub struct CefSerializer {
264 version: Version,
265 device: DeviceSettings,
266 severity: ConfigTargetPath,
267 name: ConfigTargetPath,
268 extensions: HashMap<String, ConfigTargetPath>,
269}
270
271impl CefSerializer {
272 pub const fn new(
274 version: Version,
275 device: DeviceSettings,
276 severity: ConfigTargetPath,
277 name: ConfigTargetPath,
278 extensions: HashMap<String, ConfigTargetPath>,
279 ) -> Self {
280 Self {
281 version,
282 device,
283 severity,
284 name,
285 extensions,
286 }
287 }
288}
289
290impl Encoder<Event> for CefSerializer {
291 type Error = vector_common::Error;
292
293 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
294 let log = event.into_log();
295
296 let severity: u8 = match get_log_event_value(&log, &self.severity).parse() {
297 Err(err) => {
298 return SeverityNumberTypeSnafu { error: err }
299 .fail()
300 .map_err(|e| e.to_string().into());
301 }
302 Ok(severity) => {
303 if severity > SEVERITY_MAX {
304 return SeverityMaxValueSnafu {
305 max_value: SEVERITY_MAX,
306 actual_value: severity,
307 }
308 .fail()
309 .map_err(|e| e.to_string().into());
310 };
311 severity
312 }
313 };
314
315 let name: String = get_log_event_value(&log, &self.name);
316 let name = validate_length(&name, "name", NAME_MAX_LENGTH)?;
317
318 let mut formatted_extensions = Vec::with_capacity(self.extensions.len());
319 for (extension, field) in &self.extensions {
320 let value = get_log_event_value(&log, field);
321 if value.is_empty() {
322 continue;
323 }
324 let value = escape_extension(&value);
325 formatted_extensions.push(format!("{extension}={value}"));
326 }
327
328 buffer.write_fmt(format_args!(
329 "CEF:{}|{}|{}|{}|{}|{}|{}",
330 &self.version,
331 &self.device.vendor,
332 &self.device.product,
333 &self.device.version,
334 &self.device.event_class_id,
335 name,
336 severity,
337 ))?;
338 if !formatted_extensions.is_empty() {
339 formatted_extensions.sort();
340
341 buffer.write_char('|')?;
342 buffer.write_str(formatted_extensions.join(" ").as_str())?;
343 }
344
345 Ok(())
346 }
347}
348
349fn get_log_event_value(log: &LogEvent, field: &ConfigTargetPath) -> String {
350 match log.get(field) {
351 Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).to_string(),
352 Some(Value::Integer(int)) => int.to_string(),
353 Some(Value::Float(float)) => float.to_string(),
354 Some(Value::Boolean(bool)) => bool.to_string(),
355 Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true),
356 Some(Value::Null) => String::from(""),
357 Some(_) => String::from(""),
359 None => String::from(""),
360 }
361}
362
/// Escapes a CEF header field: `\` becomes `\\` and `|` becomes `\|`.
fn escape_header(s: &str) -> String {
    escape_special_chars(s, '|')
}

/// Escapes a CEF extension value.
///
/// `\` becomes `\\` and `=` becomes `\=`. Line breaks are encoded as the
/// two-character sequences `\n` and `\r`, so that a multi-line value cannot
/// split the message across lines.
fn escape_extension(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => escaped.push_str(r"\\"),
            '=' => escaped.push_str(r"\="),
            '\n' => escaped.push_str(r"\n"),
            '\r' => escaped.push_str(r"\r"),
            other => escaped.push(other),
        }
    }
    escaped
}

/// Prefixes every backslash and every `extra_char` in `s` with a backslash.
///
/// Backslashes are doubled first so that the backslashes added for
/// `extra_char` are not escaped a second time.
fn escape_special_chars(s: &str, extra_char: char) -> String {
    s.replace('\\', r#"\\"#)
        .replace(extra_char, &format!(r#"\{extra_char}"#))
}
374
375fn validate_length(field: &str, field_name: &str, max_length: usize) -> Result<String, BuildError> {
376 let escaped = escape_header(field);
377 if escaped.len() > max_length {
378 ExceededLengthSnafu {
379 field: escaped.clone(),
380 field_name,
381 max_length,
382 actual_length: escaped.len(),
383 }
384 .fail()?;
385 }
386 Ok(escaped)
387}
388
389#[cfg(test)]
390mod tests {
391 use bytes::BytesMut;
392 use chrono::DateTime;
393 use ordered_float::NotNan;
394 use vector_common::btreemap;
395 use vector_core::event::{Event, LogEvent, Value};
396
397 use super::*;
398
399 #[test]
400 fn build_error_on_invalid_extension() {
401 let extensions = HashMap::from([(
402 String::from("foo.test"),
403 ConfigTargetPath::try_from("foo".to_string()).unwrap(),
404 )]);
405 let opts: CefSerializerOptions = CefSerializerOptions {
406 extensions,
407 ..CefSerializerOptions::default()
408 };
409 let config = CefSerializerConfig::new(opts);
410 let err = config.build().unwrap_err();
411 assert_eq!(
412 err.to_string(),
413 "LogEvent extension keys can only contain ascii alphabetical characters: invalid key \"foo.test\""
414 );
415 }
416
417 #[test]
418 fn build_error_max_length() {
419 let extensions = HashMap::from([(
420 String::from("foo-test"),
421 ConfigTargetPath::try_from("foo".to_string()).unwrap(),
422 )]);
423 let opts: CefSerializerOptions = CefSerializerOptions {
424 device_vendor: "Repeat".repeat(11), extensions,
426 ..CefSerializerOptions::default()
427 };
428 let config = CefSerializerConfig::new(opts);
429 let err = config.build().unwrap_err();
430 assert_eq!(
431 err.to_string(),
432 "LogEvent field \"device_vendor\" with the value \"RepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeat\" exceed 63 characters limit: actual 66"
433 );
434 }
435
436 #[test]
437 fn try_escape_header() {
438 let s1 = String::from(r#"Test | test"#);
439 let s2 = String::from(r#"Test \ test"#);
440 let s3 = String::from(r#"Test test"#);
441 let s4 = String::from(r#"Test \| \| test"#);
442
443 let s1 = escape_header(&s1);
444 let s2 = escape_header(&s2);
445 let s3: String = escape_header(&s3);
446 let s4: String = escape_header(&s4);
447
448 assert_eq!(s1, r#"Test \| test"#);
449 assert_eq!(s2, r#"Test \\ test"#);
450 assert_eq!(s3, r#"Test test"#);
451 assert_eq!(s4, r#"Test \\\| \\\| test"#);
452 }
453
454 #[test]
455 fn try_escape_extension() {
456 let s1 = String::from(r#"Test=test"#);
457 let s2 = String::from(r#"Test = test"#);
458 let s3 = String::from(r#"Test test"#);
459 let s4 = String::from(r#"Test \| \| test"#);
460
461 let s1 = escape_extension(&s1);
462 let s2 = escape_extension(&s2);
463 let s3: String = escape_extension(&s3);
464 let s4: String = escape_extension(&s4);
465
466 assert_eq!(s1, r#"Test\=test"#);
467 assert_eq!(s2, r#"Test \= test"#);
468 assert_eq!(s3, r#"Test test"#);
469 assert_eq!(s4, r#"Test \\| \\| test"#);
470 }
471
472 #[test]
473 fn serialize_extensions() {
474 let event = Event::Log(LogEvent::from(btreemap! {
475 "cef" => Value::from(btreemap! {
476 "severity" => Value::from(1),
477 "name" => Value::from("Event name"),
478 }),
479 "foo" => Value::from("bar"),
480 "int" => Value::from(123),
481 "comma" => Value::from("abc,bcd"),
482 "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
483 "space" => Value::from("sp ace"),
484 "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
485 "quote" => Value::from("the \"quote\" should be escaped"),
486 "bool" => Value::from(true),
487 "other" => Value::from("data"),
488 }));
489
490 let extensions = HashMap::from([
491 (
492 String::from("foo"),
493 ConfigTargetPath::try_from("foo".to_string()).unwrap(),
494 ),
495 (
496 String::from("int"),
497 ConfigTargetPath::try_from("int".to_string()).unwrap(),
498 ),
499 (
500 String::from("comma"),
501 ConfigTargetPath::try_from("comma".to_string()).unwrap(),
502 ),
503 (
504 String::from("float"),
505 ConfigTargetPath::try_from("float".to_string()).unwrap(),
506 ),
507 (
508 String::from("missing"),
509 ConfigTargetPath::try_from("missing".to_string()).unwrap(),
510 ),
511 (
512 String::from("space"),
513 ConfigTargetPath::try_from("space".to_string()).unwrap(),
514 ),
515 (
516 String::from("time"),
517 ConfigTargetPath::try_from("time".to_string()).unwrap(),
518 ),
519 (
520 String::from("quote"),
521 ConfigTargetPath::try_from("quote".to_string()).unwrap(),
522 ),
523 (
524 String::from("bool"),
525 ConfigTargetPath::try_from("bool".to_string()).unwrap(),
526 ),
527 ]);
528
529 let opts: CefSerializerOptions = CefSerializerOptions {
530 extensions,
531 ..CefSerializerOptions::default()
532 };
533
534 let config = CefSerializerConfig::new(opts);
535 let mut serializer = config.build().unwrap();
536 let mut bytes = BytesMut::new();
537
538 serializer.encode(event, &mut bytes).unwrap();
539 let expected = b"CEF:0|Datadog|Vector|0|Telemetry Event|Event name|1|bool=true comma=abc,bcd float=3.1415925 foo=bar int=123 quote=the \"quote\" should be escaped space=sp ace time=2023-02-27T07:04:49.363Z";
540
541 assert_eq!(bytes.as_ref(), expected);
542 }
543}