1use std::{collections::HashMap, fmt::Write, num::ParseIntError};
2
3use bytes::BytesMut;
4use chrono::SecondsFormat;
5use lookup::lookup_v2::ConfigTargetPath;
6use snafu::Snafu;
7use tokio_util::codec::Encoder;
8use vector_config_macros::configurable_component;
9use vector_core::{
10 config::DataType,
11 event::{Event, LogEvent, Value},
12 schema,
13};
14
15use crate::encoding::BuildError;
16
/// Default value of the `device_vendor` header field.
const DEFAULT_DEVICE_VENDOR: &str = "Datadog";
/// Default value of the `device_product` header field.
const DEFAULT_DEVICE_PRODUCT: &str = "Vector";
/// Default value of the `device_version` header field.
const DEFAULT_DEVICE_VERSION: &str = "0";
/// Default value of the `device_event_class_id` header field.
const DEFAULT_EVENT_CLASS_ID: &str = "Telemetry Event";
/// Length limit for the escaped `device_vendor`, checked in `CefSerializerConfig::build`.
const DEVICE_VENDOR_MAX_LENGTH: usize = 63;
/// Length limit for the escaped `device_product`, checked in `CefSerializerConfig::build`.
const DEVICE_PRODUCT_MAX_LENGTH: usize = 63;
/// Length limit for the escaped `device_version`, checked in `CefSerializerConfig::build`.
const DEVICE_VERSION_MAX_LENGTH: usize = 31;
/// Length limit for the escaped `device_event_class_id`, checked in `CefSerializerConfig::build`.
const DEVICE_EVENT_CLASS_ID_MAX_LENGTH: usize = 1023;
/// Length limit for the escaped per-event name, checked for every event in `encode`.
const NAME_MAX_LENGTH: usize = 512;
/// Highest severity accepted by `encode`; severities must parse as integers in `0..=SEVERITY_MAX`.
const SEVERITY_MAX: u8 = 10;
31
/// Device-identifying header fields that `CefSerializer` writes, unchanged, into every message.
#[derive(Debug, Clone)]
pub struct DeviceSettings {
    /// Written as the "Device Vendor" header field.
    pub vendor: String,
    /// Written as the "Device Product" header field.
    pub product: String,
    /// Written as the "Device Version" header field.
    pub version: String,
    /// Written as the "Device Event Class ID" header field.
    pub event_class_id: String,
}

