codecs/decoding/format/
gelf.rs

1use std::collections::HashMap;
2
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use derivative::Derivative;
6use lookup::{event_path, owned_value_path};
7use serde::{Deserialize, Serialize};
8use serde_with::{TimestampSecondsWithFrac, serde_as};
9use smallvec::{SmallVec, smallvec};
10use vector_config::configurable_component;
11use vector_core::{
12    config::{DataType, LogNamespace, log_schema},
13    event::{Event, LogEvent},
14    schema,
15};
16use vrl::value::{Kind, Value, kind::Collection};
17
18use super::{Deserializer, default_lossy};
19use crate::{VALID_FIELD_REGEX, gelf::GELF_TARGET_PATHS, gelf_fields::*};
20
21// On GELF decoding behavior:
22//   Graylog has a relaxed decoding. They are much more lenient than the spec would
23//   suggest. We've elected to take a more strict approach to maintain backwards compatibility
24//   in the event that we need to change the behavior to be more relaxed, so that prior versions
25//   of vector will still work with the new relaxed decoding.
26//
27//   Additionally, Graylog's own GELF Output produces GELF messages with any field names present
28//   in the sending Stream, exceeding the specified field name character set.
29
/// Config used to build a `GelfDeserializer`.
// NOTE(review): the `///` text on this type and its field is left verbatim on purpose;
// `#[configurable_component]` presumably surfaces it as user-facing configuration
// documentation — confirm before rewording.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct GelfDeserializerConfig {
    /// GELF-specific decoding options.
    // `default`: a missing `gelf` key deserializes to `GelfDeserializerOptions::default()`.
    // `skip_serializing_if`: the key is left out of serialized output whenever
    // `vector_core::serde::is_default` returns true for the value.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub gelf: GelfDeserializerOptions,
}
38
/// Configures the decoding validation mode.
// Variants are (de)serialized in snake_case (`strict` / `relaxed`) because of the
// `rename_all` attribute below.
#[configurable_component]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ValidationMode {
    /// Uses strict validation that closely follows the GELF spec.
    // `#[default]` makes this the value of `ValidationMode::default()`.
    #[default]
    Strict,

    /// Uses more relaxed validation that skips strict GELF specification checks.
    ///
    /// This mode will not treat specification violations as errors, allowing the decoder
    /// to accept messages from sources that don't strictly follow the GELF spec.
    // Within this file, the checks gated on `Strict` (and therefore skipped here) are the
    // version comparison, the additional-field name checks (underscore prefix and allowed
    // characters) and the additional-field value type check. Fields that are mandatory in
    // `GelfMessage` are still required, since that is enforced during JSON deserialization.
    Relaxed,
}
54
55impl GelfDeserializerConfig {
56    /// Creates a new `GelfDeserializerConfig`.
57    pub fn new(options: GelfDeserializerOptions) -> Self {
58        Self { gelf: options }
59    }
60
61    /// Build the `GelfDeserializer` from this configuration.
62    pub fn build(&self) -> GelfDeserializer {
63        GelfDeserializer {
64            lossy: self.gelf.lossy,
65            validation: self.gelf.validation,
66        }
67    }
68
69    /// Return the type of event built by this deserializer.
70    pub fn output_type(&self) -> DataType {
71        DataType::Log
72    }
73
74    /// The schema produced by the deserializer.
75    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
76        schema::Definition::new_with_default_metadata(
77            Kind::object(Collection::empty()),
78            [log_namespace],
79        )
80        .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
81        .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
82        .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
83        .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
84        .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
85        .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
86        .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
87        .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
88        .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
89        // Every field with an underscore (_) prefix will be treated as an additional field.
90        // Allowed characters in field names are any word character (letter, number, underscore), dashes and dots.
91        // Libraries SHOULD not allow to send id as additional field ( _id). Graylog server nodes omit this field automatically.
92        .unknown_fields(Kind::bytes().or_integer().or_float())
93    }
94}
95
/// GELF-specific decoding options.
// NOTE(review): the `///` text below is left verbatim on purpose; `#[configurable_component]`
// presumably surfaces it as user-facing configuration documentation — confirm before rewording.
// `Default` is derived through `derivative` so that `lossy` can take its default from
// `default_lossy()` rather than from `bool::default()`.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializerOptions {
    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
    ///
    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
    ///
    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
    // The serde default and the `Default` impl both go through `default_lossy()`, keeping
    // "absent from config" and "constructed in code" in agreement.
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,

    /// Configures the decoding validation mode.
    // Falls back to `ValidationMode::default()` (the `Strict` variant) when absent.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub validation: ValidationMode,
}
117
/// Deserializer that builds an `Event` from a byte frame containing a GELF log message.
///
/// The derived `Default` takes `lossy` from `default_lossy()` and `validation` from
/// `ValidationMode::default()` (the `Strict` variant).
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializer {
    /// When true, the input frame is converted with `String::from_utf8_lossy` before JSON
    /// parsing; otherwise the raw bytes are handed to the JSON parser directly.
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,

    /// Selects whether the GELF spec checks in `message_to_event` are enforced.
    validation: ValidationMode,
}
127
128impl GelfDeserializer {
129    /// Create a new `GelfDeserializer`.
130    pub fn new(lossy: bool, validation: ValidationMode) -> GelfDeserializer {
131        GelfDeserializer { lossy, validation }
132    }
133
134    /// Builds a LogEvent from the parsed GelfMessage.
135    /// The logic follows strictly the documented GELF standard.
136    fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
137        let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
138
139        // GELF spec defines the version as 1.1 which has not changed since 2013
140        if self.validation == ValidationMode::Strict && parsed.version != GELF_VERSION {
141            return Err(
142                format!("{VERSION} does not match GELF spec version ({GELF_VERSION})").into(),
143            );
144        }
145
146        log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
147        log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
148
149        if let Some(full_message) = &parsed.full_message {
150            log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
151        }
152
153        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
154            if let Some(timestamp) = parsed.timestamp {
155                log.insert(timestamp_key, timestamp);
156                // per GELF spec- add timestamp if not provided
157            } else {
158                log.insert(timestamp_key, Utc::now());
159            }
160        }
161
162        if let Some(level) = parsed.level {
163            log.insert(&GELF_TARGET_PATHS.level, level);
164        }
165        if let Some(facility) = &parsed.facility {
166            log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
167        }
168        if let Some(line) = parsed.line {
169            log.insert(
170                &GELF_TARGET_PATHS.line,
171                Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
172            );
173        }
174        if let Some(file) = &parsed.file {
175            log.insert(&GELF_TARGET_PATHS.file, file.to_string());
176        }
177
178        if let Some(add) = &parsed.additional_fields {
179            for (key, val) in add.iter() {
180                // per GELF spec, filter out _id
181                if key == "_id" {
182                    continue;
183                }
184                // per GELF spec, Additional field names must be prefixed with an underscore
185                if self.validation == ValidationMode::Strict && !key.starts_with('_') {
186                    return Err(format!(
187                        "'{key}' field is invalid. \
188                                       Additional field names must be prefixed with an underscore."
189                    )
190                    .into());
191                }
192                // per GELF spec, Additional field names must be characters dashes or dots
193                if self.validation == ValidationMode::Strict && !VALID_FIELD_REGEX.is_match(key) {
194                    return Err(format!(
195                        "'{key}' field contains invalid characters. Field names may \
196                                       contain only letters, numbers, underscores, dashes and dots."
197                    )
198                    .into());
199                }
200
201                // per GELF spec, Additional field values must be either strings or numbers
202                if self.validation != ValidationMode::Strict || val.is_string() || val.is_number() {
203                    let vector_val: Value = val.into();
204                    log.insert(event_path!(key.as_str()), vector_val);
205                } else {
206                    let type_ = match val {
207                        serde_json::Value::Null => "null",
208                        serde_json::Value::Bool(_) => "boolean",
209                        serde_json::Value::Number(_) => "number",
210                        serde_json::Value::String(_) => "string",
211                        serde_json::Value::Array(_) => "array",
212                        serde_json::Value::Object(_) => "object",
213                    };
214                    return Err(format!("The value type for field {key} is an invalid type ({type_}). Additional field values \
215                                       should be either strings or numbers.").into());
216                }
217            }
218        }
219        Ok(Event::Log(log))
220    }
221}
222
/// Intermediate serde representation of a GELF JSON payload.
///
/// `version`, `host` and `short_message` are non-optional, so a payload missing any of them
/// (or carrying a non-string value for them) fails during JSON deserialization, before any
/// validation-mode-specific check runs. All other named fields may be absent.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
struct GelfMessage {
    version: String,
    host: String,
    short_message: String,
    full_message: Option<String>,
    // Read as a number of seconds (with optional fractional part) and converted to a UTC
    // timestamp by `serde_with`.
    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
    timestamp: Option<DateTime<Utc>>,
    level: Option<u8>,
    facility: Option<String>,
    // Kept as `f64`; the decoder later stores it as a float value.
    line: Option<f64>,
    file: Option<String>,
    // `flatten` gathers every top-level key that is not one of the named fields above;
    // these are the GELF "additional fields" and are validated in `message_to_event`.
    #[serde(flatten)]
    additional_fields: Option<HashMap<String, serde_json::Value>>,
}
239
240impl Deserializer for GelfDeserializer {
241    fn parse(
242        &self,
243        bytes: Bytes,
244        _log_namespace: LogNamespace,
245    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
246        let parsed: GelfMessage = match self.lossy {
247            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
248            false => serde_json::from_slice(&bytes),
249        }?;
250        let event = self.message_to_event(&parsed)?;
251
252        Ok(smallvec![event])
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use bytes::Bytes;
259    use lookup::event_path;
260    use serde_json::json;
261    use similar_asserts::assert_eq;
262    use smallvec::SmallVec;
263    use vector_core::{config::log_schema, event::Event};
264    use vrl::value::Value;
265
266    use super::*;
267
268    fn deserialize_gelf_input(
269        input: &serde_json::Value,
270        options: GelfDeserializerOptions,
271    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
272        let config = GelfDeserializerConfig::new(options);
273        let deserializer = config.build();
274        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
275        deserializer.parse(buffer, LogNamespace::Legacy)
276    }
277
278    /// Validates all the spec'd fields of GELF are deserialized correctly.
279    #[test]
280    fn gelf_deserialize_correctness() {
281        let add_on_int_in = "_an.add-field_int";
282        let add_on_str_in = "_an.add-field_str";
283
284        let input = json!({
285            VERSION: "1.1",
286            HOST: "example.org",
287            SHORT_MESSAGE: "A short message that helps you identify what is going on",
288            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
289            TIMESTAMP: 1385053862.3072,
290            LEVEL: 1,
291            FACILITY: "foo",
292            LINE: 42,
293            FILE: "/tmp/bar",
294            add_on_int_in: 2001.1002,
295            add_on_str_in: "A Space Odyssey",
296        });
297
298        // Ensure that we can parse the gelf json successfully
299        let events = deserialize_gelf_input(&input, GelfDeserializerOptions::default()).unwrap();
300        assert_eq!(events.len(), 1);
301
302        let log = events[0].as_log();
303
304        assert_eq!(
305            log.get(VERSION),
306            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
307        );
308        assert_eq!(
309            log.get(HOST),
310            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
311        );
312        assert_eq!(
313            log.get(log_schema().message_key_target_path().unwrap()),
314            Some(&Value::Bytes(Bytes::from_static(
315                b"A short message that helps you identify what is going on"
316            )))
317        );
318        assert_eq!(
319            log.get(FULL_MESSAGE),
320            Some(&Value::Bytes(Bytes::from_static(
321                b"Backtrace here\n\nmore stuff"
322            )))
323        );
324        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
325        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
326        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
327        assert_eq!(
328            log.get(FACILITY),
329            Some(&Value::Bytes(Bytes::from_static(b"foo")))
330        );
331        assert_eq!(
332            log.get(LINE),
333            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
334        );
335        assert_eq!(
336            log.get(FILE),
337            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
338        );
339        assert_eq!(
340            log.get(event_path!(add_on_int_in)),
341            Some(&Value::Float(
342                ordered_float::NotNan::new(2001.1002).unwrap()
343            ))
344        );
345        assert_eq!(
346            log.get(event_path!(add_on_str_in)),
347            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
348        );
349    }
350
351    /// Validates deserialization succeeds for edge case inputs.
352    #[test]
353    fn gelf_deserializing_edge_cases() {
354        // timestamp is set if omitted from input
355        {
356            let input = json!({
357                HOST: "example.org",
358                SHORT_MESSAGE: "foobar",
359                VERSION: "1.1",
360            });
361            let events =
362                deserialize_gelf_input(&input, GelfDeserializerOptions::default()).unwrap();
363            assert_eq!(events.len(), 1);
364            let log = events[0].as_log();
365            assert!(log.contains(log_schema().message_key_target_path().unwrap()));
366        }
367
368        // filter out id
369        {
370            let input = json!({
371                HOST: "example.org",
372                SHORT_MESSAGE: "foobar",
373                VERSION: "1.1",
374                "_id": "S3creTz",
375            });
376            let events =
377                deserialize_gelf_input(&input, GelfDeserializerOptions::default()).unwrap();
378            assert_eq!(events.len(), 1);
379            let log = events[0].as_log();
380            assert!(!log.contains(event_path!("_id")));
381        }
382    }
383
384    /// Validates the error conditions in deserialization
385    #[test]
386    fn gelf_deserializing_err() {
387        fn validate_err(input: &serde_json::Value) {
388            assert!(deserialize_gelf_input(input, GelfDeserializerOptions::default()).is_err());
389        }
390        //  invalid character in field name
391        validate_err(&json!({
392            HOST: "example.org",
393            SHORT_MESSAGE: "foobar",
394            VERSION: "1.1",
395            "_bad%key": "raboof",
396        }));
397
398        //  not prefixed with underscore
399        validate_err(&json!({
400            HOST: "example.org",
401            SHORT_MESSAGE: "foobar",
402            VERSION: "1.1",
403            "bad-key": "raboof",
404        }));
405
406        // missing short_message
407        validate_err(&json!({
408            HOST: "example.org",
409            VERSION: "1.1",
410        }));
411
412        // host is not specified
413        validate_err(&json!({
414            SHORT_MESSAGE: "foobar",
415            VERSION: "1.1",
416        }));
417
418        // host is not a string
419        validate_err(&json!({
420            HOST: 42,
421            SHORT_MESSAGE: "foobar",
422            VERSION: "1.1",
423        }));
424
425        //  level / line is string and not numeric
426        validate_err(&json!({
427            HOST: "example.org",
428            VERSION: "1.1",
429            SHORT_MESSAGE: "foobar",
430            LEVEL: "baz",
431        }));
432    }
433
434    /// Validates the relaxed validation mode
435    #[test]
436    fn gelf_deserialize_relaxed() {
437        let incorrect_extra_field = "incorrect^_extra_field";
438        let input = json!({
439            VERSION: "1.0",
440            HOST: "example.org",
441            SHORT_MESSAGE: "A short message that helps you identify what is going on",
442            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
443            TIMESTAMP: 1385053862.3072,
444            LEVEL: 1,
445            FACILITY: "foo",
446            LINE: 42,
447            FILE: "/tmp/bar",
448            incorrect_extra_field: null,
449        });
450
451        assert!(
452            deserialize_gelf_input(
453                &input,
454                GelfDeserializerOptions {
455                    validation: ValidationMode::Strict,
456                    ..Default::default()
457                }
458            )
459            .is_err()
460        );
461
462        let events = deserialize_gelf_input(
463            &input,
464            GelfDeserializerOptions {
465                validation: ValidationMode::Relaxed,
466                ..Default::default()
467            },
468        )
469        .unwrap();
470        assert_eq!(events.len(), 1);
471
472        let log = events[0].as_log();
473
474        assert_eq!(
475            log.get(VERSION),
476            Some(&Value::Bytes(Bytes::from_static(b"1.0")))
477        );
478
479        assert_eq!(
480            log.get(event_path!(incorrect_extra_field)),
481            Some(&Value::Null)
482        );
483    }
484}