1use crate::gelf::GELF_TARGET_PATHS;
2use crate::{gelf_fields::*, VALID_FIELD_REGEX};
3use bytes::{BufMut, BytesMut};
4use lookup::event_path;
5use ordered_float::NotNan;
6use serde::{Deserialize, Serialize};
7use snafu::Snafu;
8use tokio_util::codec::Encoder;
9use vector_core::{
10 config::{log_schema, DataType},
11 event::{Event, KeyString, LogEvent, Value},
12 schema,
13};
14
/// Errors reported while validating a `LogEvent` for GELF serialization.
///
/// Within this file the errors are created through their Snafu context selectors and
/// immediately rendered to a string, so callers of the serializer observe the `display`
/// text defined on each variant.
#[derive(Debug, Snafu)]
pub enum GelfSerializerError {
    /// A field that must be present on the event is absent; `field` names it.
    #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
    MissingField { field: KeyString },
    /// A field name does not match the allowed pattern; `pattern` is the textual form
    /// of the pattern the name was checked against.
    #[snafu(display(
        r#"LogEvent contains field with invalid name not matching pattern '{}': "{}""#,
        pattern,
        field,
    ))]
    InvalidField { field: KeyString, pattern: String },
    /// A field holds a value whose kind is not accepted for that field; `actual_type`
    /// is the kind that was found and `expected_type` describes what is accepted.
    #[snafu(display(
        r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
        field,
        actual_type,
        expected_type
    ))]
    InvalidValueType {
        field: String,
        actual_type: String,
        expected_type: String,
    },
}
46
47#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct GelfSerializerConfig;
50
51impl GelfSerializerConfig {
52 pub const fn new() -> Self {
54 Self
55 }
56
57 pub fn build(&self) -> GelfSerializer {
59 GelfSerializer::new()
60 }
61
62 pub fn input_type() -> DataType {
64 DataType::Log
65 }
66
67 pub fn schema_requirement() -> schema::Requirement {
69 schema::Requirement::empty()
72 }
73}
74
75#[derive(Debug, Clone)]
78pub struct GelfSerializer;
79
80impl GelfSerializer {
81 pub fn new() -> Self {
83 GelfSerializer
84 }
85
86 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
88 let log = to_gelf_event(event.into_log())?;
90 serde_json::to_value(&log).map_err(|e| e.to_string().into())
91 }
92}
93
94impl Default for GelfSerializer {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100impl Encoder<Event> for GelfSerializer {
101 type Error = vector_common::Error;
102
103 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
104 let log = to_gelf_event(event.into_log())?;
105 let writer = buffer.writer();
106 serde_json::to_writer(writer, &log)?;
107 Ok(())
108 }
109}
110
111fn err_invalid_type(
113 field: &str,
114 expected_type: &str,
115 actual_type: &str,
116) -> vector_common::Result<()> {
117 InvalidValueTypeSnafu {
118 field,
119 actual_type,
120 expected_type,
121 }
122 .fail()
123 .map_err(|e| e.to_string().into())
124}
125
126fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
128 fn err_missing_field(field: &str) -> vector_common::Result<()> {
130 MissingFieldSnafu { field }
131 .fail()
132 .map_err(|e| e.to_string().into())
133 }
134
135 if !log.contains(&GELF_TARGET_PATHS.version) {
137 log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
138 }
139
140 if !log.contains(&GELF_TARGET_PATHS.host) {
141 err_missing_field(HOST)?;
142 }
143
144 if !log.contains(&GELF_TARGET_PATHS.short_message) {
145 if let Some(message_key) = log_schema().message_key_target_path() {
146 if log.contains(message_key) {
147 log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
148 } else {
149 err_missing_field(SHORT_MESSAGE)?;
150 }
151 }
152 }
153 Ok(log)
154}
155
156fn coerce_field_names_and_values(
158 mut log: LogEvent,
159) -> vector_common::Result<(LogEvent, Vec<String>)> {
160 let mut missing_prefix = vec![];
161 if let Some(event_data) = log.as_map_mut() {
162 for (field, value) in event_data.iter_mut() {
163 match field.as_str() {
164 VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
165 if !value.is_bytes() {
166 err_invalid_type(field, "UTF-8 string", value.kind_str())?;
167 }
168 }
169 TIMESTAMP => {
170 if !(value.is_timestamp() || value.is_integer()) {
171 err_invalid_type(field, "timestamp or integer", value.kind_str())?;
172 }
173
174 if let Value::Timestamp(ts) = value {
176 let ts_millis = ts.timestamp_millis();
177 if ts_millis % 1000 != 0 {
178 *value = Value::Float(NotNan::new(ts_millis as f64 / 1000.0).unwrap());
179 } else {
180 *value = Value::Integer(ts.timestamp())
183 }
184 }
185 }
186 LEVEL => {
187 if !value.is_integer() {
188 err_invalid_type(field, "integer", value.kind_str())?;
189 }
190 }
191 LINE => {
192 if !(value.is_float() || value.is_integer()) {
193 err_invalid_type(field, "number", value.kind_str())?;
194 }
195 }
196 _ => {
197 if !VALID_FIELD_REGEX.is_match(field) {
199 return InvalidFieldSnafu {
200 field: field.clone(),
201 pattern: VALID_FIELD_REGEX.to_string(),
202 }
203 .fail()
204 .map_err(|e| e.to_string().into());
205 }
206
207 if !(value.is_integer() || value.is_float() || value.is_bytes()) {
209 err_invalid_type(field, "string or number", value.kind_str())?;
210 }
211
212 if !field.is_empty() && !field.starts_with('_') {
216 missing_prefix.push(field.to_string());
218 }
219 }
220 }
221 }
222 }
223 Ok((log, missing_prefix))
224}
225
226fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
228 let log = coerce_required_fields(log).and_then(|log| {
229 coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
230 for field in missing_prefix {
232 log.rename_key(
233 event_path!(field.as_str()),
234 event_path!(format!("_{}", &field).as_str()),
235 );
236 }
237 log
238 })
239 })?;
240
241 Ok(log)
242}
243
#[cfg(test)]
mod tests {
    use crate::encoding::SerializerConfig;

