1use crate::encoding::BuildError;
2use bytes::BytesMut;
3use chrono::SecondsFormat;
4use lookup::lookup_v2::ConfigTargetPath;
5use snafu::Snafu;
6use std::num::ParseIntError;
7use std::{collections::HashMap, fmt::Write};
8use tokio_util::codec::Encoder;
9use vector_config_macros::configurable_component;
10use vector_core::{
11 config::DataType,
12 event::{Event, LogEvent, Value},
13 schema,
14};
15
// Default values for the header options of `CefSerializerOptions`.

/// Default value of the `device_vendor` option.
const DEFAULT_DEVICE_VENDOR: &str = "Datadog";
/// Default value of the `device_product` option.
const DEFAULT_DEVICE_PRODUCT: &str = "Vector";
/// Default value of the `device_version` option.
const DEFAULT_DEVICE_VERSION: &str = "0";
/// Default value of the `device_event_class_id` option.
const DEFAULT_EVENT_CLASS_ID: &str = "Telemetry Event";

// Upper bounds enforced by `validate_length`, which compares them against the
// byte length (`str::len`) of the already escaped value.

/// Maximum length of the escaped `device_vendor` value.
const DEVICE_VENDOR_MAX_LENGTH: usize = 63;
/// Maximum length of the escaped `device_product` value.
const DEVICE_PRODUCT_MAX_LENGTH: usize = 63;
/// Maximum length of the escaped `device_version` value.
const DEVICE_VERSION_MAX_LENGTH: usize = 31;
/// Maximum length of the escaped `device_event_class_id` value.
const DEVICE_EVENT_CLASS_ID_MAX_LENGTH: usize = 1023;
/// Maximum length of the escaped event name read from each log event.
const NAME_MAX_LENGTH: usize = 512;

/// Highest severity value the encoder accepts.
const SEVERITY_MAX: u8 = 10;
30
/// Device-related fields written into the header of every CEF record.
///
/// The values are stored exactly as given; no escaping or length validation
/// happens in this type.
#[derive(Debug, Clone)]
pub struct DeviceSettings {
    /// Device vendor header field.
    pub vendor: String,
    /// Device product header field.
    pub product: String,
    /// Device version header field.
    pub version: String,
    /// Device event class ID header field.
    pub event_class_id: String,
}