impl DeviceSettings {
    /// Stores the given header values as-is, with no escaping or validation.
    ///
    /// `CefSerializerConfig::build` passes values that it has already escaped and
    /// length-checked. Other callers are responsible for doing the same.
    pub const fn new(
        vendor: String,
        product: String,
        version: String,
        event_class_id: String,
    ) -> Self {
        Self {
            vendor,
            product,
            version,
            event_class_id,
        }
    }
}
57
/// Errors raised while building the CEF serializer or while encoding events with it.
#[derive(Debug, Snafu)]
pub enum CefSerializerError {
    /// A header value (device field or per-event name) is longer than its limit after escaping.
    #[snafu(display(
        r#"LogEvent field "{}" with the value "{}" exceed {} characters limit: actual {}"#,
        field_name,
        field,
        max_length,
        actual_length
    ))]
    ExceededLength {
        field: String,
        field_name: String,
        max_length: usize,
        actual_length: usize,
    },
    /// The severity parsed as an integer but is greater than the allowed maximum.
    #[snafu(display(
        r#"LogEvent CEF severity must be a number from 0 to {}: actual {}"#,
        max_value,
        actual_value
    ))]
    SeverityMaxValue { max_value: u8, actual_value: u8 },
    /// The severity could not be parsed as a `u8`.
    #[snafu(display(r#"LogEvent CEF severity must be a number: {}"#, error))]
    SeverityNumberType { error: ParseIntError },
    /// One or more configured extension keys failed validation.
    ///
    /// `key` holds the offending keys, joined with ", ".
    #[snafu(display(r#"LogEvent extension keys can only contain ascii alphabetical characters: invalid key "{}""#, key))]
    ExtensionNonASCIIKey { key: String },
}
85
86#[configurable_component]
88#[derive(Debug, Clone)]
89pub struct CefSerializerConfig {
90 pub cef: CefSerializerOptions,
92}
93
94impl CefSerializerConfig {
95 pub const fn new(cef: CefSerializerOptions) -> Self {
97 Self { cef }
98 }
99
100 pub fn build(&self) -> Result<CefSerializer, BuildError> {
102 let device_vendor = validate_length(
103 &self.cef.device_vendor,
104 "device_vendor",
105 DEVICE_VENDOR_MAX_LENGTH,
106 )?;
107 let device_product = validate_length(
108 &self.cef.device_product,
109 "device_product",
110 DEVICE_PRODUCT_MAX_LENGTH,
111 )?;
112 let device_version = validate_length(
113 &self.cef.device_version,
114 "device_version",
115 DEVICE_VERSION_MAX_LENGTH,
116 )?;
117 let device_event_class_id = validate_length(
118 &self.cef.device_event_class_id,
119 "device_event_class_id",
120 DEVICE_EVENT_CLASS_ID_MAX_LENGTH,
121 )?;
122
123 let invalid_keys: Vec<String> = self
124 .cef
125 .extensions
126 .keys()
127 .filter(|key| !key.chars().all(|c| c.is_ascii_alphabetic()))
128 .cloned()
129 .collect();
130
131 if !invalid_keys.is_empty() {
132 return ExtensionNonASCIIKeySnafu {
133 key: invalid_keys.join(", "),
134 }
135 .fail()
136 .map_err(|e| e.to_string().into());
137 }
138
139 let device = DeviceSettings::new(
140 device_vendor,
141 device_product,
142 device_version,
143 device_event_class_id,
144 );
145
146 Ok(CefSerializer::new(
147 self.cef.version.clone(),
148 device,
149 self.cef.severity.clone(),
150 self.cef.name.clone(),
151 self.cef.extensions.clone(),
152 ))
153 }
154
155 pub fn input_type(&self) -> DataType {
157 DataType::Log
158 }
159
160 pub fn schema_requirement(&self) -> schema::Requirement {
162 schema::Requirement::empty()
165 }
166}
167
168#[configurable_component]
170#[derive(Debug, Default, Clone)]
171pub enum Version {
172 #[default]
173 V0,
175 V1,
177}
178
179impl Version {
180 fn as_str(&self) -> &'static str {
181 match self {
182 Version::V0 => "0",
183 Version::V1 => "1",
184 }
185 }
186}
187
188impl std::fmt::Display for Version {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 write!(f, "{}", self.as_str())
191 }
192}
193
194#[configurable_component]
196#[derive(Debug, Clone)]
197pub struct CefSerializerOptions {
198 pub version: Version,
201
202 pub device_vendor: String,
206
207 pub device_product: String,
211
212 pub device_version: String,
215
216 pub device_event_class_id: String,
219
220 pub severity: ConfigTargetPath,
226
227 pub name: ConfigTargetPath,
231
232 #[configurable(metadata(
235 docs::additional_props_description = "This is a path that points to the extension value of a log event."
236 ))]
237 pub extensions: HashMap<String, ConfigTargetPath>,
238 }
242
243impl Default for CefSerializerOptions {
244 fn default() -> Self {
245 Self {
246 version: Version::default(),
247 device_vendor: String::from(DEFAULT_DEVICE_VENDOR),
248 device_product: String::from(DEFAULT_DEVICE_PRODUCT),
249 device_version: String::from(DEFAULT_DEVICE_VERSION),
250 device_event_class_id: String::from(DEFAULT_EVENT_CLASS_ID),
251 severity: ConfigTargetPath::try_from("cef.severity".to_string())
252 .expect("could not parse path"),
253 name: ConfigTargetPath::try_from("cef.name".to_string()).expect("could not parse path"),
254 extensions: HashMap::new(),
255 }
256 }
257}
258
/// Encoder that writes each log event as one CEF message.
///
/// Instances are normally produced by `CefSerializerConfig::build`, which validates
/// the device header fields and extension keys first.
#[derive(Debug, Clone)]
pub struct CefSerializer {
    /// Version written in the `CEF:<version>` prefix.
    version: Version,
    /// Pre-escaped device header fields.
    device: DeviceSettings,
    /// Path to the severity value inside each log event.
    severity: ConfigTargetPath,
    /// Path to the name value inside each log event.
    name: ConfigTargetPath,
    /// Extension key -> path of the log field supplying its value.
    extensions: HashMap<String, ConfigTargetPath>,
}

