codecs/encoding/format/
gelf.rs

1use crate::{VALID_FIELD_REGEX, encoding::GelfChunker, gelf::GELF_TARGET_PATHS, gelf_fields::*};
2use bytes::{BufMut, BytesMut};
3use lookup::event_path;
4use ordered_float::NotNan;
5use snafu::Snafu;
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9    config::{DataType, log_schema},
10    event::{Event, KeyString, LogEvent, Value},
11    schema,
12};
13
14/// Config used to build a `GelfSerializer`.
15#[configurable_component]
16#[derive(Debug, Clone)]
17pub struct GelfSerializerOptions {
18    /// Maximum size for each GELF chunked datagram (including 12-byte header).
19    /// Chunking starts when datagrams exceed this size.
20    /// For Graylog target, keep at or below 8192 bytes; for Vector target (`gelf` decoding with `chunked_gelf` framing), up to 65,500 bytes is recommended.
21    #[configurable(validation(range(min = 13)))]
22    #[serde(default = "default_max_chunk_size")]
23    pub max_chunk_size: usize,
24}
25
/// Default `max_chunk_size`: 8 KiB, matching the Graylog upper limit documented on the
/// `max_chunk_size` option.
const fn default_max_chunk_size() -> usize {
    8 * 1024
}
29
30impl Default for GelfSerializerOptions {
31    fn default() -> Self {
32        Self {
33            max_chunk_size: default_max_chunk_size(),
34        }
35    }
36}
37
38/// On GELF encoding behavior:
39///   Graylog has a relaxed parsing. They are much more lenient than the spec would
40///   suggest. We've elected to take a more strict approach to maintain backwards compatibility
41///   in the event that we need to change the behavior to be more relaxed, so that prior versions
42///   of vector will still work.
43///   The exception is that if 'Additional fields' are found to be missing an underscore prefix and
44///   are otherwise valid field names, we prepend the underscore.
45///
46/// Errors that can occur during GELF serialization.
47#[derive(Debug, Snafu)]
48pub enum GelfSerializerError {
49    #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
50    MissingField { field: KeyString },
51    #[snafu(display(
52        r#"LogEvent contains field with invalid name not matching pattern '{}': "{}""#,
53        pattern,
54        field,
55    ))]
56    InvalidField { field: KeyString, pattern: String },
57    #[snafu(display(
58        r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
59        field,
60        actual_type,
61        expected_type
62    ))]
63    InvalidValueType {
64        field: String,
65        actual_type: String,
66        expected_type: String,
67    },
68}
69
70/// Config used to build a `GelfSerializer`.
71#[configurable_component]
72#[derive(Debug, Clone, Default)]
73pub struct GelfSerializerConfig {
74    /// The GELF Serializer Options.
75    #[serde(default, rename = "gelf")]
76    pub options: GelfSerializerOptions,
77}
78
79impl GelfSerializerConfig {
80    /// Creates a new `GelfSerializerConfig`.
81    pub const fn new(options: GelfSerializerOptions) -> Self {
82        Self { options }
83    }
84
85    /// Build the `GelfSerializer` from this configuration.
86    pub fn build(&self) -> GelfSerializer {
87        GelfSerializer::new(self.options.clone())
88    }
89
90    /// The data type of events that are accepted by `GelfSerializer`.
91    pub fn input_type(&self) -> DataType {
92        DataType::Log
93    }
94
95    /// The schema required by the serializer.
96    pub fn schema_requirement(&self) -> schema::Requirement {
97        // While technically we support `Value` variants that can't be losslessly serialized to
98        // JSON, we don't want to enforce that limitation to users yet.
99        schema::Requirement::empty()
100    }
101}
102
103/// Serializer that converts an `Event` to bytes using the GELF format.
104/// Spec: <https://docs.graylog.org/docs/gelf>
105#[derive(Debug, Clone)]
106pub struct GelfSerializer {
107    options: GelfSerializerOptions,
108}
109
110impl GelfSerializer {
111    /// Creates a new `GelfSerializer`.
112    pub fn new(options: GelfSerializerOptions) -> Self {
113        GelfSerializer { options }
114    }
115
116    /// Encode event and represent it as JSON value.
117    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
118        // input_type() restricts the event type to LogEvents
119        let log = to_gelf_event(event.into_log())?;
120        serde_json::to_value(&log).map_err(|e| e.to_string().into())
121    }
122
123    /// Instantiates the GELF chunking configuration.
124    pub fn chunker(&self) -> GelfChunker {
125        GelfChunker {
126            max_chunk_size: self.options.max_chunk_size,
127        }
128    }
129}
130
131impl Encoder<Event> for GelfSerializer {
132    type Error = vector_common::Error;
133
134    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
135        let log = to_gelf_event(event.into_log())?;
136        let writer = buffer.writer();
137        serde_json::to_writer(writer, &log)?;
138        Ok(())
139    }
140}
141
142/// Returns Error for invalid type.
143fn err_invalid_type(
144    field: &str,
145    expected_type: &str,
146    actual_type: &str,
147) -> vector_common::Result<()> {
148    InvalidValueTypeSnafu {
149        field,
150        actual_type,
151        expected_type,
152    }
153    .fail()
154    .map_err(|e| e.to_string().into())
155}
156
157/// Validates that the GELF required fields exist in the event, coercing in some cases.
158fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
159    // returns Error for missing field
160    fn err_missing_field(field: &str) -> vector_common::Result<()> {
161        MissingFieldSnafu { field }
162            .fail()
163            .map_err(|e| e.to_string().into())
164    }
165
166    // add the VERSION if it does not exist
167    if !log.contains(&GELF_TARGET_PATHS.version) {
168        log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
169    }
170
171    if !log.contains(&GELF_TARGET_PATHS.host) {
172        err_missing_field(HOST)?;
173    }
174
175    if !log.contains(&GELF_TARGET_PATHS.short_message)
176        && let Some(message_key) = log_schema().message_key_target_path()
177    {
178        if log.contains(message_key) {
179            log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
180        } else {
181            err_missing_field(SHORT_MESSAGE)?;
182        }
183    }
184    Ok(log)
185}
186
187/// Validates rules for field names and value types, coercing in some cases.
188fn coerce_field_names_and_values(
189    mut log: LogEvent,
190) -> vector_common::Result<(LogEvent, Vec<String>)> {
191    let mut missing_prefix = vec![];
192    if let Some(event_data) = log.as_map_mut() {
193        for (field, value) in event_data.iter_mut() {
194            match field.as_str() {
195                VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
196                    if !value.is_bytes() {
197                        err_invalid_type(field, "UTF-8 string", value.kind_str())?;
198                    }
199                }
200                TIMESTAMP => {
201                    if !(value.is_timestamp() || value.is_integer()) {
202                        err_invalid_type(field, "timestamp or integer", value.kind_str())?;
203                    }
204
205                    // convert a `Value::Timestamp` to a GELF specified timestamp where milliseconds are represented by the fractional part of a float.
206                    if let Value::Timestamp(ts) = value {
207                        let ts_millis = ts.timestamp_millis();
208                        if ts_millis % 1000 != 0 {
209                            // i64 to f64 / 1000.0 will never be NaN
210                            *value = Value::Float(
211                                NotNan::new(ts_millis as f64 / 1000.0)
212                                    .expect("i64 -> f64 produced NaN"),
213                            );
214                        } else {
215                            // keep full range of representable time if no milliseconds are set
216                            // but still convert to numeric according to GELF protocol
217                            *value = Value::Integer(ts.timestamp())
218                        }
219                    }
220                }
221                LEVEL => {
222                    if !value.is_integer() {
223                        err_invalid_type(field, "integer", value.kind_str())?;
224                    }
225                }
226                LINE => {
227                    if !(value.is_float() || value.is_integer()) {
228                        err_invalid_type(field, "number", value.kind_str())?;
229                    }
230                }
231                _ => {
232                    // additional fields must be only word chars, dashes and periods.
233                    if !VALID_FIELD_REGEX.is_match(field) {
234                        return InvalidFieldSnafu {
235                            field: field.clone(),
236                            pattern: VALID_FIELD_REGEX.to_string(),
237                        }
238                        .fail()
239                        .map_err(|e| e.to_string().into());
240                    }
241
242                    // additional field values must be only strings or numbers
243                    if !(value.is_integer() || value.is_float() || value.is_bytes()) {
244                        err_invalid_type(field, "string or number", value.kind_str())?;
245                    }
246
247                    // Additional fields must be prefixed with underscores.
248                    // Prepending the underscore since vector adds fields such as 'source_type'
249                    // which would otherwise throw errors.
250                    if !field.is_empty() && !field.starts_with('_') {
251                        // flag the field as missing prefix to be modified later
252                        missing_prefix.push(field.to_string());
253                    }
254                }
255            }
256        }
257    }
258    Ok((log, missing_prefix))
259}
260
261/// Validate if the input log event is valid GELF, potentially coercing the event into valid GELF.
262fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
263    let log = coerce_required_fields(log).and_then(|log| {
264        coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
265            // rename additional fields that were flagged as missing the underscore prefix
266            for field in missing_prefix {
267                log.rename_key(
268                    event_path!(field.as_str()),
269                    event_path!(format!("_{}", &field).as_str()),
270                );
271            }
272            log
273        })
274    })?;
275
276    Ok(log)
277}
278
#[cfg(test)]
mod tests {
    use chrono::NaiveDateTime;
    use vector_core::event::{Event, EventMetadata};
    use vrl::{
        btreemap,
        value::{ObjectMap, Value},
    };