    use super::*;
    use chrono::NaiveDateTime;
    use vector_core::event::{Event, EventMetadata};
    use vrl::btreemap;
    use vrl::value::{ObjectMap, Value};

    /// Encodes a log event built from `event_fields` with a freshly built serializer.
    ///
    /// With `expect_success` set, asserts that encoding succeeds and that the output
    /// parses as JSON, returning the parsed value. Otherwise asserts that encoding
    /// fails and returns `None`.
    fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
        let mut serializer = GelfSerializerConfig::new().build();
        let event = Event::from(LogEvent::from_map(event_fields, EventMetadata::default()));
        let mut buffer = BytesMut::new();
        let outcome = serializer.encode(event, &mut buffer);

        if !expect_success {
            assert!(outcome.is_err());
            return None;
        }

        assert!(outcome.is_ok());
        let text = std::str::from_utf8(&buffer).unwrap();
        let parsed = serde_json::from_str(text);
        assert!(parsed.is_ok());
        parsed.ok()
    }

    /// Serializes an otherwise valid event whose timestamp is parsed from `instant`
    /// (format `%Y-%m-%d %H:%M:%S%.f`, interpreted as UTC) and returns the JSON output.
    fn serialize_with_timestamp(instant: &str) -> serde_json::Value {
        let parsed = NaiveDateTime::parse_from_str(instant, "%Y-%m-%d %H:%M:%S%.f");
        let utc = parsed.unwrap().and_utc();

        let fields = btreemap! {
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
            HOST => "example.org",
            TIMESTAMP => utc,
        };

        do_serialize(true, fields).unwrap()
    }

    #[test]
    fn gelf_serde_json_to_value_supported_success() {
        let serializer = SerializerConfig::Gelf.build().unwrap();

        let fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
        };
        let event = Event::from(LogEvent::from_map(fields, EventMetadata::default()));

        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(event).is_ok());
    }

    #[test]
    fn gelf_serde_json_to_value_supported_failure_to_encode() {
        let serializer = SerializerConfig::Gelf.build().unwrap();

        // An empty event lacks the required fields, so the conversion must fail.
        let fields = btreemap! {};
        let event = Event::from(LogEvent::from_map(fields, EventMetadata::default()));

        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(event).is_err());
    }

    #[test]
    fn gelf_serializing_valid() {
        let fields = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            FULL_MESSAGE => "Even more message",
            FACILITY => "",
            FILE => "/tmp/foobar",
            LINE => Value::Float(ordered_float::NotNan::new(1.5).unwrap()),
            LEVEL => 5,
        };

        let json = do_serialize(true, fields).unwrap();

        let expectations = [
            (VERSION, "1.1"),
            (HOST, "example.org"),
            (SHORT_MESSAGE, "Some message"),
        ];
        for (key, expected) in expectations {
            assert_eq!(json.get(key).unwrap(), expected);
        }
    }

    #[test]
    fn gelf_serializing_coerced() {
        // A field without the leading underscore gets one prepended.
        let unprefixed = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            "noUnderScore" => 0,
        };
        let json = do_serialize(true, unprefixed).unwrap();
        assert_eq!(json.get("_noUnderScore").unwrap(), 0);

        // The field at the global message key is used as the short message.
        let message_only = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            log_schema().message_key().unwrap().to_string() => "Some message",
        };
        let json = do_serialize(true, message_only).unwrap();
        assert_eq!(json.get(SHORT_MESSAGE).unwrap(), "Some message");
    }

    #[test]
    fn gelf_serializing_timestamp() {
        // A timestamp with a millisecond component is emitted as a float.
        let fractional = serialize_with_timestamp("1970-01-01 00:00:00.1");
        assert!(fractional.get(TIMESTAMP).unwrap().is_f64());
        assert_eq!(fractional.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1,);

        // A timestamp on a whole second is emitted as an integer.
        let whole = serialize_with_timestamp("1970-01-01 00:00:00.0");
        assert!(whole.get(TIMESTAMP).unwrap().is_i64());
        assert_eq!(whole.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
    }

    #[test]
    fn gelf_serializing_invalid_error() {
        // Host is missing.
        let missing_host = btreemap! {
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
        };
        do_serialize(false, missing_host);

        // Short message is missing.
        let missing_message = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
        };
        do_serialize(false, missing_message);

        // Short message must be a string.
        let numeric_message = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
            SHORT_MESSAGE => 0,
        };
        do_serialize(false, numeric_message);

        // Level must be an integer.
        let string_level = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
            LEVEL => "1",
        };
        do_serialize(false, string_level);

        // Line must be a number.
        let string_line = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
            LINE => "1.2",
        };
        do_serialize(false, string_line);

        // Field name contains a character the pattern rejects.
        let bad_field_name = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
            "invalid%field" => "foo",
        };
        do_serialize(false, bad_field_name);

        // Boolean values are not accepted for additional fields.
        let boolean_value = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
            "_foobar" => false,
        };
        do_serialize(false, boolean_value);

        // Null values are not accepted for additional fields.
        let null_value = btreemap! {
            HOST => "example.org",
            VERSION => "1.1",
            SHORT_MESSAGE => "Some message",
            "_foobar" => serde_json::Value::Null,
        };
        do_serialize(false, null_value);
    }
}