impl DeviceSettings {
    /// Creates a new `DeviceSettings` from the four device header values.
    ///
    /// The arguments are moved into the struct unchanged.
    pub const fn new(
        vendor: String,
        product: String,
        version: String,
        event_class_id: String,
    ) -> Self {
        Self {
            vendor,
            product,
            version,
            event_class_id,
        }
    }
}
56
57#[derive(Debug, Snafu)]
59pub enum CefSerializerError {
60 #[snafu(display(
61 r#"LogEvent field "{}" with the value "{}" exceed {} characters limit: actual {}"#,
62 field_name,
63 field,
64 max_length,
65 actual_length
66 ))]
67 ExceededLength {
68 field: String,
69 field_name: String,
70 max_length: usize,
71 actual_length: usize,
72 },
73 #[snafu(display(
74 r#"LogEvent CEF severity must be a number from 0 to {}: actual {}"#,
75 max_value,
76 actual_value
77 ))]
78 SeverityMaxValue { max_value: u8, actual_value: u8 },
79 #[snafu(display(r#"LogEvent CEF severity must be a number: {}"#, error))]
80 SeverityNumberType { error: ParseIntError },
81 #[snafu(display(r#"LogEvent extension keys can only contain ascii alphabetical characters: invalid key "{}""#, key))]
82 ExtensionNonASCIIKey { key: String },
83}
84
85#[configurable_component]
87#[derive(Debug, Clone)]
88pub struct CefSerializerConfig {
89 pub cef: CefSerializerOptions,
91}
92
93impl CefSerializerConfig {
94 pub const fn new(cef: CefSerializerOptions) -> Self {
96 Self { cef }
97 }
98
99 pub fn build(&self) -> Result<CefSerializer, BuildError> {
101 let device_vendor = validate_length(
102 &self.cef.device_vendor,
103 "device_vendor",
104 DEVICE_VENDOR_MAX_LENGTH,
105 )?;
106 let device_product = validate_length(
107 &self.cef.device_product,
108 "device_product",
109 DEVICE_PRODUCT_MAX_LENGTH,
110 )?;
111 let device_version = validate_length(
112 &self.cef.device_version,
113 "device_version",
114 DEVICE_VERSION_MAX_LENGTH,
115 )?;
116 let device_event_class_id = validate_length(
117 &self.cef.device_event_class_id,
118 "device_event_class_id",
119 DEVICE_EVENT_CLASS_ID_MAX_LENGTH,
120 )?;
121
122 let invalid_keys: Vec<String> = self
123 .cef
124 .extensions
125 .keys()
126 .filter(|key| !key.chars().all(|c| c.is_ascii_alphabetic()))
127 .cloned()
128 .collect();
129
130 if !invalid_keys.is_empty() {
131 return ExtensionNonASCIIKeySnafu {
132 key: invalid_keys.join(", "),
133 }
134 .fail()
135 .map_err(|e| e.to_string().into());
136 }
137
138 let device = DeviceSettings::new(
139 device_vendor,
140 device_product,
141 device_version,
142 device_event_class_id,
143 );
144
145 Ok(CefSerializer::new(
146 self.cef.version.clone(),
147 device,
148 self.cef.severity.clone(),
149 self.cef.name.clone(),
150 self.cef.extensions.clone(),
151 ))
152 }
153
154 pub fn input_type(&self) -> DataType {
156 DataType::Log
157 }
158
159 pub fn schema_requirement(&self) -> schema::Requirement {
161 schema::Requirement::empty()
164 }
165}
166
167#[configurable_component]
169#[derive(Debug, Default, Clone)]
170pub enum Version {
171 #[default]
172 V0,
174 V1,
176}
177
178impl Version {
179 fn as_str(&self) -> &'static str {
180 match self {
181 Version::V0 => "0",
182 Version::V1 => "1",
183 }
184 }
185}
186
187impl std::fmt::Display for Version {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 write!(f, "{}", self.as_str())
190 }
191}
192
193#[configurable_component]
195#[derive(Debug, Clone)]
196pub struct CefSerializerOptions {
197 pub version: Version,
200
201 pub device_vendor: String,
205
206 pub device_product: String,
210
211 pub device_version: String,
214
215 pub device_event_class_id: String,
218
219 pub severity: ConfigTargetPath,
226
227 pub name: ConfigTargetPath,
231
232 #[configurable(metadata(
235 docs::additional_props_description = "This is a path that points to the extension value of a log event."
236 ))]
237 pub extensions: HashMap<String, ConfigTargetPath>,
238 }
242
243impl Default for CefSerializerOptions {
244 fn default() -> Self {
245 Self {
246 version: Version::default(),
247 device_vendor: String::from(DEFAULT_DEVICE_VENDOR),
248 device_product: String::from(DEFAULT_DEVICE_PRODUCT),
249 device_version: String::from(DEFAULT_DEVICE_VERSION),
250 device_event_class_id: String::from(DEFAULT_EVENT_CLASS_ID),
251 severity: ConfigTargetPath::try_from("cef.severity".to_string())
252 .expect("could not parse path"),
253 name: ConfigTargetPath::try_from("cef.name".to_string()).expect("could not parse path"),
254 extensions: HashMap::new(),
255 }
256 }
257}
258
259#[derive(Debug, Clone)]
262pub struct CefSerializer {
263 version: Version,
264 device: DeviceSettings,
265 severity: ConfigTargetPath,
266 name: ConfigTargetPath,
267 extensions: HashMap<String, ConfigTargetPath>,
268}
269
270impl CefSerializer {
271 pub const fn new(
273 version: Version,
274 device: DeviceSettings,
275 severity: ConfigTargetPath,
276 name: ConfigTargetPath,
277 extensions: HashMap<String, ConfigTargetPath>,
278 ) -> Self {
279 Self {
280 version,
281 device,
282 severity,
283 name,
284 extensions,
285 }
286 }
287}
288
289impl Encoder<Event> for CefSerializer {
290 type Error = vector_common::Error;
291
292 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
293 let log = event.into_log();
294
295 let severity: u8 = match get_log_event_value(&log, &self.severity).parse() {
296 Err(err) => {
297 return SeverityNumberTypeSnafu { error: err }
298 .fail()
299 .map_err(|e| e.to_string().into());
300 }
301 Ok(severity) => {
302 if severity > SEVERITY_MAX {
303 return SeverityMaxValueSnafu {
304 max_value: SEVERITY_MAX,
305 actual_value: severity,
306 }
307 .fail()
308 .map_err(|e| e.to_string().into());
309 };
310 severity
311 }
312 };
313
314 let name: String = get_log_event_value(&log, &self.name);
315 let name = validate_length(&name, "name", NAME_MAX_LENGTH)?;
316
317 let mut formatted_extensions = Vec::with_capacity(self.extensions.len());
318 for (extension, field) in &self.extensions {
319 let value = get_log_event_value(&log, field);
320 if value.is_empty() {
321 continue;
322 }
323 let value = escape_extension(&value);
324 formatted_extensions.push(format!("{extension}={value}"));
325 }
326
327 buffer.write_fmt(format_args!(
328 "CEF:{}|{}|{}|{}|{}|{}|{}",
329 &self.version,
330 &self.device.vendor,
331 &self.device.product,
332 &self.device.version,
333 &self.device.event_class_id,
334 name,
335 severity,
336 ))?;
337 if !formatted_extensions.is_empty() {
338 formatted_extensions.sort();
339
340 buffer.write_char('|')?;
341 buffer.write_str(formatted_extensions.join(" ").as_str())?;
342 }
343
344 Ok(())
345 }
346}
347
348fn get_log_event_value(log: &LogEvent, field: &ConfigTargetPath) -> String {
349 match log.get(field) {
350 Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).to_string(),
351 Some(Value::Integer(int)) => int.to_string(),
352 Some(Value::Float(float)) => float.to_string(),
353 Some(Value::Boolean(bool)) => bool.to_string(),
354 Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true),
355 Some(Value::Null) => String::from(""),
356 Some(_) => String::from(""),
358 None => String::from(""),
359 }
360}
361
362fn escape_header(s: &str) -> String {
363 escape_special_chars(s, '|')
364}
/// Escapes an extension value so it can be written after `key=`.
///
/// Backslashes are doubled, `=` is prefixed with a backslash, and line breaks
/// are written as the two-character sequences `\n` and `\r`. Leaving a raw
/// line break in the value would split the record across lines.
fn escape_extension(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => escaped.push_str(r"\\"),
            '=' => escaped.push_str(r"\="),
            '\n' => escaped.push_str(r"\n"),
            '\r' => escaped.push_str(r"\r"),
            other => escaped.push(other),
        }
    }
    escaped
}
368
/// Escapes backslashes and `extra_char` in `s` with a leading backslash.
///
/// The work is done in two passes: first every backslash is doubled, then
/// every occurrence of `extra_char` is replaced by a backslash followed by
/// `extra_char`.
fn escape_special_chars(s: &str, extra_char: char) -> String {
    let backslashes_doubled = s.replace('\\', "\\\\");

    let mut escaped_extra = String::with_capacity(1 + extra_char.len_utf8());
    escaped_extra.push('\\');
    escaped_extra.push(extra_char);

    backslashes_doubled.replace(extra_char, &escaped_extra)
}
373
374fn validate_length(field: &str, field_name: &str, max_length: usize) -> Result<String, BuildError> {
375 let escaped = escape_header(field);
376 if escaped.len() > max_length {
377 ExceededLengthSnafu {
378 field: escaped.clone(),
379 field_name,
380 max_length,
381 actual_length: escaped.len(),
382 }
383 .fail()?;
384 }
385 Ok(escaped)
386}
387
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{Event, LogEvent, Value};

    use super::*;

    /// Parses `path` into a `ConfigTargetPath`, panicking if it is invalid.
    fn target_path(path: &str) -> ConfigTargetPath {
        ConfigTargetPath::try_from(path.to_string()).unwrap()
    }

    /// An extension key containing a non-letter must be rejected by `build`.
    #[test]
    fn build_error_on_invalid_extension() {
        let config = CefSerializerConfig::new(CefSerializerOptions {
            extensions: HashMap::from([(String::from("foo.test"), target_path("foo"))]),
            ..CefSerializerOptions::default()
        });

        let message = config.build().unwrap_err().to_string();

        assert_eq!(
            message,
            "LogEvent extension keys can only contain ascii alphabetical characters: invalid key \"foo.test\""
        );
    }

    /// A too-long device vendor is reported before extension keys are checked.
    #[test]
    fn build_error_max_length() {
        let config = CefSerializerConfig::new(CefSerializerOptions {
            device_vendor: "Repeat".repeat(11),
            extensions: HashMap::from([(String::from("foo-test"), target_path("foo"))]),
            ..CefSerializerOptions::default()
        });

        let message = config.build().unwrap_err().to_string();

        assert_eq!(
            message,
            "LogEvent field \"device_vendor\" with the value \"RepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeat\" exceed 63 characters limit: actual 66"
        );
    }

    /// Header escaping covers `|` and `\`.
    #[test]
    fn try_escape_header() {
        let cases = [
            (r#"Test | test"#, r#"Test \| test"#),
            (r#"Test \ test"#, r#"Test \\ test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\\| \\\| test"#),
        ];

        for &(input, expected) in &cases {
            assert_eq!(escape_header(input), expected);
        }
    }

    /// Extension escaping covers `=` and `\`, but leaves `|` alone.
    #[test]
    fn try_escape_extension() {
        let cases = [
            (r#"Test=test"#, r#"Test\=test"#),
            (r#"Test = test"#, r#"Test \= test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\| \\| test"#),
        ];

        for &(input, expected) in &cases {
            assert_eq!(escape_extension(input), expected);
        }
    }

    /// Every supported value kind is rendered, missing fields are skipped and
    /// extensions come out sorted.
    #[test]
    fn serialize_extensions() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "cef" => Value::from(btreemap! {
                "severity" => Value::from(1),
                "name" => Value::from("Event name"),
            }),
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));

        // Each extension reads the log field of the same name; `missing` has
        // no matching field and `other` is deliberately not configured.
        let extensions: HashMap<String, ConfigTargetPath> = [
            "foo", "int", "comma", "float", "missing", "space", "time", "quote", "bool",
        ]
        .iter()
        .map(|field| (field.to_string(), target_path(field)))
        .collect();

        let config = CefSerializerConfig::new(CefSerializerOptions {
            extensions,
            ..CefSerializerOptions::default()
        });
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();
        let expected = b"CEF:0|Datadog|Vector|0|Telemetry Event|Event name|1|bool=true comma=abc,bcd float=3.1415925 foo=bar int=123 quote=the \"quote\" should be escaped space=sp ace time=2023-02-27T07:04:49.363Z";

        assert_eq!(bytes.as_ref(), expected);
    }
}