codecs/encoding/format/gelf.rs

1use crate::gelf::GELF_TARGET_PATHS;
2use crate::{gelf_fields::*, VALID_FIELD_REGEX};
3use bytes::{BufMut, BytesMut};
4use lookup::event_path;
5use ordered_float::NotNan;
6use serde::{Deserialize, Serialize};
7use snafu::Snafu;
8use tokio_util::codec::Encoder;
9use vector_core::{
10    config::{log_schema, DataType},
11    event::{Event, KeyString, LogEvent, Value},
12    schema,
13};
14
15/// On GELF encoding behavior:
16///   Graylog has a relaxed parsing. They are much more lenient than the spec would
17///   suggest. We've elected to take a more strict approach to maintain backwards compatibility
18///   in the event that we need to change the behavior to be more relaxed, so that prior versions
19///   of vector will still work.
20///   The exception is that if 'Additional fields' are found to be missing an underscore prefix and
21///   are otherwise valid field names, we prepend the underscore.
22///
23/// Errors that can occur during GELF serialization.
24#[derive(Debug, Snafu)]
25pub enum GelfSerializerError {
26    #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
27    MissingField { field: KeyString },
28    #[snafu(display(
29        r#"LogEvent contains field with invalid name not matching pattern '{}': "{}""#,
30        pattern,
31        field,
32    ))]
33    InvalidField { field: KeyString, pattern: String },
34    #[snafu(display(
35        r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
36        field,
37        actual_type,
38        expected_type
39    ))]
40    InvalidValueType {
41        field: String,
42        actual_type: String,
43        expected_type: String,
44    },
45}
46
47/// Config used to build a `GelfSerializer`.
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct GelfSerializerConfig;
50
51impl GelfSerializerConfig {
52    /// Creates a new `GelfSerializerConfig`.
53    pub const fn new() -> Self {
54        Self
55    }
56
57    /// Build the `GelfSerializer` from this configuration.
58    pub fn build(&self) -> GelfSerializer {
59        GelfSerializer::new()
60    }
61
62    /// The data type of events that are accepted by `GelfSerializer`.
63    pub fn input_type() -> DataType {
64        DataType::Log
65    }
66
67    /// The schema required by the serializer.
68    pub fn schema_requirement() -> schema::Requirement {
69        // While technically we support `Value` variants that can't be losslessly serialized to
70        // JSON, we don't want to enforce that limitation to users yet.
71        schema::Requirement::empty()
72    }
73}
74
75/// Serializer that converts an `Event` to bytes using the GELF format.
76/// Spec: <https://docs.graylog.org/docs/gelf>
77#[derive(Debug, Clone)]
78pub struct GelfSerializer;
79
80impl GelfSerializer {
81    /// Creates a new `GelfSerializer`.
82    pub fn new() -> Self {
83        GelfSerializer
84    }
85
86    /// Encode event and represent it as JSON value.
87    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
88        // input_type() restricts the event type to LogEvents
89        let log = to_gelf_event(event.into_log())?;
90        serde_json::to_value(&log).map_err(|e| e.to_string().into())
91    }
92}
93
94impl Default for GelfSerializer {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100impl Encoder<Event> for GelfSerializer {
101    type Error = vector_common::Error;
102
103    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
104        let log = to_gelf_event(event.into_log())?;
105        let writer = buffer.writer();
106        serde_json::to_writer(writer, &log)?;
107        Ok(())
108    }
109}
110
111/// Returns Error for invalid type.
112fn err_invalid_type(
113    field: &str,
114    expected_type: &str,
115    actual_type: &str,
116) -> vector_common::Result<()> {
117    InvalidValueTypeSnafu {
118        field,
119        actual_type,
120        expected_type,
121    }
122    .fail()
123    .map_err(|e| e.to_string().into())
124}
125
126/// Validates that the GELF required fields exist in the event, coercing in some cases.
127fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
128    // returns Error for missing field
129    fn err_missing_field(field: &str) -> vector_common::Result<()> {
130        MissingFieldSnafu { field }
131            .fail()
132            .map_err(|e| e.to_string().into())
133    }
134
135    // add the VERSION if it does not exist
136    if !log.contains(&GELF_TARGET_PATHS.version) {
137        log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
138    }
139
140    if !log.contains(&GELF_TARGET_PATHS.host) {
141        err_missing_field(HOST)?;
142    }
143
144    if !log.contains(&GELF_TARGET_PATHS.short_message) {
145        if let Some(message_key) = log_schema().message_key_target_path() {
146            if log.contains(message_key) {
147                log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
148            } else {
149                err_missing_field(SHORT_MESSAGE)?;
150            }
151        }
152    }
153    Ok(log)
154}
155
156/// Validates rules for field names and value types, coercing in some cases.
157fn coerce_field_names_and_values(
158    mut log: LogEvent,
159) -> vector_common::Result<(LogEvent, Vec<String>)> {
160    let mut missing_prefix = vec![];
161    if let Some(event_data) = log.as_map_mut() {
162        for (field, value) in event_data.iter_mut() {
163            match field.as_str() {
164                VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
165                    if !value.is_bytes() {
166                        err_invalid_type(field, "UTF-8 string", value.kind_str())?;
167                    }
168                }
169                TIMESTAMP => {
170                    if !(value.is_timestamp() || value.is_integer()) {
171                        err_invalid_type(field, "timestamp or integer", value.kind_str())?;
172                    }
173
174                    // convert a `Value::Timestamp` to a GELF specified timestamp where milliseconds are represented by the fractional part of a float.
175                    if let Value::Timestamp(ts) = value {
176                        let ts_millis = ts.timestamp_millis();
177                        if ts_millis % 1000 != 0 {
178                            *value = Value::Float(NotNan::new(ts_millis as f64 / 1000.0).unwrap());
179                        } else {
180                            // keep full range of representable time if no milliseconds are set
181                            // but still convert to numeric according to GELF protocol
182                            *value = Value::Integer(ts.timestamp())
183                        }
184                    }
185                }
186                LEVEL => {
187                    if !value.is_integer() {
188                        err_invalid_type(field, "integer", value.kind_str())?;
189                    }
190                }
191                LINE => {
192                    if !(value.is_float() || value.is_integer()) {
193                        err_invalid_type(field, "number", value.kind_str())?;
194                    }
195                }
196                _ => {
197                    // additional fields must be only word chars, dashes and periods.
198                    if !VALID_FIELD_REGEX.is_match(field) {
199                        return InvalidFieldSnafu {
200                            field: field.clone(),
201                            pattern: VALID_FIELD_REGEX.to_string(),
202                        }
203                        .fail()
204                        .map_err(|e| e.to_string().into());
205                    }
206
207                    // additional field values must be only strings or numbers
208                    if !(value.is_integer() || value.is_float() || value.is_bytes()) {
209                        err_invalid_type(field, "string or number", value.kind_str())?;
210                    }
211
212                    // Additional fields must be prefixed with underscores.
213                    // Prepending the underscore since vector adds fields such as 'source_type'
214                    // which would otherwise throw errors.
215                    if !field.is_empty() && !field.starts_with('_') {
216                        // flag the field as missing prefix to be modified later
217                        missing_prefix.push(field.to_string());
218                    }
219                }
220            }
221        }
222    }
223    Ok((log, missing_prefix))
224}
225
226/// Validate if the input log event is valid GELF, potentially coercing the event into valid GELF.
227fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
228    let log = coerce_required_fields(log).and_then(|log| {
229        coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
230            // rename additional fields that were flagged as missing the underscore prefix
231            for field in missing_prefix {
232                log.rename_key(
233                    event_path!(field.as_str()),
234                    event_path!(format!("_{}", &field).as_str()),
235                );
236            }
237            log
238        })
239    })?;
240
241    Ok(log)
242}
243
#[cfg(test)]
mod tests {
    use crate::encoding::SerializerConfig;

