codecs/decoding/format/
gelf.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use derivative::Derivative;
4use lookup::{event_path, owned_value_path};
5use serde::{Deserialize, Serialize};
6use serde_with::{serde_as, TimestampSecondsWithFrac};
7use smallvec::{smallvec, SmallVec};
8use std::collections::HashMap;
9use vector_config::configurable_component;
10use vector_core::config::LogNamespace;
11use vector_core::{
12    config::{log_schema, DataType},
13    event::Event,
14    event::LogEvent,
15    schema,
16};
17use vrl::value::kind::Collection;
18use vrl::value::{Kind, Value};
19
20use super::{default_lossy, Deserializer};
21use crate::gelf::GELF_TARGET_PATHS;
22use crate::{gelf_fields::*, VALID_FIELD_REGEX};
23
// On GELF decoding behavior:
//   Graylog has a relaxed decoding: it is much more lenient than the spec would
//   suggest. We've elected to take a stricter approach to maintain backwards compatibility,
//   so that, in the event that we need to relax the behavior later, prior versions
//   of Vector will still work with the new relaxed decoding.
//
//   Additionally, Graylog's own GELF Output produces GELF messages with any field names present
//   in the sending Stream, which can exceed the specified field name character set.
32
33/// Config used to build a `GelfDeserializer`.
34#[configurable_component]
35#[derive(Debug, Clone, Default)]
36pub struct GelfDeserializerConfig {
37    /// GELF-specific decoding options.
38    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
39    pub gelf: GelfDeserializerOptions,
40}
41
42impl GelfDeserializerConfig {
43    /// Creates a new `GelfDeserializerConfig`.
44    pub fn new(options: GelfDeserializerOptions) -> Self {
45        Self { gelf: options }
46    }
47
48    /// Build the `GelfDeserializer` from this configuration.
49    pub fn build(&self) -> GelfDeserializer {
50        GelfDeserializer {
51            lossy: self.gelf.lossy,
52        }
53    }
54
55    /// Return the type of event built by this deserializer.
56    pub fn output_type(&self) -> DataType {
57        DataType::Log
58    }
59
60    /// The schema produced by the deserializer.
61    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
62        schema::Definition::new_with_default_metadata(
63            Kind::object(Collection::empty()),
64            [log_namespace],
65        )
66        .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
67        .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
68        .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
69        .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
70        .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
71        .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
72        .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
73        .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
74        .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
75        // Every field with an underscore (_) prefix will be treated as an additional field.
76        // Allowed characters in field names are any word character (letter, number, underscore), dashes and dots.
77        // Libraries SHOULD not allow to send id as additional field ( _id). Graylog server nodes omit this field automatically.
78        .unknown_fields(Kind::bytes().or_integer().or_float())
79    }
80}
81
82/// GELF-specific decoding options.
83#[configurable_component]
84#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
85#[derivative(Default)]
86pub struct GelfDeserializerOptions {
87    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
88    ///
89    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
90    ///
91    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
92    #[serde(
93        default = "default_lossy",
94        skip_serializing_if = "vector_core::serde::is_default"
95    )]
96    #[derivative(Default(value = "default_lossy()"))]
97    pub lossy: bool,
98}
99
100/// Deserializer that builds an `Event` from a byte frame containing a GELF log message.
101#[derive(Debug, Clone, Derivative)]
102#[derivative(Default)]
103pub struct GelfDeserializer {
104    #[derivative(Default(value = "default_lossy()"))]
105    lossy: bool,
106}
107
108impl GelfDeserializer {
109    /// Create a new `GelfDeserializer`.
110    pub fn new(lossy: bool) -> GelfDeserializer {
111        GelfDeserializer { lossy }
112    }
113
114    /// Builds a LogEvent from the parsed GelfMessage.
115    /// The logic follows strictly the documented GELF standard.
116    fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
117        let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
118
119        // GELF spec defines the version as 1.1 which has not changed since 2013
120        if parsed.version != GELF_VERSION {
121            return Err(
122                format!("{VERSION} does not match GELF spec version ({GELF_VERSION})").into(),
123            );
124        }
125
126        log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
127        log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
128
129        if let Some(full_message) = &parsed.full_message {
130            log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
131        }
132
133        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
134            if let Some(timestamp) = parsed.timestamp {
135                log.insert(timestamp_key, timestamp);
136                // per GELF spec- add timestamp if not provided
137            } else {
138                log.insert(timestamp_key, Utc::now());
139            }
140        }
141
142        if let Some(level) = parsed.level {
143            log.insert(&GELF_TARGET_PATHS.level, level);
144        }
145        if let Some(facility) = &parsed.facility {
146            log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
147        }
148        if let Some(line) = parsed.line {
149            log.insert(
150                &GELF_TARGET_PATHS.line,
151                Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
152            );
153        }
154        if let Some(file) = &parsed.file {
155            log.insert(&GELF_TARGET_PATHS.file, file.to_string());
156        }
157
158        if let Some(add) = &parsed.additional_fields {
159            for (key, val) in add.iter() {
160                // per GELF spec, filter out _id
161                if key == "_id" {
162                    continue;
163                }
164                // per GELF spec, Additional field names must be prefixed with an underscore
165                if !key.starts_with('_') {
166                    return Err(format!(
167                        "'{key}' field is invalid. \
168                                       Additional field names must be prefixed with an underscore."
169                    )
170                    .into());
171                }
172                // per GELF spec, Additional field names must be characters dashes or dots
173                if !VALID_FIELD_REGEX.is_match(key) {
174                    return Err(format!("'{key}' field contains invalid characters. Field names may \
175                                       contain only letters, numbers, underscores, dashes and dots.").into());
176                }
177
178                // per GELF spec, Additional field values must be either strings or numbers
179                if val.is_string() || val.is_number() {
180                    let vector_val: Value = val.into();
181                    log.insert(event_path!(key.as_str()), vector_val);
182                } else {
183                    let type_ = match val {
184                        serde_json::Value::Null => "null",
185                        serde_json::Value::Bool(_) => "boolean",
186                        serde_json::Value::Number(_) => "number",
187                        serde_json::Value::String(_) => "string",
188                        serde_json::Value::Array(_) => "array",
189                        serde_json::Value::Object(_) => "object",
190                    };
191                    return Err(format!("The value type for field {key} is an invalid type ({type_}). Additional field values \
192                                       should be either strings or numbers.").into());
193                }
194            }
195        }
196        Ok(Event::Log(log))
197    }
198}
199
200#[serde_as]
201#[derive(Serialize, Deserialize, Debug)]
202struct GelfMessage {
203    version: String,
204    host: String,
205    short_message: String,
206    full_message: Option<String>,
207    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
208    timestamp: Option<DateTime<Utc>>,
209    level: Option<u8>,
210    facility: Option<String>,
211    line: Option<f64>,
212    file: Option<String>,
213    #[serde(flatten)]
214    additional_fields: Option<HashMap<String, serde_json::Value>>,
215}
216
217impl Deserializer for GelfDeserializer {
218    fn parse(
219        &self,
220        bytes: Bytes,
221        _log_namespace: LogNamespace,
222    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
223        let parsed: GelfMessage = match self.lossy {
224            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
225            false => serde_json::from_slice(&bytes),
226        }?;
227        let event = self.message_to_event(&parsed)?;
228
229        Ok(smallvec![event])
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use lookup::event_path;
    use serde_json::json;
    use similar_asserts::assert_eq;
    use smallvec::SmallVec;
    use vector_core::{config::log_schema, event::Event};
    use vrl::value::Value;

    /// Serializes `input` to JSON bytes and runs it through a default-configured
    /// `GelfDeserializer` using the legacy log namespace.
    fn deserialize_gelf_input(
        input: &serde_json::Value,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let config = GelfDeserializerConfig::default();
        let deserializer = config.build();
        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
        deserializer.parse(buffer, LogNamespace::Legacy)
    }

    /// Validates all the spec'd fields of GELF are deserialized correctly.
    #[test]
    fn gelf_deserialize_correctness() {
        let add_on_int_in = "_an.add-field_int";
        let add_on_str_in = "_an.add-field_str";

        let input = json!({
            VERSION: "1.1",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            add_on_int_in: 2001.1002,
            add_on_str_in: "A Space Odyssey",
        });

        // Ensure that we can parse the gelf json successfully
        let events = deserialize_gelf_input(&input).unwrap();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();

        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
        );
        assert_eq!(
            log.get(HOST),
            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
        );
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from_static(
                b"A short message that helps you identify what is going on"
            )))
        );
        assert_eq!(
            log.get(FULL_MESSAGE),
            Some(&Value::Bytes(Bytes::from_static(
                b"Backtrace here\n\nmore stuff"
            )))
        );
        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
        assert_eq!(
            log.get(FACILITY),
            Some(&Value::Bytes(Bytes::from_static(b"foo")))
        );
        // `line` is always stored as a float, even when the input is an integer literal.
        assert_eq!(
            log.get(LINE),
            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
        );
        assert_eq!(
            log.get(FILE),
            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
        );
        assert_eq!(
            log.get(event_path!(add_on_int_in)),
            Some(&Value::Float(
                ordered_float::NotNan::new(2001.1002).unwrap()
            ))
        );
        assert_eq!(
            log.get(event_path!(add_on_str_in)),
            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
        );
    }

    /// Validates deserialization succeeds for edge case inputs.
    #[test]
    fn gelf_deserializing_edge_cases() {
        // timestamp is set if omitted from input
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            // Check the timestamp key itself: the message key is always present and would
            // not detect a missing fallback timestamp.
            assert!(log.contains(log_schema().timestamp_key_target_path().unwrap()));
        }

        // filter out id
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_id": "S3creTz",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(!log.contains(event_path!("_id")));
        }
    }

    /// Validates the error conditions in deserialization
    #[test]
    fn gelf_deserializing_err() {
        fn validate_err(input: &serde_json::Value) {
            assert!(deserialize_gelf_input(input).is_err());
        }
        //  invalid character in field name
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_bad%key": "raboof",
        }));

        //  not prefixed with underscore
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "bad-key": "raboof",
        }));

        // missing short_message
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
        }));

        // host is not specified
        validate_err(&json!({
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // host is not a string
        validate_err(&json!({
            HOST: 42,
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        //  level is string and not numeric
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: "baz",
        }));

        //  line is string and not numeric
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LINE: "baz",
        }));
    }
}