    use super::*;
    use crate::encoding::SerializerConfig;

    /// Encodes `event_fields` with a default-configured `GelfSerializer`.
    ///
    /// When `expect_success` is true, asserts that encoding succeeds and produces valid JSON,
    /// returning the parsed document. Otherwise asserts that encoding fails and returns `None`.
    fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
        let config = GelfSerializerConfig::new(GelfSerializerOptions::default());
        let mut serializer = config.build();
        let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        let mut buffer = BytesMut::new();

        if expect_success {
            // `expect` reports the underlying error, unlike `assert!(result.is_ok())`.
            serializer
                .encode(event, &mut buffer)
                .expect("encoding valid GELF should succeed");
            let buffer_str = std::str::from_utf8(&buffer).expect("GELF output should be UTF-8");
            let json = serde_json::from_str::<serde_json::Value>(buffer_str)
                .expect("GELF output should be valid JSON");
            Some(json)
        } else {
            assert!(
                serializer.encode(event, &mut buffer).is_err(),
                "encoding invalid GELF should fail"
            );
            None
        }
    }

    #[test]
    fn gelf_serde_json_to_value_supported_success() {
        let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();

        let event_fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
        };

        let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(log_event).is_ok());
    }

    #[test]
    fn gelf_serde_json_to_value_supported_failure_to_encode() {
        let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();
        let event_fields = btreemap! {};
        let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(log_event).is_err());
    }

    #[test]
    fn gelf_serializing_valid() {
        let event_fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            FULL_MESSAGE => "Even more message",
            FACILITY => "",
            FILE => "/tmp/foobar",
            LINE => Value::Float(ordered_float::NotNan::new(1.5).unwrap()),
            LEVEL => 5,
        };

        let jsn = do_serialize(true, event_fields).unwrap();

        assert_eq!(jsn.get(VERSION).unwrap(), "1.1");
        assert_eq!(jsn.get(HOST).unwrap(), "example.org");
        assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
    }

    #[test]
    fn gelf_serializing_inserts_missing_version() {
        let event_fields = btreemap! {
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
        };

        let jsn = do_serialize(true, event_fields).unwrap();
        assert_eq!(jsn.get(VERSION).unwrap(), GELF_VERSION);
    }

    #[test]
    fn gelf_serializing_coerced() {
        // no underscore
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                SHORT_MESSAGE => "Some message",
                "noUnderScore" => 0,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get("_noUnderScore").unwrap(), 0);
        }

        // existing underscore prefix is kept as-is, not doubled
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                SHORT_MESSAGE => "Some message",
                "_already_prefixed" => "value",
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get("_already_prefixed").unwrap(), "value");
            assert!(jsn.get("__already_prefixed").is_none());
        }

        // "message" => SHORT_MESSAGE
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                log_schema().message_key().unwrap().to_string() => "Some message",
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
        }
    }

    #[test]
    fn gelf_serializing_timestamp() {
        // floating point in case of sub second timestamp
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1970-01-01 00:00:00.1", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_f64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1);
        }

        // sub second timestamps before the epoch become negative floats
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1969-12-31 23:59:59.5", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_f64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_f64().unwrap(), -0.5);
        }

        // integer in case of no sub second timestamp
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1970-01-01 00:00:00.0", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_i64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
        }
    }

    #[test]
    fn gelf_serializing_invalid_error() {
        // no host
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
            };
            do_serialize(false, event_fields);
        }
        // no message
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
            };
            do_serialize(false, event_fields);
        }
        // expected string
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => 0,
            };
            do_serialize(false, event_fields);
        }
        // expected integer
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LEVEL => "1",
            };
            do_serialize(false, event_fields);
        }
        // expected number (float or integer)
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LINE => "1.2",
            };
            do_serialize(false, event_fields);
        }
        // expected timestamp or integer
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                TIMESTAMP => "2024-01-01",
            };
            do_serialize(false, event_fields);
        }
        // invalid field name
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "invalid%field" => "foo",
            };
            do_serialize(false, event_fields);
        }
        // invalid additional value type - bool
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => false,
            };
            do_serialize(false, event_fields);
        }
        // invalid additional value type - null
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => serde_json::Value::Null,
            };
            do_serialize(false, event_fields);
        }
    }
}