1use crate::{VALID_FIELD_REGEX, encoding::GelfChunker, gelf::GELF_TARGET_PATHS, gelf_fields::*};
2use bytes::{BufMut, BytesMut};
3use lookup::event_path;
4use ordered_float::NotNan;
5use snafu::Snafu;
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9 config::{DataType, log_schema},
10 event::{Event, KeyString, LogEvent, Value},
11 schema,
12};
13
/// Options that control how a `GelfSerializer` behaves.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct GelfSerializerOptions {
    /// Maximum size of each chunk produced by the GELF chunker.
    ///
    /// Values below 13 are rejected by configuration validation. Defaults to 8192.
    // NOTE(review): the unit is presumably bytes, and the lower bound presumably
    // leaves room for a per-chunk header -- confirm against `GelfChunker`.
    #[configurable(validation(range(min = 13)))]
    #[serde(default = "default_max_chunk_size")]
    pub max_chunk_size: usize,
}
25
/// Returns the `max_chunk_size` used when the configuration does not set one (8192).
const fn default_max_chunk_size() -> usize {
    8 * 1024
}
29
30impl Default for GelfSerializerOptions {
31 fn default() -> Self {
32 Self {
33 max_chunk_size: default_max_chunk_size(),
34 }
35 }
36}
37
/// Errors that can be raised while converting a log event into its GELF representation.
#[derive(Debug, Snafu)]
pub enum GelfSerializerError {
    /// A required field (named by `field`) is absent from the log event.
    #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
    MissingField { field: KeyString },
    /// The name of `field` does not match `pattern`, the pattern field names must satisfy.
    #[snafu(display(
        r#"LogEvent contains field with invalid name not matching pattern '{}': "{}""#,
        pattern,
        field,
    ))]
    InvalidField { field: KeyString, pattern: String },
    /// The value stored under `field` has type `actual_type` where `expected_type` is required.
    #[snafu(display(
        r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
        field,
        actual_type,
        expected_type
    ))]
    InvalidValueType {
        field: String,
        actual_type: String,
        expected_type: String,
    },
}
69
/// Configuration used to build a `GelfSerializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct GelfSerializerConfig {
    /// Options for the GELF serializer.
    ///
    /// Read from the `gelf` key; the default options are used when the key is omitted.
    #[serde(default, rename = "gelf")]
    pub options: GelfSerializerOptions,
}
78
79impl GelfSerializerConfig {
80 pub const fn new(options: GelfSerializerOptions) -> Self {
82 Self { options }
83 }
84
85 pub fn build(&self) -> GelfSerializer {
87 GelfSerializer::new(self.options.clone())
88 }
89
90 pub fn input_type(&self) -> DataType {
92 DataType::Log
93 }
94
95 pub fn schema_requirement(&self) -> schema::Requirement {
97 schema::Requirement::empty()
100 }
101}
102
/// Serializer that turns an `Event` into GELF-formatted JSON.
#[derive(Debug, Clone)]
pub struct GelfSerializer {
    // Options the serializer was built with; currently only read by `chunker`.
    options: GelfSerializerOptions,
}
109
110impl GelfSerializer {
111 pub fn new(options: GelfSerializerOptions) -> Self {
113 GelfSerializer { options }
114 }
115
116 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
118 let log = to_gelf_event(event.into_log())?;
120 serde_json::to_value(&log).map_err(|e| e.to_string().into())
121 }
122
123 pub fn chunker(&self) -> GelfChunker {
125 GelfChunker {
126 max_chunk_size: self.options.max_chunk_size,
127 }
128 }
129}
130
131impl Encoder<Event> for GelfSerializer {
132 type Error = vector_common::Error;
133
134 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
135 let log = to_gelf_event(event.into_log())?;
136 let writer = buffer.writer();
137 serde_json::to_writer(writer, &log)?;
138 Ok(())
139 }
140}
141
142fn err_invalid_type(
144 field: &str,
145 expected_type: &str,
146 actual_type: &str,
147) -> vector_common::Result<()> {
148 InvalidValueTypeSnafu {
149 field,
150 actual_type,
151 expected_type,
152 }
153 .fail()
154 .map_err(|e| e.to_string().into())
155}
156
157fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
159 fn err_missing_field(field: &str) -> vector_common::Result<()> {
161 MissingFieldSnafu { field }
162 .fail()
163 .map_err(|e| e.to_string().into())
164 }
165
166 if !log.contains(&GELF_TARGET_PATHS.version) {
168 log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
169 }
170
171 if !log.contains(&GELF_TARGET_PATHS.host) {
172 err_missing_field(HOST)?;
173 }
174
175 if !log.contains(&GELF_TARGET_PATHS.short_message)
176 && let Some(message_key) = log_schema().message_key_target_path()
177 {
178 if log.contains(message_key) {
179 log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
180 } else {
181 err_missing_field(SHORT_MESSAGE)?;
182 }
183 }
184 Ok(log)
185}
186
187fn coerce_field_names_and_values(
189 mut log: LogEvent,
190) -> vector_common::Result<(LogEvent, Vec<String>)> {
191 let mut missing_prefix = vec![];
192 if let Some(event_data) = log.as_map_mut() {
193 for (field, value) in event_data.iter_mut() {
194 match field.as_str() {
195 VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
196 if !value.is_bytes() {
197 err_invalid_type(field, "UTF-8 string", value.kind_str())?;
198 }
199 }
200 TIMESTAMP => {
201 if !(value.is_timestamp() || value.is_integer()) {
202 err_invalid_type(field, "timestamp or integer", value.kind_str())?;
203 }
204
205 if let Value::Timestamp(ts) = value {
207 let ts_millis = ts.timestamp_millis();
208 if ts_millis % 1000 != 0 {
209 *value = Value::Float(
211 NotNan::new(ts_millis as f64 / 1000.0)
212 .expect("i64 -> f64 produced NaN"),
213 );
214 } else {
215 *value = Value::Integer(ts.timestamp())
218 }
219 }
220 }
221 LEVEL => {
222 if !value.is_integer() {
223 err_invalid_type(field, "integer", value.kind_str())?;
224 }
225 }
226 LINE => {
227 if !(value.is_float() || value.is_integer()) {
228 err_invalid_type(field, "number", value.kind_str())?;
229 }
230 }
231 _ => {
232 if !VALID_FIELD_REGEX.is_match(field) {
234 return InvalidFieldSnafu {
235 field: field.clone(),
236 pattern: VALID_FIELD_REGEX.to_string(),
237 }
238 .fail()
239 .map_err(|e| e.to_string().into());
240 }
241
242 if !(value.is_integer() || value.is_float() || value.is_bytes()) {
244 err_invalid_type(field, "string or number", value.kind_str())?;
245 }
246
247 if !field.is_empty() && !field.starts_with('_') {
251 missing_prefix.push(field.to_string());
253 }
254 }
255 }
256 }
257 }
258 Ok((log, missing_prefix))
259}
260
261fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
263 let log = coerce_required_fields(log).and_then(|log| {
264 coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
265 for field in missing_prefix {
267 log.rename_key(
268 event_path!(field.as_str()),
269 event_path!(format!("_{}", &field).as_str()),
270 );
271 }
272 log
273 })
274 })?;
275
276 Ok(log)
277}
278
#[cfg(test)]
mod tests {
    use chrono::NaiveDateTime;
    use vector_core::event::{Event, EventMetadata};
    use vrl::{
        btreemap,
        value::{ObjectMap, Value},
    };