impl CefSerializer {
    /// Assembles a serializer from its parts; performs no validation itself.
    pub const fn new(
        version: Version,
        device: DeviceSettings,
        severity: ConfigTargetPath,
        name: ConfigTargetPath,
        extensions: HashMap<String, ConfigTargetPath>,
    ) -> Self {
        Self {
            version,
            device,
            severity,
            name,
            extensions,
        }
    }
}
288
289impl Encoder<Event> for CefSerializer {
290 type Error = vector_common::Error;
291
292 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
293 let log = event.into_log();
294
295 let severity: u8 = match get_log_event_value(&log, &self.severity).parse() {
296 Err(err) => {
297 return SeverityNumberTypeSnafu { error: err }
298 .fail()
299 .map_err(|e| e.to_string().into());
300 }
301 Ok(severity) => {
302 if severity > SEVERITY_MAX {
303 return SeverityMaxValueSnafu {
304 max_value: SEVERITY_MAX,
305 actual_value: severity,
306 }
307 .fail()
308 .map_err(|e| e.to_string().into());
309 };
310 severity
311 }
312 };
313
314 let name: String = get_log_event_value(&log, &self.name);
315 let name = validate_length(&name, "name", NAME_MAX_LENGTH)?;
316
317 let mut formatted_extensions = Vec::with_capacity(self.extensions.len());
318 for (extension, field) in &self.extensions {
319 let value = get_log_event_value(&log, field);
320 if value.is_empty() {
321 continue;
322 }
323 let value = escape_extension(&value);
324 formatted_extensions.push(format!("{extension}={value}"));
325 }
326
327 buffer.write_fmt(format_args!(
328 "CEF:{}|{}|{}|{}|{}|{}|{}",
329 &self.version,
330 &self.device.vendor,
331 &self.device.product,
332 &self.device.version,
333 &self.device.event_class_id,
334 name,
335 severity,
336 ))?;
337 if !formatted_extensions.is_empty() {
338 formatted_extensions.sort();
339
340 buffer.write_char('|')?;
341 buffer.write_str(formatted_extensions.join(" ").as_str())?;
342 }
343
344 Ok(())
345 }
346}
347
348fn get_log_event_value(log: &LogEvent, field: &ConfigTargetPath) -> String {
349 match log.get(field) {
350 Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).to_string(),
351 Some(Value::Integer(int)) => int.to_string(),
352 Some(Value::Float(float)) => float.to_string(),
353 Some(Value::Boolean(bool)) => bool.to_string(),
354 Some(Value::Timestamp(timestamp)) => timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true),
355 Some(Value::Null) => String::from(""),
356 Some(_) => String::from(""),
358 None => String::from(""),
359 }
360}
361
/// Escapes a CEF header field: every backslash and pipe is prefixed with `\`.
fn escape_header(s: &str) -> String {
    escape_special_chars(s, '|')
}

/// Escapes a CEF extension value.
///
/// Every backslash and `=` is prefixed with `\`. Line breaks are encoded as the
/// two-character sequences `\r` and `\n`. The CEF standard permits multi-line
/// content only in extension values, and only in this encoded form; a raw line
/// break would likely split the record for line-oriented consumers. Backslashes
/// are escaped first, so an original literal `\n` (backslash + `n`) comes out as
/// `\\n` and stays distinguishable from an encoded newline.
fn escape_extension(s: &str) -> String {
    escape_special_chars(s, '=')
        .replace('\r', r"\r")
        .replace('\n', r"\n")
}

