codecs/encoding/format/
gelf.rs

1use crate::{VALID_FIELD_REGEX, encoding::GelfChunker, gelf::GELF_TARGET_PATHS, gelf_fields::*};
2use bytes::{BufMut, BytesMut};
3use lookup::event_path;
4use ordered_float::NotNan;
5use snafu::Snafu;
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9    config::{DataType, log_schema},
10    event::{Event, KeyString, LogEvent, Value},
11    schema,
12};
13
/// Options used to build a `GelfSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct GelfSerializerOptions {
    /// Maximum size for each GELF chunked datagram (including 12-byte header).
    /// Chunking starts when datagrams exceed this size.
    /// For Graylog target, keep at or below 8192 bytes; for Vector target (`gelf` decoding with `chunked_gelf` framing), up to 65,500 bytes is recommended.
    // The validated minimum of 13 is the 12-byte header plus one byte of payload.
    #[configurable(validation(range(min = 13)))]
    #[serde(default = "default_max_chunk_size")]
    pub max_chunk_size: usize,
}
25
/// Default for `GelfSerializerOptions::max_chunk_size`, in bytes (8 KiB).
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on that field.
const fn default_max_chunk_size() -> usize {
    8 * 1024
}
29
30impl Default for GelfSerializerOptions {
31    fn default() -> Self {
32        Self {
33            max_chunk_size: default_max_chunk_size(),
34        }
35    }
36}
37
/// Errors that can occur during GELF serialization.
///
/// On GELF encoding behavior:
///   Graylog's parsing is relaxed: it is much more lenient than the spec would
///   suggest. We've elected to take a stricter approach to maintain backwards compatibility
///   in the event that we need to change the behavior to be more relaxed, so that prior versions
///   of vector will still work.
///   The exception is that if 'Additional fields' are found to be missing an underscore prefix and
///   are otherwise valid field names, we prepend the underscore.
#[derive(Debug, Snafu)]
pub enum GelfSerializerError {
    /// A field that GELF requires is absent from the event.
    #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
    MissingField { field: KeyString },
    /// A field name does not match the pattern of permitted field names.
    #[snafu(display(
        r#"LogEvent contains field with invalid name not matching pattern '{}': "{}""#,
        pattern,
        field,
    ))]
    InvalidField { field: KeyString, pattern: String },
    /// A field holds a value whose type is not the one expected for that field.
    #[snafu(display(
        r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
        field,
        actual_type,
        expected_type
    ))]
    InvalidValueType {
        field: String,
        actual_type: String,
        expected_type: String,
    },
}
69
/// Config used to build a `GelfSerializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct GelfSerializerConfig {
    /// The GELF Serializer Options.
    // Read from the `gelf` key; when the key is absent serde falls back to
    // `GelfSerializerOptions::default()`.
    #[serde(default, rename = "gelf")]
    pub options: GelfSerializerOptions,
}
78
79impl GelfSerializerConfig {
80    /// Creates a new `GelfSerializerConfig`.
81    pub const fn new(options: GelfSerializerOptions) -> Self {
82        Self { options }
83    }
84
85    /// Build the `GelfSerializer` from this configuration.
86    pub fn build(&self) -> GelfSerializer {
87        GelfSerializer::new(self.options.clone())
88    }
89
90    /// The data type of events that are accepted by `GelfSerializer`.
91    pub fn input_type(&self) -> DataType {
92        DataType::Log
93    }
94
95    /// The schema required by the serializer.
96    pub fn schema_requirement(&self) -> schema::Requirement {
97        // While technically we support `Value` variants that can't be losslessly serialized to
98        // JSON, we don't want to enforce that limitation to users yet.
99        schema::Requirement::empty()
100    }
101}
102
/// Serializer that converts an `Event` to bytes using the GELF format.
/// Spec: <https://docs.graylog.org/docs/gelf>
#[derive(Debug, Clone)]
pub struct GelfSerializer {
    /// Options supplied at construction; `max_chunk_size` is handed to the
    /// `GelfChunker` returned by `chunker()`.
    options: GelfSerializerOptions,
}
109
110impl GelfSerializer {
111    /// Creates a new `GelfSerializer`.
112    pub fn new(options: GelfSerializerOptions) -> Self {
113        GelfSerializer { options }
114    }
115
116    /// Encode event and represent it as JSON value.
117    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
118        // input_type() restricts the event type to LogEvents
119        let log = to_gelf_event(event.into_log())?;
120        serde_json::to_value(&log).map_err(|e| e.to_string().into())
121    }
122
123    /// Instantiates the GELF chunking configuration.
124    pub fn chunker(&self) -> GelfChunker {
125        GelfChunker {
126            max_chunk_size: self.options.max_chunk_size,
127        }
128    }
129}
130
131impl Encoder<Event> for GelfSerializer {
132    type Error = vector_common::Error;
133
134    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
135        let log = to_gelf_event(event.into_log())?;
136        let writer = buffer.writer();
137        serde_json::to_writer(writer, &log)?;
138        Ok(())
139    }
140}
141
142/// Returns Error for invalid type.
143fn err_invalid_type(
144    field: &str,
145    expected_type: &str,
146    actual_type: &str,
147) -> vector_common::Result<()> {
148    InvalidValueTypeSnafu {
149        field,
150        actual_type,
151        expected_type,
152    }
153    .fail()
154    .map_err(|e| e.to_string().into())
155}
156
157/// Validates that the GELF required fields exist in the event, coercing in some cases.
158fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
159    // returns Error for missing field
160    fn err_missing_field(field: &str) -> vector_common::Result<()> {
161        MissingFieldSnafu { field }
162            .fail()
163            .map_err(|e| e.to_string().into())
164    }
165
166    // add the VERSION if it does not exist
167    if !log.contains(&GELF_TARGET_PATHS.version) {
168        log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
169    }
170
171    if !log.contains(&GELF_TARGET_PATHS.host) {
172        err_missing_field(HOST)?;
173    }
174
175    if !log.contains(&GELF_TARGET_PATHS.short_message)
176        && let Some(message_key) = log_schema().message_key_target_path()
177    {
178        if log.contains(message_key) {
179            log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
180        } else {
181            err_missing_field(SHORT_MESSAGE)?;
182        }
183    }
184    Ok(log)
185}
186
187/// Validates rules for field names and value types, coercing in some cases.
188fn coerce_field_names_and_values(
189    mut log: LogEvent,
190) -> vector_common::Result<(LogEvent, Vec<String>)> {
191    let mut missing_prefix = vec![];
192    if let Some(event_data) = log.as_map_mut() {
193        for (field, value) in event_data.iter_mut() {
194            match field.as_str() {
195                VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
196                    if !value.is_bytes() {
197                        err_invalid_type(field, "UTF-8 string", value.kind_str())?;
198                    }
199                }
200                TIMESTAMP => {
201                    if !(value.is_timestamp() || value.is_integer()) {
202                        err_invalid_type(field, "timestamp or integer", value.kind_str())?;
203                    }
204
205                    // convert a `Value::Timestamp` to a GELF specified timestamp where milliseconds are represented by the fractional part of a float.
206                    if let Value::Timestamp(ts) = value {
207                        let ts_millis = ts.timestamp_millis();
208                        if ts_millis % 1000 != 0 {
209                            *value = Value::Float(NotNan::new(ts_millis as f64 / 1000.0).unwrap());
210                        } else {
211                            // keep full range of representable time if no milliseconds are set
212                            // but still convert to numeric according to GELF protocol
213                            *value = Value::Integer(ts.timestamp())
214                        }
215                    }
216                }
217                LEVEL => {
218                    if !value.is_integer() {
219                        err_invalid_type(field, "integer", value.kind_str())?;
220                    }
221                }
222                LINE => {
223                    if !(value.is_float() || value.is_integer()) {
224                        err_invalid_type(field, "number", value.kind_str())?;
225                    }
226                }
227                _ => {
228                    // additional fields must be only word chars, dashes and periods.
229                    if !VALID_FIELD_REGEX.is_match(field) {
230                        return InvalidFieldSnafu {
231                            field: field.clone(),
232                            pattern: VALID_FIELD_REGEX.to_string(),
233                        }
234                        .fail()
235                        .map_err(|e| e.to_string().into());
236                    }
237
238                    // additional field values must be only strings or numbers
239                    if !(value.is_integer() || value.is_float() || value.is_bytes()) {
240                        err_invalid_type(field, "string or number", value.kind_str())?;
241                    }
242
243                    // Additional fields must be prefixed with underscores.
244                    // Prepending the underscore since vector adds fields such as 'source_type'
245                    // which would otherwise throw errors.
246                    if !field.is_empty() && !field.starts_with('_') {
247                        // flag the field as missing prefix to be modified later
248                        missing_prefix.push(field.to_string());
249                    }
250                }
251            }
252        }
253    }
254    Ok((log, missing_prefix))
255}
256
257/// Validate if the input log event is valid GELF, potentially coercing the event into valid GELF.
258fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
259    let log = coerce_required_fields(log).and_then(|log| {
260        coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
261            // rename additional fields that were flagged as missing the underscore prefix
262            for field in missing_prefix {
263                log.rename_key(
264                    event_path!(field.as_str()),
265                    event_path!(format!("_{}", &field).as_str()),
266                );
267            }
268            log
269        })
270    })?;
271
272    Ok(log)
273}
274
#[cfg(test)]
mod tests {
    use chrono::NaiveDateTime;
    use vector_core::event::{Event, EventMetadata};
    use vrl::{
        btreemap,
        value::{ObjectMap, Value},
    };