    use super::*;
    use chrono::NaiveDateTime;
    use vector_core::event::{Event, EventMetadata};
    use vrl::btreemap;
    use vrl::value::{ObjectMap, Value};

    /// Encodes `event_fields` with a freshly built `GelfSerializer`.
    ///
    /// With `expect_success`, asserts that encoding succeeds and that the output parses as
    /// JSON, returning the parsed document. Otherwise asserts that encoding fails and returns
    /// `None`. Failures report the underlying error or output instead of a bare `is_ok()`.
    fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
        let config = GelfSerializerConfig::new();
        let mut serializer = config.build();
        let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        let mut buffer = BytesMut::new();
        let result = serializer.encode(event, &mut buffer);

        if expect_success {
            result.unwrap_or_else(|e| panic!("expected GELF encoding to succeed, got: {e}"));
            let buffer_str = std::str::from_utf8(&buffer).expect("GELF output must be UTF-8");
            let json = serde_json::from_str(buffer_str)
                .unwrap_or_else(|e| panic!("GELF output is not valid JSON ({e}): {buffer_str}"));
            Some(json)
        } else {
            assert!(
                result.is_err(),
                "expected GELF encoding to fail, but it produced: {:?}",
                buffer
            );
            None
        }
    }

    #[test]
    fn gelf_serde_json_to_value_supported_success() {
        let serializer = SerializerConfig::Gelf.build().unwrap();

        let event_fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
        };

        let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(log_event).is_ok());
    }

    #[test]
    fn gelf_serde_json_to_value_supported_failure_to_encode() {
        let serializer = SerializerConfig::Gelf.build().unwrap();
        let event_fields = btreemap! {};
        let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(log_event).is_err());
    }

    #[test]
    fn gelf_serializing_valid() {
        let event_fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            FULL_MESSAGE => "Even more message",
            FACILITY => "",
            FILE => "/tmp/foobar",
            LINE => Value::Float(ordered_float::NotNan::new(1.5).unwrap()),
            LEVEL => 5,
        };

        let jsn = do_serialize(true, event_fields).unwrap();

        assert_eq!(jsn.get(VERSION).unwrap(), "1.1");
        assert_eq!(jsn.get(HOST).unwrap(), "example.org");
        assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
    }

    #[test]
    fn gelf_serializing_coerced() {
        // no underscore: the field is renamed, not duplicated
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                SHORT_MESSAGE => "Some message",
                "noUnderScore" => 0,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get("_noUnderScore").unwrap(), 0);
            assert!(jsn.get("noUnderScore").is_none());
        }

        // "message" => SHORT_MESSAGE: the message field is moved, not copied
        {
            let message_key = log_schema().message_key().unwrap().to_string();
            let event_fields = btreemap! {
                VERSION => "1.1",
                HOST => "example.org",
                message_key.clone() => "Some message",
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
            assert!(jsn.get(message_key.as_str()).is_none());
        }
    }

    #[test]
    fn gelf_serializing_timestamp() {
        // floating point in case of sub second timestamp
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1970-01-01 00:00:00.1", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_f64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1);
        }

        // integer in case of no sub second timestamp
        {
            let naive_dt =
                NaiveDateTime::parse_from_str("1970-01-01 00:00:00.0", "%Y-%m-%d %H:%M:%S%.f");
            let dt = naive_dt.unwrap().and_utc();

            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            let jsn = do_serialize(true, event_fields).unwrap();
            assert!(jsn.get(TIMESTAMP).unwrap().is_i64());
            assert_eq!(jsn.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
        }
    }

    #[test]
    fn gelf_serializing_invalid_error() {
        // no host
        {
            let event_fields = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
            };
            do_serialize(false, event_fields);
        }
        // no message
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
            };
            do_serialize(false, event_fields);
        }
        // expected string
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => 0,
            };
            do_serialize(false, event_fields);
        }
        // expected integer
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LEVEL => "1",
            };
            do_serialize(false, event_fields);
        }
        // expected number (integer or float) for LINE
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LINE => "1.2",
            };
            do_serialize(false, event_fields);
        }
        // invalid field name
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "invalid%field" => "foo",
            };
            do_serialize(false, event_fields);
        }
        // invalid additional value type - bool
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => false,
            };
            do_serialize(false, event_fields);
        }
        // invalid additional value type - null
        {
            let event_fields = btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => serde_json::Value::Null,
            };
            do_serialize(false, event_fields);
        }
    }
}