1use crate::{VALID_FIELD_REGEX, encoding::GelfChunker, gelf::GELF_TARGET_PATHS, gelf_fields::*};
2use bytes::{BufMut, BytesMut};
3use lookup::event_path;
4use ordered_float::NotNan;
5use snafu::Snafu;
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9 config::{DataType, log_schema},
10 event::{Event, KeyString, LogEvent, Value},
11 schema,
12};
13
14#[configurable_component]
16#[derive(Debug, Clone)]
17pub struct GelfSerializerOptions {
18 #[configurable(validation(range(min = 13)))]
22 #[serde(default = "default_max_chunk_size")]
23 pub max_chunk_size: usize,
24}
25
26const fn default_max_chunk_size() -> usize {
27 8192
28}
29
30impl Default for GelfSerializerOptions {
31 fn default() -> Self {
32 Self {
33 max_chunk_size: default_max_chunk_size(),
34 }
35 }
36}
37
38#[derive(Debug, Snafu)]
48pub enum GelfSerializerError {
49 #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
50 MissingField { field: KeyString },
51 #[snafu(display(
52 r#"LogEvent contains field with invalid name not matching pattern '{}': "{}""#,
53 pattern,
54 field,
55 ))]
56 InvalidField { field: KeyString, pattern: String },
57 #[snafu(display(
58 r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
59 field,
60 actual_type,
61 expected_type
62 ))]
63 InvalidValueType {
64 field: String,
65 actual_type: String,
66 expected_type: String,
67 },
68}
69
70#[configurable_component]
72#[derive(Debug, Clone, Default)]
73pub struct GelfSerializerConfig {
74 #[serde(default, rename = "gelf")]
76 pub options: GelfSerializerOptions,
77}
78
79impl GelfSerializerConfig {
80 pub const fn new(options: GelfSerializerOptions) -> Self {
82 Self { options }
83 }
84
85 pub fn build(&self) -> GelfSerializer {
87 GelfSerializer::new(self.options.clone())
88 }
89
90 pub fn input_type(&self) -> DataType {
92 DataType::Log
93 }
94
95 pub fn schema_requirement(&self) -> schema::Requirement {
97 schema::Requirement::empty()
100 }
101}
102
103#[derive(Debug, Clone)]
106pub struct GelfSerializer {
107 options: GelfSerializerOptions,
108}
109
110impl GelfSerializer {
111 pub fn new(options: GelfSerializerOptions) -> Self {
113 GelfSerializer { options }
114 }
115
116 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
118 let log = to_gelf_event(event.into_log())?;
120 serde_json::to_value(&log).map_err(|e| e.to_string().into())
121 }
122
123 pub fn chunker(&self) -> GelfChunker {
125 GelfChunker {
126 max_chunk_size: self.options.max_chunk_size,
127 }
128 }
129}
130
131impl Encoder<Event> for GelfSerializer {
132 type Error = vector_common::Error;
133
134 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
135 let log = to_gelf_event(event.into_log())?;
136 let writer = buffer.writer();
137 serde_json::to_writer(writer, &log)?;
138 Ok(())
139 }
140}
141
142fn err_invalid_type(
144 field: &str,
145 expected_type: &str,
146 actual_type: &str,
147) -> vector_common::Result<()> {
148 InvalidValueTypeSnafu {
149 field,
150 actual_type,
151 expected_type,
152 }
153 .fail()
154 .map_err(|e| e.to_string().into())
155}
156
157fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
159 fn err_missing_field(field: &str) -> vector_common::Result<()> {
161 MissingFieldSnafu { field }
162 .fail()
163 .map_err(|e| e.to_string().into())
164 }
165
166 if !log.contains(&GELF_TARGET_PATHS.version) {
168 log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
169 }
170
171 if !log.contains(&GELF_TARGET_PATHS.host) {
172 err_missing_field(HOST)?;
173 }
174
175 if !log.contains(&GELF_TARGET_PATHS.short_message)
176 && let Some(message_key) = log_schema().message_key_target_path()
177 {
178 if log.contains(message_key) {
179 log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
180 } else {
181 err_missing_field(SHORT_MESSAGE)?;
182 }
183 }
184 Ok(log)
185}
186
187fn coerce_field_names_and_values(
189 mut log: LogEvent,
190) -> vector_common::Result<(LogEvent, Vec<String>)> {
191 let mut missing_prefix = vec![];
192 if let Some(event_data) = log.as_map_mut() {
193 for (field, value) in event_data.iter_mut() {
194 match field.as_str() {
195 VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
196 if !value.is_bytes() {
197 err_invalid_type(field, "UTF-8 string", value.kind_str())?;
198 }
199 }
200 TIMESTAMP => {
201 if !(value.is_timestamp() || value.is_integer()) {
202 err_invalid_type(field, "timestamp or integer", value.kind_str())?;
203 }
204
205 if let Value::Timestamp(ts) = value {
207 let ts_millis = ts.timestamp_millis();
208 if ts_millis % 1000 != 0 {
209 *value = Value::Float(NotNan::new(ts_millis as f64 / 1000.0).unwrap());
210 } else {
211 *value = Value::Integer(ts.timestamp())
214 }
215 }
216 }
217 LEVEL => {
218 if !value.is_integer() {
219 err_invalid_type(field, "integer", value.kind_str())?;
220 }
221 }
222 LINE => {
223 if !(value.is_float() || value.is_integer()) {
224 err_invalid_type(field, "number", value.kind_str())?;
225 }
226 }
227 _ => {
228 if !VALID_FIELD_REGEX.is_match(field) {
230 return InvalidFieldSnafu {
231 field: field.clone(),
232 pattern: VALID_FIELD_REGEX.to_string(),
233 }
234 .fail()
235 .map_err(|e| e.to_string().into());
236 }
237
238 if !(value.is_integer() || value.is_float() || value.is_bytes()) {
240 err_invalid_type(field, "string or number", value.kind_str())?;
241 }
242
243 if !field.is_empty() && !field.starts_with('_') {
247 missing_prefix.push(field.to_string());
249 }
250 }
251 }
252 }
253 }
254 Ok((log, missing_prefix))
255}
256
257fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
259 let log = coerce_required_fields(log).and_then(|log| {
260 coerce_field_names_and_values(log).map(|(mut log, missing_prefix)| {
261 for field in missing_prefix {
263 log.rename_key(
264 event_path!(field.as_str()),
265 event_path!(format!("_{}", &field).as_str()),
266 );
267 }
268 log
269 })
270 })?;
271
272 Ok(log)
273}
274
275#[cfg(test)]
276mod tests {
277 use chrono::NaiveDateTime;
278 use vector_core::event::{Event, EventMetadata};
279 use vrl::{
280 btreemap,
281 value::{ObjectMap, Value},
282 };
283
284 use super::*;
285 use crate::encoding::SerializerConfig;
286
287 fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
288 let config = GelfSerializerConfig::new(GelfSerializerOptions::default());
289 let mut serializer = config.build();
290 let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
291 let mut buffer = BytesMut::new();
292
293 if expect_success {
294 assert!(serializer.encode(event, &mut buffer).is_ok());
295 let buffer_str = std::str::from_utf8(&buffer).unwrap();
296 let result = serde_json::from_str(buffer_str);
297 assert!(result.is_ok());
298 Some(result.unwrap())
299 } else {
300 assert!(serializer.encode(event, &mut buffer).is_err());
301 None
302 }
303 }
304
305 #[test]
306 fn gelf_serde_json_to_value_supported_success() {
307 let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();
308
309 let event_fields = btreemap! {
310 VERSION => "1.1",
311 HOST => "example.org",
312 SHORT_MESSAGE => "Some message",
313 };
314
315 let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
316 assert!(serializer.supports_json());
317 assert!(serializer.to_json_value(log_event).is_ok());
318 }
319
320 #[test]
321 fn gelf_serde_json_to_value_supported_failure_to_encode() {
322 let serializer = SerializerConfig::Gelf(Default::default()).build().unwrap();
323 let event_fields = btreemap! {};
324 let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
325 assert!(serializer.supports_json());
326 assert!(serializer.to_json_value(log_event).is_err());
327 }
328
329 #[test]
330 fn gelf_serializing_valid() {
331 let event_fields = btreemap! {
332 VERSION => "1.1",
333 HOST => "example.org",
334 SHORT_MESSAGE => "Some message",
335 FULL_MESSAGE => "Even more message",
336 FACILITY => "",
337 FILE => "/tmp/foobar",
338 LINE => Value::Float(ordered_float::NotNan::new(1.5).unwrap()),
339 LEVEL => 5,
340 };
341
342 let jsn = do_serialize(true, event_fields).unwrap();
343
344 assert_eq!(jsn.get(VERSION).unwrap(), "1.1");
345 assert_eq!(jsn.get(HOST).unwrap(), "example.org");
346 assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
347 }
348
349 #[test]
350 fn gelf_serializing_coerced() {
351 {
353 let event_fields = btreemap! {
354 VERSION => "1.1",
355 HOST => "example.org",
356 SHORT_MESSAGE => "Some message",
357 "noUnderScore" => 0,
358 };
359
360 let jsn = do_serialize(true, event_fields).unwrap();
361 assert_eq!(jsn.get("_noUnderScore").unwrap(), 0);
362 }
363
364 {
366 let event_fields = btreemap! {
367 VERSION => "1.1",
368 HOST => "example.org",
369 log_schema().message_key().unwrap().to_string() => "Some message",
370 };
371
372 let jsn = do_serialize(true, event_fields).unwrap();
373 assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
374 }
375 }
376
377 #[test]
378 fn gelf_serializing_timestamp() {
379 {
381 let naive_dt =
382 NaiveDateTime::parse_from_str("1970-01-01 00:00:00.1", "%Y-%m-%d %H:%M:%S%.f");
383 let dt = naive_dt.unwrap().and_utc();
384
385 let event_fields = btreemap! {
386 VERSION => "1.1",
387 SHORT_MESSAGE => "Some message",
388 HOST => "example.org",
389 TIMESTAMP => dt,
390 };
391
392 let jsn = do_serialize(true, event_fields).unwrap();
393 assert!(jsn.get(TIMESTAMP).unwrap().is_f64());
394 assert_eq!(jsn.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1,);
395 }
396
397 {
399 let naive_dt =
400 NaiveDateTime::parse_from_str("1970-01-01 00:00:00.0", "%Y-%m-%d %H:%M:%S%.f");
401 let dt = naive_dt.unwrap().and_utc();
402
403 let event_fields = btreemap! {
404 VERSION => "1.1",
405 SHORT_MESSAGE => "Some message",
406 HOST => "example.org",
407 TIMESTAMP => dt,
408 };
409
410 let jsn = do_serialize(true, event_fields).unwrap();
411 assert!(jsn.get(TIMESTAMP).unwrap().is_i64());
412 assert_eq!(jsn.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
413 }
414 }
415
416 #[test]
417 fn gelf_serializing_invalid_error() {
418 {
420 let event_fields = btreemap! {
421 VERSION => "1.1",
422 SHORT_MESSAGE => "Some message",
423 };
424 do_serialize(false, event_fields);
425 }
426 {
428 let event_fields = btreemap! {
429 HOST => "example.org",
430 VERSION => "1.1",
431 };
432 do_serialize(false, event_fields);
433 }
434 {
436 let event_fields = btreemap! {
437 HOST => "example.org",
438 VERSION => "1.1",
439 SHORT_MESSAGE => 0,
440 };
441 do_serialize(false, event_fields);
442 }
443 {
445 let event_fields = btreemap! {
446 HOST => "example.org",
447 VERSION => "1.1",
448 SHORT_MESSAGE => "Some message",
449 LEVEL => "1",
450 };
451 do_serialize(false, event_fields);
452 }
453 {
455 let event_fields = btreemap! {
456 HOST => "example.org",
457 VERSION => "1.1",
458 SHORT_MESSAGE => "Some message",
459 LINE => "1.2",
460 };
461 do_serialize(false, event_fields);
462 }
463 {
465 let event_fields = btreemap! {
466 HOST => "example.org",
467 VERSION => "1.1",
468 SHORT_MESSAGE => "Some message",
469 "invalid%field" => "foo",
470 };
471 do_serialize(false, event_fields);
472 }
473 {
475 let event_fields = btreemap! {
476 HOST => "example.org",
477 VERSION => "1.1",
478 SHORT_MESSAGE => "Some message",
479 "_foobar" => false,
480 };
481 do_serialize(false, event_fields);
482 }
483 {
485 let event_fields = btreemap! {
486 HOST => "example.org",
487 VERSION => "1.1",
488 SHORT_MESSAGE => "Some message",
489 "_foobar" => serde_json::Value::Null,
490 };
491 do_serialize(false, event_fields);
492 }
493 }
494}