/// Returns `s` with every backslash and every `extra_char` prefixed by a backslash.
///
/// Runs in a single pass, so each input character is escaped at most once.
fn escape_special_chars(s: &str, extra_char: char) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        if c == '\\' || c == extra_char {
            escaped.push('\\');
        }
        escaped.push(c);
    }
    escaped
}
373
374fn validate_length(field: &str, field_name: &str, max_length: usize) -> Result<String, BuildError> {
375 let escaped = escape_header(field);
376 if escaped.len() > max_length {
377 ExceededLengthSnafu {
378 field: escaped.clone(),
379 field_name,
380 max_length,
381 actual_length: escaped.len(),
382 }
383 .fail()?;
384 }
385 Ok(escaped)
386}
387
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{Event, LogEvent, Value};

    use super::*;

    /// Parses `path` into a `ConfigTargetPath`, panicking if it is invalid.
    fn target(path: &str) -> ConfigTargetPath {
        ConfigTargetPath::try_from(path.to_string()).unwrap()
    }

    /// Builds an extensions map in which each key reads the log field of the same name.
    fn same_name_extensions(keys: &[&str]) -> HashMap<String, ConfigTargetPath> {
        keys.iter().map(|key| (key.to_string(), target(key))).collect()
    }

    #[test]
    fn build_error_on_invalid_extension() {
        let config = CefSerializerConfig::new(CefSerializerOptions {
            extensions: HashMap::from([(String::from("foo.test"), target("foo"))]),
            ..CefSerializerOptions::default()
        });

        let message = config.build().unwrap_err().to_string();
        assert_eq!(
            message,
            "LogEvent extension keys can only contain ascii alphabetical characters: invalid key \"foo.test\""
        );
    }

    #[test]
    fn build_error_max_length() {
        let config = CefSerializerConfig::new(CefSerializerOptions {
            device_vendor: "Repeat".repeat(11),
            extensions: HashMap::from([(String::from("foo-test"), target("foo"))]),
            ..CefSerializerOptions::default()
        });

        let message = config.build().unwrap_err().to_string();
        assert_eq!(
            message,
            "LogEvent field \"device_vendor\" with the value \"RepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeatRepeat\" exceed 63 characters limit: actual 66"
        );
    }

    #[test]
    fn try_escape_header() {
        let cases = [
            (r#"Test | test"#, r#"Test \| test"#),
            (r#"Test \ test"#, r#"Test \\ test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\\| \\\| test"#),
        ];

        for (input, expected) in cases {
            assert_eq!(escape_header(input), expected);
        }
    }

    #[test]
    fn try_escape_extension() {
        let cases = [
            (r#"Test=test"#, r#"Test\=test"#),
            (r#"Test = test"#, r#"Test \= test"#),
            (r#"Test test"#, r#"Test test"#),
            (r#"Test \| \| test"#, r#"Test \\| \\| test"#),
        ];

        for (input, expected) in cases {
            assert_eq!(escape_extension(input), expected);
        }
    }

    #[test]
    fn serialize_extensions() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "cef" => Value::from(btreemap! {
                "severity" => Value::from(1),
                "name" => Value::from("Event name"),
            }),
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));

        // "missing" has no matching log field and must be left out of the output.
        let config = CefSerializerConfig::new(CefSerializerOptions {
            extensions: same_name_extensions(&[
                "foo", "int", "comma", "float", "missing", "space", "time", "quote", "bool",
            ]),
            ..CefSerializerOptions::default()
        });
        let mut serializer = config.build().unwrap();

        let mut bytes = BytesMut::new();
        serializer.encode(event, &mut bytes).unwrap();

        let expected = b"CEF:0|Datadog|Vector|0|Telemetry Event|Event name|1|bool=true comma=abc,bcd float=3.1415925 foo=bar int=123 quote=the \"quote\" should be escaped space=sp ace time=2023-02-27T07:04:49.363Z";
        assert_eq!(bytes.as_ref(), expected);
    }
}