1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use derivative::Derivative;
4use lookup::{event_path, owned_value_path};
5use serde::{Deserialize, Serialize};
6use serde_with::{serde_as, TimestampSecondsWithFrac};
7use smallvec::{smallvec, SmallVec};
8use std::collections::HashMap;
9use vector_config::configurable_component;
10use vector_core::config::LogNamespace;
11use vector_core::{
12 config::{log_schema, DataType},
13 event::Event,
14 event::LogEvent,
15 schema,
16};
17use vrl::value::kind::Collection;
18use vrl::value::{Kind, Value};
19
20use super::{default_lossy, Deserializer};
21use crate::gelf::GELF_TARGET_PATHS;
22use crate::{gelf_fields::*, VALID_FIELD_REGEX};
23
/// Config used to build a `GelfDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct GelfDeserializerConfig {
    /// GELF-specific decoding options.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub gelf: GelfDeserializerOptions,
}
41
42impl GelfDeserializerConfig {
43 pub fn new(options: GelfDeserializerOptions) -> Self {
45 Self { gelf: options }
46 }
47
48 pub fn build(&self) -> GelfDeserializer {
50 GelfDeserializer {
51 lossy: self.gelf.lossy,
52 }
53 }
54
55 pub fn output_type(&self) -> DataType {
57 DataType::Log
58 }
59
60 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
62 schema::Definition::new_with_default_metadata(
63 Kind::object(Collection::empty()),
64 [log_namespace],
65 )
66 .with_event_field(&owned_value_path!(VERSION), Kind::bytes(), None)
67 .with_event_field(&owned_value_path!(HOST), Kind::bytes(), None)
68 .with_event_field(&owned_value_path!(SHORT_MESSAGE), Kind::bytes(), None)
69 .optional_field(&owned_value_path!(FULL_MESSAGE), Kind::bytes(), None)
70 .optional_field(&owned_value_path!(TIMESTAMP), Kind::timestamp(), None)
71 .optional_field(&owned_value_path!(LEVEL), Kind::integer(), None)
72 .optional_field(&owned_value_path!(FACILITY), Kind::bytes(), None)
73 .optional_field(&owned_value_path!(LINE), Kind::integer(), None)
74 .optional_field(&owned_value_path!(FILE), Kind::bytes(), None)
75 .unknown_fields(Kind::bytes().or_integer().or_float())
79 }
80}
81
/// GELF-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializerOptions {
    /// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
    ///
    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
    ///
    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,
}
99
/// Deserializer that builds `Event`s from a byte frame containing a GELF log message.
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct GelfDeserializer {
    /// When true, input bytes are decoded with `String::from_utf8_lossy` before
    /// JSON parsing; otherwise the raw bytes are parsed directly.
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,
}
107
108impl GelfDeserializer {
109 pub fn new(lossy: bool) -> GelfDeserializer {
111 GelfDeserializer { lossy }
112 }
113
114 fn message_to_event(&self, parsed: &GelfMessage) -> vector_common::Result<Event> {
117 let mut log = LogEvent::from_str_legacy(parsed.short_message.to_string());
118
119 if parsed.version != GELF_VERSION {
121 return Err(
122 format!("{VERSION} does not match GELF spec version ({GELF_VERSION})").into(),
123 );
124 }
125
126 log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
127 log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());
128
129 if let Some(full_message) = &parsed.full_message {
130 log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
131 }
132
133 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
134 if let Some(timestamp) = parsed.timestamp {
135 log.insert(timestamp_key, timestamp);
136 } else {
138 log.insert(timestamp_key, Utc::now());
139 }
140 }
141
142 if let Some(level) = parsed.level {
143 log.insert(&GELF_TARGET_PATHS.level, level);
144 }
145 if let Some(facility) = &parsed.facility {
146 log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
147 }
148 if let Some(line) = parsed.line {
149 log.insert(
150 &GELF_TARGET_PATHS.line,
151 Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
152 );
153 }
154 if let Some(file) = &parsed.file {
155 log.insert(&GELF_TARGET_PATHS.file, file.to_string());
156 }
157
158 if let Some(add) = &parsed.additional_fields {
159 for (key, val) in add.iter() {
160 if key == "_id" {
162 continue;
163 }
164 if !key.starts_with('_') {
166 return Err(format!(
167 "'{key}' field is invalid. \
168 Additional field names must be prefixed with an underscore."
169 )
170 .into());
171 }
172 if !VALID_FIELD_REGEX.is_match(key) {
174 return Err(format!("'{key}' field contains invalid characters. Field names may \
175 contain only letters, numbers, underscores, dashes and dots.").into());
176 }
177
178 if val.is_string() || val.is_number() {
180 let vector_val: Value = val.into();
181 log.insert(event_path!(key.as_str()), vector_val);
182 } else {
183 let type_ = match val {
184 serde_json::Value::Null => "null",
185 serde_json::Value::Bool(_) => "boolean",
186 serde_json::Value::Number(_) => "number",
187 serde_json::Value::String(_) => "string",
188 serde_json::Value::Array(_) => "array",
189 serde_json::Value::Object(_) => "object",
190 };
191 return Err(format!("The value type for field {key} is an invalid type ({type_}). Additional field values \
192 should be either strings or numbers.").into());
193 }
194 }
195 }
196 Ok(Event::Log(log))
197 }
198}
199
/// Wire representation of a GELF message as read from JSON.
///
/// Field names are the payload keys as-is (no serde renames). Semantic
/// validation, such as the version check and additional field rules, happens
/// after parsing in `GelfDeserializer::message_to_event`.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
struct GelfMessage {
    /// Declared GELF spec version; must equal `GELF_VERSION` to be accepted.
    version: String,
    /// Required `host` field of the payload.
    host: String,
    /// Required; becomes the message of the resulting log event.
    short_message: String,
    /// Optional long-form message.
    full_message: Option<String>,
    /// Seconds since the UNIX epoch, with an optional fractional part.
    #[serde_as(as = "Option<TimestampSecondsWithFrac<f64>>")]
    timestamp: Option<DateTime<Utc>>,
    /// Optional level; must fit in a `u8` or parsing fails.
    level: Option<u8>,
    /// Optional `facility` field of the payload.
    facility: Option<String>,
    /// Optional line number, parsed as a float so both `42` and `42.0` are accepted.
    line: Option<f64>,
    /// Optional `file` field of the payload.
    file: Option<String>,
    /// Every remaining top-level key/value pair, collected by `#[serde(flatten)]`.
    #[serde(flatten)]
    additional_fields: Option<HashMap<String, serde_json::Value>>,
}
216
217impl Deserializer for GelfDeserializer {
218 fn parse(
219 &self,
220 bytes: Bytes,
221 _log_namespace: LogNamespace,
222 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
223 let parsed: GelfMessage = match self.lossy {
224 true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
225 false => serde_json::from_slice(&bytes),
226 }?;
227 let event = self.message_to_event(&parsed)?;
228
229 Ok(smallvec![event])
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use lookup::event_path;
    use serde_json::json;
    use similar_asserts::assert_eq;
    use smallvec::SmallVec;
    use vector_core::{config::log_schema, event::Event};
    use vrl::value::Value;

    /// Serializes `input` to JSON bytes and decodes it with a deserializer built
    /// from the default configuration, in the legacy log namespace.
    fn deserialize_gelf_input(
        input: &serde_json::Value,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        let config = GelfDeserializerConfig::default();
        let deserializer = config.build();
        let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
        deserializer.parse(buffer, LogNamespace::Legacy)
    }

    /// Validates that every spec'd GELF field is deserialized correctly.
    #[test]
    fn gelf_deserialize_correctness() {
        let add_on_int_in = "_an.add-field_int";
        let add_on_str_in = "_an.add-field_str";

        let input = json!({
            VERSION: "1.1",
            HOST: "example.org",
            SHORT_MESSAGE: "A short message that helps you identify what is going on",
            FULL_MESSAGE: "Backtrace here\n\nmore stuff",
            TIMESTAMP: 1385053862.3072,
            LEVEL: 1,
            FACILITY: "foo",
            LINE: 42,
            FILE: "/tmp/bar",
            add_on_int_in: 2001.1002,
            add_on_str_in: "A Space Odyssey",
        });

        // A single GELF message always yields exactly one event.
        let events = deserialize_gelf_input(&input).unwrap();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();

        assert_eq!(
            log.get(VERSION),
            Some(&Value::Bytes(Bytes::from_static(b"1.1")))
        );
        assert_eq!(
            log.get(HOST),
            Some(&Value::Bytes(Bytes::from_static(b"example.org")))
        );
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap()),
            Some(&Value::Bytes(Bytes::from_static(
                b"A short message that helps you identify what is going on"
            )))
        );
        assert_eq!(
            log.get(FULL_MESSAGE),
            Some(&Value::Bytes(Bytes::from_static(
                b"Backtrace here\n\nmore stuff"
            )))
        );
        let dt = DateTime::from_timestamp(1385053862, 307_200_000).expect("invalid timestamp");
        assert_eq!(log.get(TIMESTAMP), Some(&Value::Timestamp(dt)));
        assert_eq!(log.get(LEVEL), Some(&Value::Integer(1)));
        assert_eq!(
            log.get(FACILITY),
            Some(&Value::Bytes(Bytes::from_static(b"foo")))
        );
        // `line` is parsed as a float even when sent as an integer.
        assert_eq!(
            log.get(LINE),
            Some(&Value::Float(ordered_float::NotNan::new(42.0).unwrap()))
        );
        assert_eq!(
            log.get(FILE),
            Some(&Value::Bytes(Bytes::from_static(b"/tmp/bar")))
        );
        assert_eq!(
            log.get(event_path!(add_on_int_in)),
            Some(&Value::Float(
                ordered_float::NotNan::new(2001.1002).unwrap()
            ))
        );
        assert_eq!(
            log.get(event_path!(add_on_str_in)),
            Some(&Value::Bytes(Bytes::from_static(b"A Space Odyssey")))
        );
    }

    /// Validates that deserialization succeeds for edge-case inputs.
    #[test]
    fn gelf_deserializing_edge_cases() {
        // Only the required fields are present.
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(log.contains(log_schema().message_key_target_path().unwrap()));
        }

        // The reserved `_id` additional field is dropped, not rejected.
        {
            let input = json!({
                HOST: "example.org",
                SHORT_MESSAGE: "foobar",
                VERSION: "1.1",
                "_id": "S3creTz",
            });
            let events = deserialize_gelf_input(&input).unwrap();
            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert!(!log.contains(event_path!("_id")));
        }
    }

    /// Validates the error conditions in deserialization.
    #[test]
    fn gelf_deserializing_err() {
        fn validate_err(input: &serde_json::Value) {
            assert!(deserialize_gelf_input(input).is_err());
        }

        // Additional field name contains an invalid character.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_bad%key": "raboof",
        }));

        // Additional field name is missing the leading underscore.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "bad-key": "raboof",
        }));

        // Missing the required `short_message` field.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
        }));

        // Missing the required `host` field.
        validate_err(&json!({
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // `host` has the wrong type.
        validate_err(&json!({
            HOST: 42,
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
        }));

        // `level` has the wrong type.
        validate_err(&json!({
            HOST: "example.org",
            VERSION: "1.1",
            SHORT_MESSAGE: "foobar",
            LEVEL: "baz",
        }));

        // `version` does not match the supported GELF spec version.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.0",
        }));

        // Additional field values must be strings or numbers.
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_flag": true,
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_nothing": null,
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_list": [1, 2],
        }));
        validate_err(&json!({
            HOST: "example.org",
            SHORT_MESSAGE: "foobar",
            VERSION: "1.1",
            "_nested": { "a": 1 },
        }));
    }
}