    use super::*;
    use crate::encoding::SerializerConfig;

    /// Encodes an event built from `event_fields` with a default-configured serializer.
    ///
    /// When `expect_success` is true, the encoding must succeed and yield valid JSON,
    /// which is returned. Otherwise the encoding must fail and `None` is returned.
    /// Failed expectations panic with the underlying error or output, so a broken
    /// test explains itself.
    fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
        let config = GelfSerializerConfig::new(GelfSerializerOptions::default());
        let mut serializer = config.build();
        let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        let mut buffer = BytesMut::new();
        let outcome = serializer.encode(event, &mut buffer);

        if expect_success {
            if let Err(error) = outcome {
                panic!("expected GELF encoding to succeed, got error: {error}");
            }
            let parsed: serde_json::Value =
                serde_json::from_slice(&buffer).expect("encoder output should be valid JSON");
            Some(parsed)
        } else {
            assert!(
                outcome.is_err(),
                "expected GELF encoding to fail, but it produced: {}",
                String::from_utf8_lossy(&buffer)
            );
            None
        }
    }

    #[test]
    fn gelf_serde_json_to_value_supported_success() {
        let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();

        let event_fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
        };

        let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(log_event).is_ok());
    }

    #[test]
    fn gelf_serde_json_to_value_supported_failure_to_encode() {
        let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();
        let event_fields = btreemap! {};
        let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(log_event).is_err());
    }

    #[test]
    fn gelf_serializing_valid() {
        let event_fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            FULL_MESSAGE => "Even more message",
            FACILITY => "",
            FILE => "/tmp/foobar",
            LINE => Value::Float(ordered_float::NotNan::new(1.5).unwrap()),
            LEVEL => 5,
        };

        let jsn = do_serialize(true, event_fields).unwrap();

        // every field that was set must come out unchanged
        assert_eq!(jsn.get(VERSION).unwrap(), "1.1");
        assert_eq!(jsn.get(HOST).unwrap(), "example.org");
        assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
        assert_eq!(jsn.get(FULL_MESSAGE).unwrap(), "Even more message");
        assert_eq!(jsn.get(FACILITY).unwrap(), "");
        assert_eq!(jsn.get(FILE).unwrap(), "/tmp/foobar");
        assert_eq!(jsn.get(LINE).unwrap(), 1.5);
        assert_eq!(jsn.get(LEVEL).unwrap(), 5);
    }

    #[test]
    fn gelf_serializing_coerced() {
        // no underscore
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                SHORT_MESSAGE => "Some message",
                "noUnderScore" => 0,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get("_noUnderScore").unwrap(), 0);
            // the field is renamed, not duplicated
            assert!(jsn.get("noUnderScore").is_none());
        }

        // "message" => SHORT_MESSAGE
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                log_schema().message_key().unwrap().to_string() => "Some message",
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
        }
    }

    #[test]
    fn gelf_serializing_timestamp() {
        // floating point in case of sub second timestamp
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1970-01-01 00:00:00.1", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_f64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1,);
        }

        // integer in case of no sub second timestamp
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1970-01-01 00:00:00.0", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_i64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
        }
    }

    #[test]
    fn gelf_serializing_invalid_error() {
        // no host
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
            };
            do_serialize(false, event_fields);
        }
        // no message
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
            };
            do_serialize(false, event_fields);
        }
        // expected string
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => 0,
            };
            do_serialize(false, event_fields);
        }
        // expected integer
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LEVEL => "1",
            };
            do_serialize(false, event_fields);
        }
        // expected float
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LINE => "1.2",
            };
            do_serialize(false, event_fields);
        }
        // invalid field name
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "invalid%field" => "foo",
            };
            do_serialize(false, event_fields);
        }
        // invalid additional value type - bool
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => false,
            };
            do_serialize(false, event_fields);
        }
        // invalid additional value type - null
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => serde_json::Value::Null,
            };
            do_serialize(false, event_fields);
        }
    }
}