    use super::*;
    use crate::encoding::SerializerConfig;

    /// Encodes a log event built from `event_fields` with a default-configured serializer.
    ///
    /// When `expect_success` is true, asserts that encoding succeeds and yields valid
    /// JSON, which is returned. Otherwise asserts that encoding fails and returns `None`.
    fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
        let mut serializer = GelfSerializerConfig::new(GelfSerializerOptions::default()).build();
        let event = Event::from(LogEvent::from_map(event_fields, EventMetadata::default()));
        let mut buffer = BytesMut::new();
        let outcome = serializer.encode(event, &mut buffer);

        if !expect_success {
            assert!(outcome.is_err());
            return None;
        }

        assert!(outcome.is_ok());
        let encoded = std::str::from_utf8(&buffer).unwrap();
        let parsed: Result<serde_json::Value, _> = serde_json::from_str(encoded);
        assert!(parsed.is_ok());
        Some(parsed.unwrap())
    }

    #[test]
    fn gelf_serde_json_to_value_supported_success() {
        let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();

        let fields: ObjectMap = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
        };
        let event = Event::from(LogEvent::from_map(fields, EventMetadata::default()));

        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(event).is_ok());
    }

    #[test]
    fn gelf_serde_json_to_value_supported_failure_to_encode() {
        let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();

        // An event without any fields lacks the required ones.
        let fields = ObjectMap::new();
        let event = Event::from(LogEvent::from_map(fields, EventMetadata::default()));

        assert!(serializer.supports_json());
        assert!(serializer.to_json_value(event).is_err());
    }

    #[test]
    fn gelf_serializing_valid() {
        let fields: ObjectMap = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            FULL_MESSAGE => "Even more message",
            FACILITY => "",
            FILE => "/tmp/foobar",
            LINE => Value::Float(NotNan::new(1.5).unwrap()),
            LEVEL => 5,
        };

        let json = do_serialize(true, fields).unwrap();

        assert_eq!(json.get(VERSION).unwrap(), "1.1");
        assert_eq!(json.get(HOST).unwrap(), "example.org");
        assert_eq!(json.get(SHORT_MESSAGE).unwrap(), "Some message");
    }

    #[test]
    fn gelf_serializing_coerced() {
        // An additional field without a leading underscore gets one prepended.
        let unprefixed: ObjectMap = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            SHORT_MESSAGE => "Some message",
            "noUnderScore" => 0,
        };
        let json = do_serialize(true, unprefixed).unwrap();
        assert_eq!(json.get("_noUnderScore").unwrap(), 0);

        // The log schema's message field is promoted to `short_message`.
        let schema_message: ObjectMap = btreemap! {
            VERSION => "1.1",
            HOST => "example.org",
            log_schema().message_key().unwrap().to_string() => "Some message",
        };
        let json = do_serialize(true, schema_message).unwrap();
        assert_eq!(json.get(SHORT_MESSAGE).unwrap(), "Some message");
    }

    #[test]
    fn gelf_serializing_timestamp() {
        /// Serializes an otherwise valid event whose timestamp is parsed from `raw`.
        fn serialize_with_timestamp(raw: &str) -> serde_json::Value {
            let parsed = NaiveDateTime::parse_from_str(raw, "%Y-%m-%d %H:%M:%S%.f");
            let dt = parsed.unwrap().and_utc();

            let fields: ObjectMap = btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                HOST => "example.org",
                TIMESTAMP => dt,
            };

            do_serialize(true, fields).unwrap()
        }

        // A timestamp with a millisecond component is emitted as a float.
        let fractional = serialize_with_timestamp("1970-01-01 00:00:00.1");
        assert!(fractional.get(TIMESTAMP).unwrap().is_f64());
        assert_eq!(fractional.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1);

        // A timestamp on a whole second is emitted as an integer.
        let whole = serialize_with_timestamp("1970-01-01 00:00:00.0");
        assert!(whole.get(TIMESTAMP).unwrap().is_i64());
        assert_eq!(whole.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
    }

    #[test]
    fn gelf_serializing_invalid_error() {
        // Missing `host`.
        do_serialize(
            false,
            btreemap! {
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
            },
        );

        // Missing `short_message`, with no message field to fall back on.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
            },
        );

        // `short_message` must be a string.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => 0,
            },
        );

        // `level` must be an integer.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LEVEL => "1",
            },
        );

        // `line` must be a number.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                LINE => "1.2",
            },
        );

        // Field names containing disallowed characters are rejected.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "invalid%field" => "foo",
            },
        );

        // Additional field values may not be booleans.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => false,
            },
        );

        // Additional field values may not be null.
        do_serialize(
            false,
            btreemap! {
                HOST => "example.org",
                VERSION => "1.1",
                SHORT_MESSAGE => "Some message",
                "_foobar" => serde_json::Value::Null,
            },
        );
    }
}