1use std::collections::HashMap;
2
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use derivative::Derivative;
6use lookup::{event_path, owned_value_path};
7use serde::{Deserialize, Serialize};
8use serde_with::{TimestampSecondsWithFrac, serde_as};
9use smallvec::{SmallVec, smallvec};
10use vector_config::configurable_component;
11use vector_core::{
12 config::{DataType, LogNamespace, log_schema},
13 event::{Event, LogEvent},
14 schema,
15};
16use vrl::value::{Kind, Value, kind::Collection};
17
18use super::{Deserializer, default_lossy};
19use crate::{VALID_FIELD_REGEX, gelf::GELF_TARGET_PATHS, gelf_fields::*};
20
/// Config used to build a `GelfDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct GelfDeserializerConfig {
    /// GELF-specific decoding options.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub gelf: GelfDeserializerOptions,
}
38
/// How strictly incoming messages are checked against the GELF specification.
#[configurable_component]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ValidationMode {
    /// Enforce the GELF specification.
    ///
    /// The `version` field must equal the supported GELF version. Additional field names must
    /// start with an underscore and may contain only letters, numbers, underscores, dashes and
    /// dots. Additional field values must be strings or numbers. Messages violating any of
    /// these rules are rejected.
    #[default]
    Strict,

    /// Skip the strict-mode checks.
    ///
    /// Any `version` is accepted, and additional fields are kept regardless of their name or
    /// JSON value type. The `_id` field is still dropped.
    Relaxed,
}
54
55impl GelfDeserializerConfig {
56 pub fn new(options: GelfDeserializerOptions) -> Self {
58 Self { gelf: options }
59 }
60
61 pub fn build(&self) -> GelfDeserializer {
63 GelfDeserializer {
64 lossy: self.gelf.lossy,
65 validation: self.gelf.validation,
66 }
67 }
68
69 pub fn output_type(&self) -> DataType {
71 DataType::Log
72 }
73
74 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
76 schema::Definition::new_with_default_metadata(
77 Kind::object(Collection::empty()),
78 [log_namespace],
79 )
80 .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
81 .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
82 .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
83 .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
84 .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
85 .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
86 .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
87 .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
88 .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
89 .unknown_fields(Kind::bytes().or_integer().or_float())
93 }
94}
95
/// GELF-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializerOptions {
    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
    ///
    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]
    /// before the payload is parsed as JSON.
    ///
    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,

    /// Configures how strictly messages are validated against the GELF specification.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub validation: ValidationMode,
}
117
/// Deserializer that builds log events from GELF-formatted JSON payloads.
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializer {
    /// When true, invalid UTF-8 in the input is replaced before JSON parsing instead of being
    /// rejected.
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,

    /// How strictly decoded messages are checked against the GELF specification.
    validation: ValidationMode,
}
127
128impl GelfDeserializer {
129 pub fn new(lossy: bool, validation: ValidationMode) -> GelfDeserializer {
131 GelfDeserializer { lossy, validation }
132 }
133
134 fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
137 let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
138
139 if self.validation == ValidationMode::Strict && parsed.version != GELF_VERSION {
141 return Err(
142 format!("{VERSION} does not match GELF spec version ({GELF_VERSION})").into(),
143 );
144 }
145
146 log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
147 log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
148
149 if let Some(full_message) = &parsed.full_message {
150 log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
151 }
152
153 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
154 if let Some(timestamp) = parsed.timestamp {
155 log.insert(timestamp_key, timestamp);
156 } else {
158 log.insert(timestamp_key, Utc::now());
159 }
160 }
161
162 if let Some(level) = parsed.level {
163 log.insert(&GELF_TARGET_PATHS.level, level);
164 }
165 if let Some(facility) = &parsed.facility {
166 log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
167 }
168 if let Some(line) = parsed.line {
169 log.insert(
170 &GELF_TARGET_PATHS.line,
171 Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
172 );
173 }
174 if let Some(file) = &parsed.file {
175 log.insert(&GELF_TARGET_PATHS.file, file.to_string());
176 }
177
178 if let Some(add) = &parsed.additional_fields {
179 for (key, val) in add.iter() {
180 if key == "_id" {
182 continue;
183 }
184 if self.validation == ValidationMode::Strict && !key.starts_with('_') {
186 return Err(format!(
187 "'{key}' field is invalid. \
188 Additional field names must be prefixed with an underscore."
189 )
190 .into());
191 }
192 if self.validation == ValidationMode::Strict && !VALID_FIELD_REGEX.is_match(key) {
194 return Err(format!(
195 "'{key}' field contains invalid characters. Field names may \
196 contain only letters, numbers, underscores, dashes and dots."
197 )
198 .into());
199 }
200
201 if self.validation != ValidationMode::Strict || val.is_string() || val.is_number() {
203 let vector_val: Value = val.into();
204 log.insert(event_path!(key.as_str()), vector_val);
205 } else {
206 let type_ = match val {
207 serde_json::Value::Null => "null",
208 serde_json::Value::Bool(_) => "boolean",
209 serde_json::Value::Number(_) => "number",
210 serde_json::Value::String(_) => "string",
211 serde_json::Value::Array(_) => "array",
212 serde_json::Value::Object(_) => "object",
213 };
214 return Err(format!("The value type for field {key} is an invalid type ({type_}). Additional field values \
215 should be either strings or numbers.").into());
216 }
217 }
218 }
219 Ok(Event::Log(log))
220 }
221}
222
/// A GELF payload as deserialized from JSON.
///
/// `version`, `host` and `short_message` are required non-optional strings, so deserialization
/// fails if any of them is missing or has another type. Every top-level key not named below is
/// collected into `additional_fields`.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
struct GelfMessage {
    version: String,
    host: String,
    short_message: String,
    full_message: Option<String>,
    /// Seconds since the UNIX epoch, read as an `f64` so a fractional part is allowed.
    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
    timestamp: Option<DateTime<Utc>>,
    level: Option<u8>,
    facility: Option<String>,
    /// Read as any JSON number; stored on the event as a float.
    line: Option<f64>,
    file: Option<String>,
    /// All remaining top-level keys with their raw JSON values. Names and values are validated
    /// later, according to the configured `ValidationMode`.
    #[serde(flatten)]
    additional_fields: Option<HashMap<String, serde_json::Value>>,
}
239
240impl Deserializer for GelfDeserializer {
241 fn parse(
242 &self,
243 bytes: Bytes,
244 _log_namespace: LogNamespace,
245 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
246 let parsed: GelfMessage = match self.lossy {
247 true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
248 false => serde_json::from_slice(&bytes),
249 }?;
250 let event = self.message_to_event(&parsed)?;
251
252 Ok(smallvec![event])
253 }
254}
255
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use lookup::event_path;
    use serde_json::json;
    use similar_asserts::assert_eq;
    use smallvec::SmallVec;
    use vector_core::{config::log_schema, event::Event};
    use vrl::value::Value;

    use super::*;

    /// Serializes `input` to JSON bytes and decodes it with a deserializer built from `options`.
    fn deserialize_gelf_input(
        input: &serde_json::Value,
        options: GelfDeserializerOptions,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let config = GelfDeserializerConfig::new(options);
        let deserializer = config.build();
        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
        deserializer.parse(buffer, LogNamespace::Legacy)
    }

    /// A message carrying every standard GELF field plus one numeric and one string
    /// additional field decodes into the expected event fields and value types.
    #[test]
    fn gelf_deserialize_correctness() {
        let add_on_int_in = "_an.add-field_int";
        let add_on_str_in = "_an.add-field_str";

        let input = json!({
            VERSION: "1.1",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            add_on_int_in: 2001.1002,
            add_on_str_in: "A Space Odyssey",
        });

        let events = deserialize_gelf_input(&input, GelfDeserializerOptions::default()).unwrap();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();

        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
        );
        assert_eq!(
            log.get(HOST),
            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
        );
        // `short_message` is stored under the log schema's message key.
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from_static(
                b"A short message that helps you identify what is going on"
            )))
        );
        assert_eq!(
            log.get(FULL_MESSAGE),
            Some(&Value::Bytes(Bytes::from_static(
                b"Backtrace here\n\nmore stuff"
            )))
        );
        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
        assert_eq!(
            log.get(FACILITY),
            Some(&Value::Bytes(Bytes::from_static(b"foo")))
        );
        // `line` is sent as an integer but is always stored as a float.
        assert_eq!(
            log.get(LINE),
            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
        );
        assert_eq!(
            log.get(FILE),
            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
        );
        assert_eq!(
            log.get(event_path!(add_on_int_in)),
            Some(&Value::Float(
                ordered_float::NotNan::new(2001.1002).unwrap()
            ))
        );
        assert_eq!(
            log.get(event_path!(add_on_str_in)),
            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
        );
    }

    /// Edge cases that must still decode successfully.
    #[test]
    fn gelf_deserializing_edge_cases() {
        {
            // Only the required fields are present; the message field is still populated.
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events =
                deserialize_gelf_input(&input, GelfDeserializerOptions::default()).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(log.contains(log_schema().message_key_target_path().unwrap()));
        }

        {
            // An `_id` additional field is accepted but dropped from the event.
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_id": "S3creTz",
            });
            let events =
                deserialize_gelf_input(&input, GelfDeserializerOptions::default()).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(!log.contains(event_path!("_id")));
        }
    }

    /// Inputs that must be rejected with the default (strict) options.
    #[test]
    fn gelf_deserializing_err() {
        fn validate_err(input: &serde_json::Value) {
            assert!(deserialize_gelf_input(input, GelfDeserializerOptions::default()).is_err());
        }
        // Additional field name contains a character outside the allowed set.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_bad%key": "raboof",
        }));

        // Additional field name lacks the required underscore prefix.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "bad-key": "raboof",
        }));

        // Missing required `short_message`.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
        }));

        // Missing required `host`.
        validate_err(&json!({
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // `host` has the wrong type.
        validate_err(&json!({
            HOST: 42,
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // `level` has the wrong type.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: "baz",
        }));
    }

    /// A message that breaks several strict-mode rules is rejected in strict mode but decoded
    /// as-is in relaxed mode. The rules broken are: wrong version, no underscore prefix, an
    /// invalid name character, and a null value.
    #[test]
    fn gelf_deserialize_relaxed() {
        let incorrect_extra_field = "incorrect^_extra_field";
        let input = json!({
            VERSION: "1.0",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            incorrect_extra_field: null,
        });

        assert!(
            deserialize_gelf_input(
                &input,
                GelfDeserializerOptions {
                    validation: ValidationMode::Strict,
                    ..Default::default()
                }
            )
            .is_err()
        );

        let events = deserialize_gelf_input(
            &input,
            GelfDeserializerOptions {
                validation: ValidationMode::Relaxed,
                ..Default::default()
            },
        )
        .unwrap();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();

        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.0")))
        );

        // The null value is carried through unchanged in relaxed mode.
        assert_eq!(
            log.get(event_path!(incorrect_extra_field)),
            Some(&Value::Null)
        );
    }
}