1use std::collections::HashMap;
2
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use derivative::Derivative;
6use lookup::{event_path, owned_value_path};
7use serde::{Deserialize, Serialize};
8use serde_with::{TimestampSecondsWithFrac, serde_as};
9use smallvec::{SmallVec, smallvec};
10use vector_config::configurable_component;
11use vector_core::{
12 config::{DataType, LogNamespace, log_schema},
13 event::{Event, LogEvent},
14 schema,
15};
16use vrl::value::{Kind, Value, kind::Collection};
17
18use super::{Deserializer, default_lossy};
19use crate::{VALID_FIELD_REGEX, gelf::GELF_TARGET_PATHS, gelf_fields::*};
20
21#[configurable_component]
32#[derive(Debug, Clone, Default)]
33pub struct GelfDeserializerConfig {
34 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
36 pub gelf: GelfDeserializerOptions,
37}
38
39impl GelfDeserializerConfig {
40 pub fn new(options: GelfDeserializerOptions) -> Self {
42 Self { gelf: options }
43 }
44
45 pub fn build(&self) -> GelfDeserializer {
47 GelfDeserializer {
48 lossy: self.gelf.lossy,
49 }
50 }
51
52 pub fn output_type(&self) -> DataType {
54 DataType::Log
55 }
56
57 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59 schema::Definition::new_with_default_metadata(
60 Kind::object(Collection::empty()),
61 [log_namespace],
62 )
63 .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
64 .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
65 .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
66 .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
67 .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
68 .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
69 .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
70 .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
71 .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
72 .unknown_fields(Kind::bytes().or_integer().or_float())
76 }
77}
78
79#[configurable_component]
81#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
82#[derivative(Default)]
83pub struct GelfDeserializerOptions {
84 #[serde(
90 default = "default_lossy",
91 skip_serializing_if = "vector_core::serde::is_default"
92 )]
93 #[derivative(Default(value = "default_lossy()"))]
94 pub lossy: bool,
95}
96
#[derive(Debug, Clone, Derivative)]
/// Deserializer that decodes one GELF JSON message into a single log `Event`.
#[derivative(Default)]
pub struct GelfDeserializer {
    /// When `true`, input bytes go through `String::from_utf8_lossy` before JSON parsing;
    /// otherwise they are handed to `serde_json::from_slice` unchanged. The `Default`
    /// impl takes this value from `default_lossy()`.
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,
}
104
105impl GelfDeserializer {
106 pub fn new(lossy: bool) -> GelfDeserializer {
108 GelfDeserializer { lossy }
109 }
110
111 fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
114 let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
115
116 if parsed.version != GELF_VERSION {
118 return Err(
119 format!("{VERSION} does not match GELF spec version ({GELF_VERSION})").into(),
120 );
121 }
122
123 log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
124 log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
125
126 if let Some(full_message) = &parsed.full_message {
127 log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
128 }
129
130 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
131 if let Some(timestamp) = parsed.timestamp {
132 log.insert(timestamp_key, timestamp);
133 } else {
135 log.insert(timestamp_key, Utc::now());
136 }
137 }
138
139 if let Some(level) = parsed.level {
140 log.insert(&GELF_TARGET_PATHS.level, level);
141 }
142 if let Some(facility) = &parsed.facility {
143 log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
144 }
145 if let Some(line) = parsed.line {
146 log.insert(
147 &GELF_TARGET_PATHS.line,
148 Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
149 );
150 }
151 if let Some(file) = &parsed.file {
152 log.insert(&GELF_TARGET_PATHS.file, file.to_string());
153 }
154
155 if let Some(add) = &parsed.additional_fields {
156 for (key, val) in add.iter() {
157 if key == "_id" {
159 continue;
160 }
161 if !key.starts_with('_') {
163 return Err(format!(
164 "'{key}' field is invalid. \
165 Additional field names must be prefixed with an underscore."
166 )
167 .into());
168 }
169 if !VALID_FIELD_REGEX.is_match(key) {
171 return Err(format!(
172 "'{key}' field contains invalid characters. Field names may \
173 contain only letters, numbers, underscores, dashes and dots."
174 )
175 .into());
176 }
177
178 if val.is_string() || val.is_number() {
180 let vector_val: Value = val.into();
181 log.insert(event_path!(key.as_str()), vector_val);
182 } else {
183 let type_ = match val {
184 serde_json::Value::Null => "null",
185 serde_json::Value::Bool(_) => "boolean",
186 serde_json::Value::Number(_) => "number",
187 serde_json::Value::String(_) => "string",
188 serde_json::Value::Array(_) => "array",
189 serde_json::Value::Object(_) => "object",
190 };
191 return Err(format!("The value type for field {key} is an invalid type ({type_}). Additional field values \
192 should be either strings or numbers.").into());
193 }
194 }
195 }
196 Ok(Event::Log(log))
197 }
198}
199
/// Wire representation of a GELF message as decoded from JSON.
///
/// Field names are the GELF JSON keys; `version`, `host` and `short_message` are
/// required (non-`Option`), the rest are optional. Any key not listed here is collected
/// into `additional_fields` by `#[serde(flatten)]`.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
struct GelfMessage {
    version: String,
    host: String,
    short_message: String,
    full_message: Option<String>,
    // Seconds since the UNIX epoch, with an optional fractional part.
    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
    timestamp: Option<DateTime<Utc>>,
    level: Option<u8>,
    facility: Option<String>,
    line: Option<f64>,
    file: Option<String>,
    // Every remaining JSON key; names and value types are validated in `message_to_event`.
    #[serde(flatten)]
    additional_fields: Option<HashMap<String, serde_json::Value>>,
}
216
217impl Deserializer for GelfDeserializer {
218 fn parse(
219 &self,
220 bytes: Bytes,
221 _log_namespace: LogNamespace,
222 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
223 let parsed: GelfMessage = match self.lossy {
224 true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
225 false => serde_json::from_slice(&bytes),
226 }?;
227 let event = self.message_to_event(&parsed)?;
228
229 Ok(smallvec![event])
230 }
231}
232
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use lookup::event_path;
    use serde_json::json;
    use similar_asserts::assert_eq;
    use smallvec::SmallVec;
    use vector_core::{config::log_schema, event::Event};
    use vrl::value::Value;

    use super::*;

    /// Encodes `input` as JSON and runs it through a default-configured `GelfDeserializer`.
    fn deserialize_gelf_input(
        input: &serde_json::Value,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let config = GelfDeserializerConfig::default();
        let deserializer = config.build();
        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
        deserializer.parse(buffer, LogNamespace::Legacy)
    }

    /// Every standard GELF field plus two additional fields decode to the expected values.
    #[test]
    fn gelf_deserialize_correctness() {
        let add_on_int_in = "_an.add-field_int";
        let add_on_str_in = "_an.add-field_str";

        let input = json!({
            VERSION: "1.1",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            add_on_int_in: 2001.1002,
            add_on_str_in: "A Space Odyssey",
        });

        let events = deserialize_gelf_input(&input).unwrap();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();

        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
        );
        assert_eq!(
            log.get(HOST),
            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
        );
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from_static(
                b"A short message that helps you identify what is going on"
            )))
        );
        assert_eq!(
            log.get(FULL_MESSAGE),
            Some(&Value::Bytes(Bytes::from_static(
                b"Backtrace here\n\nmore stuff"
            )))
        );
        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
        assert_eq!(
            log.get(FACILITY),
            Some(&Value::Bytes(Bytes::from_static(b"foo")))
        );
        assert_eq!(
            log.get(LINE),
            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
        );
        assert_eq!(
            log.get(FILE),
            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
        );
        assert_eq!(
            log.get(event_path!(add_on_int_in)),
            Some(&Value::Float(
                ordered_float::NotNan::new(2001.1002).unwrap()
            ))
        );
        assert_eq!(
            log.get(event_path!(add_on_str_in)),
            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
        );
    }

    /// Minimal messages, the `_id` filter, and the timestamp fallback.
    #[test]
    fn gelf_deserializing_edge_cases() {
        // Only the required fields are present.
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(log.contains(log_schema().message_key_target_path().unwrap()));
        }

        // `_id` is silently dropped rather than copied or rejected.
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_id": "S3creTz",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(!log.contains(event_path!("_id")));
        }

        // No `timestamp` in the message: the decoder still stamps the event.
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            let timestamp_key = log_schema().timestamp_key_target_path().unwrap();
            assert!(matches!(log.get(timestamp_key), Some(Value::Timestamp(_))));
        }
    }

    /// Malformed messages are rejected instead of being partially decoded.
    #[test]
    fn gelf_deserializing_err() {
        fn validate_err(input: &serde_json::Value) {
            assert!(deserialize_gelf_input(input).is_err());
        }

        // Additional field name with a character outside the allowed set.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_bad%key": "raboof",
        }));

        // Additional field name without the leading underscore.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "bad-key": "raboof",
        }));

        // Missing `short_message`.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
        }));

        // Missing `host`.
        validate_err(&json!({
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // `host` is not a string.
        validate_err(&json!({
            HOST: 42,
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // `level` is not an integer.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: "baz",
        }));

        // `level` does not fit in a `u8`.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: 300,
        }));

        // Unsupported GELF version.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.0",
        }));

        // Additional field values must be strings or numbers.
        for bad_value in [json!(null), json!(true), json!([1, 2]), json!({ "nested": 1 })] {
            validate_err(&json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_bad_value": bad_value,
            }));
        }
    }
}