codecs/decoding/format/
gelf.rs

1use std::collections::HashMap;
2
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use derivative::Derivative;
6use lookup::{event_path, owned_value_path};
7use serde::{Deserialize, Serialize};
8use serde_with::{TimestampSecondsWithFrac, serde_as};
9use smallvec::{SmallVec, smallvec};
10use vector_config::configurable_component;
11use vector_core::{
12    config::{DataType, LogNamespace, log_schema},
13    event::{Event, LogEvent},
14    schema,
15};
16use vrl::value::{Kind, Value, kind::Collection};
17
18use super::{Deserializer, default_lossy};
19use crate::{VALID_FIELD_REGEX, gelf::GELF_TARGET_PATHS, gelf_fields::*};
20
21// On GELF decoding behavior:
22//   Graylog has a relaxed decoding. They are much more lenient than the spec would
23//   suggest. We've elected to take a more strict approach to maintain backwards compatibility
24//   in the event that we need to change the behavior to be more relaxed, so that prior versions
25//   of vector will still work with the new relaxed decoding.
26//
27//   Additionally, Graylog's own GELF Output produces GELF messages with any field names present
28//   in the sending Stream, exceeding the specified field name character set.
29
30/// Config used to build a `GelfDeserializer`.
31#[configurable_component]
32#[derive(Debug, Clone, Default)]
33pub struct GelfDeserializerConfig {
34    /// GELF-specific decoding options.
35    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
36    pub gelf: GelfDeserializerOptions,
37}
38
39impl GelfDeserializerConfig {
40    /// Creates a new `GelfDeserializerConfig`.
41    pub fn new(options: GelfDeserializerOptions) -> Self {
42        Self { gelf: options }
43    }
44
45    /// Build the `GelfDeserializer` from this configuration.
46    pub fn build(&self) -> GelfDeserializer {
47        GelfDeserializer {
48            lossy: self.gelf.lossy,
49        }
50    }
51
52    /// Return the type of event built by this deserializer.
53    pub fn output_type(&self) -> DataType {
54        DataType::Log
55    }
56
57    /// The schema produced by the deserializer.
58    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59        schema::Definition::new_with_default_metadata(
60            Kind::object(Collection::empty()),
61            [log_namespace],
62        )
63        .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
64        .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
65        .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
66        .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
67        .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
68        .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
69        .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
70        .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
71        .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
72        // Every field with an underscore (_) prefix will be treated as an additional field.
73        // Allowed characters in field names are any word character (letter, number, underscore), dashes and dots.
74        // Libraries SHOULD not allow to send id as additional field ( _id). Graylog server nodes omit this field automatically.
75        .unknown_fields(Kind::bytes().or_integer().or_float())
76    }
77}
78
79/// GELF-specific decoding options.
80#[configurable_component]
81#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
82#[derivative(Default)]
83pub struct GelfDeserializerOptions {
84    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
85    ///
86    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
87    ///
88    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
89    #[serde(
90        default = "default_lossy",
91        skip_serializing_if = "vector_core::serde::is_default"
92    )]
93    #[derivative(Default(value = "default_lossy()"))]
94    pub lossy: bool,
95}
96
97/// Deserializer that builds an `Event` from a byte frame containing a GELF log message.
98#[derive(Debug, Clone, Derivative)]
99#[derivative(Default)]
100pub struct GelfDeserializer {
101    #[derivative(Default(value = "default_lossy()"))]
102    lossy: bool,
103}
104
105impl GelfDeserializer {
106    /// Create a new `GelfDeserializer`.
107    pub fn new(lossy: bool) -> GelfDeserializer {
108        GelfDeserializer { lossy }
109    }
110
111    /// Builds a LogEvent from the parsed GelfMessage.
112    /// The logic follows strictly the documented GELF standard.
113    fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
114        let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
115
116        // GELF spec defines the version as 1.1 which has not changed since 2013
117        if parsed.version != GELF_VERSION {
118            return Err(
119                format!("{VERSION} does not match GELF spec version ({GELF_VERSION})").into(),
120            );
121        }
122
123        log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
124        log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
125
126        if let Some(full_message) = &parsed.full_message {
127            log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
128        }
129
130        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
131            if let Some(timestamp) = parsed.timestamp {
132                log.insert(timestamp_key, timestamp);
133                // per GELF spec- add timestamp if not provided
134            } else {
135                log.insert(timestamp_key, Utc::now());
136            }
137        }
138
139        if let Some(level) = parsed.level {
140            log.insert(&GELF_TARGET_PATHS.level, level);
141        }
142        if let Some(facility) = &parsed.facility {
143            log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
144        }
145        if let Some(line) = parsed.line {
146            log.insert(
147                &GELF_TARGET_PATHS.line,
148                Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
149            );
150        }
151        if let Some(file) = &parsed.file {
152            log.insert(&GELF_TARGET_PATHS.file, file.to_string());
153        }
154
155        if let Some(add) = &parsed.additional_fields {
156            for (key, val) in add.iter() {
157                // per GELF spec, filter out _id
158                if key == "_id" {
159                    continue;
160                }
161                // per GELF spec, Additional field names must be prefixed with an underscore
162                if !key.starts_with('_') {
163                    return Err(format!(
164                        "'{key}' field is invalid. \
165                                       Additional field names must be prefixed with an underscore."
166                    )
167                    .into());
168                }
169                // per GELF spec, Additional field names must be characters dashes or dots
170                if !VALID_FIELD_REGEX.is_match(key) {
171                    return Err(format!(
172                        "'{key}' field contains invalid characters. Field names may \
173                                       contain only letters, numbers, underscores, dashes and dots."
174                    )
175                    .into());
176                }
177
178                // per GELF spec, Additional field values must be either strings or numbers
179                if val.is_string() || val.is_number() {
180                    let vector_val: Value = val.into();
181                    log.insert(event_path!(key.as_str()), vector_val);
182                } else {
183                    let type_ = match val {
184                        serde_json::Value::Null => "null",
185                        serde_json::Value::Bool(_) => "boolean",
186                        serde_json::Value::Number(_) => "number",
187                        serde_json::Value::String(_) => "string",
188                        serde_json::Value::Array(_) => "array",
189                        serde_json::Value::Object(_) => "object",
190                    };
191                    return Err(format!("The value type for field {key} is an invalid type ({type_}). Additional field values \
192                                       should be either strings or numbers.").into());
193                }
194            }
195        }
196        Ok(Event::Log(log))
197    }
198}
199
200#[serde_as]
201#[derive(Serialize, Deserialize, Debug)]
202struct GelfMessage {
203    version: String,
204    host: String,
205    short_message: String,
206    full_message: Option<String>,
207    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
208    timestamp: Option<DateTime<Utc>>,
209    level: Option<u8>,
210    facility: Option<String>,
211    line: Option<f64>,
212    file: Option<String>,
213    #[serde(flatten)]
214    additional_fields: Option<HashMap<String, serde_json::Value>>,
215}
216
217impl Deserializer for GelfDeserializer {
218    fn parse(
219        &self,
220        bytes: Bytes,
221        _log_namespace: LogNamespace,
222    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
223        let parsed: GelfMessage = match self.lossy {
224            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
225            false => serde_json::from_slice(&bytes),
226        }?;
227        let event = self.message_to_event(&parsed)?;
228
229        Ok(smallvec![event])
230    }
231}
232
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use lookup::event_path;
    use serde_json::json;
    use similar_asserts::assert_eq;
    use smallvec::SmallVec;
    use vector_core::{config::log_schema, event::Event};
    use vrl::value::Value;

    use super::*;

    /// Deserializes `input` with a default-configured deserializer in the legacy namespace.
    fn deserialize_gelf_input(
        input: &serde_json::Value,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let config = GelfDeserializerConfig::default();
        let deserializer = config.build();
        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
        deserializer.parse(buffer, LogNamespace::Legacy)
    }

    /// Validates all the spec'd fields of GELF are deserialized correctly.
    #[test]
    fn gelf_deserialize_correctness() {
        let add_on_int_in = "_an.add-field_int";
        let add_on_str_in = "_an.add-field_str";

        let input = json!({
            VERSION: "1.1",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            add_on_int_in: 2001.1002,
            add_on_str_in: "A Space Odyssey",
        });

        // Ensure that we can parse the gelf json successfully
        let events = deserialize_gelf_input(&input).unwrap();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();

        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
        );
        assert_eq!(
            log.get(HOST),
            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
        );
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from_static(
                b"A short message that helps you identify what is going on"
            )))
        );
        assert_eq!(
            log.get(FULL_MESSAGE),
            Some(&Value::Bytes(Bytes::from_static(
                b"Backtrace here\n\nmore stuff"
            )))
        );
        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
        assert_eq!(
            log.get(FACILITY),
            Some(&Value::Bytes(Bytes::from_static(b"foo")))
        );
        assert_eq!(
            log.get(LINE),
            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
        );
        assert_eq!(
            log.get(FILE),
            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
        );
        assert_eq!(
            log.get(event_path!(add_on_int_in)),
            Some(&Value::Float(
                ordered_float::NotNan::new(2001.1002).unwrap()
            ))
        );
        assert_eq!(
            log.get(event_path!(add_on_str_in)),
            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
        );
    }

    /// Validates deserialization succeeds for edge case inputs.
    #[test]
    fn gelf_deserializing_edge_cases() {
        // timestamp is set if omitted from input
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(log.contains(log_schema().message_key_target_path().unwrap()));
            // The case under test: a timestamp must be filled in even though none was sent.
            assert!(matches!(
                log.get(log_schema().timestamp_key_target_path().unwrap()),
                Some(Value::Timestamp(_))
            ));
        }

        // filter out id
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_id": "S3creTz",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(!log.contains(event_path!("_id")));
        }
    }

    /// Validates that the `lossy` option controls handling of invalid UTF-8.
    #[test]
    fn gelf_deserializing_lossy() {
        let input: &'static [u8] =
            b"{\"version\":\"1.1\",\"host\":\"example.org\",\"short_message\":\"foo\xFFbar\"}";

        // lossy: the invalid byte is replaced with U+FFFD
        let events = GelfDeserializer::new(true)
            .parse(Bytes::from_static(input), LogNamespace::Legacy)
            .unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0]
                .as_log()
                .get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from("foo\u{FFFD}bar")))
        );

        // strict: invalid UTF-8 is rejected
        assert!(
            GelfDeserializer::new(false)
                .parse(Bytes::from_static(input), LogNamespace::Legacy)
                .is_err()
        );
    }

    /// Validates the error conditions in deserialization
    #[test]
    fn gelf_deserializing_err() {
        fn validate_err(input: &serde_json::Value) {
            assert!(deserialize_gelf_input(input).is_err());
        }
        //  invalid character in field name
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_bad%key": "raboof",
        }));

        //  not prefixed with underscore
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "bad-key": "raboof",
        }));

        // missing short_message
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
        }));

        // host is not specified
        validate_err(&json!({
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // host is not a string
        validate_err(&json!({
            HOST: 42,
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        //  level is string and not numeric
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: "baz",
        }));

        //  line is string and not numeric
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LINE: "baz",
        }));

        // version does not match the GELF spec version
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.0",
        }));

        // additional field values must be strings or numbers
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_flag": true,
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_list": ["a", "b"],
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_obj": { "a": 1 },
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_nothing": null,
        }));
    }
}