1use bytes::Bytes;
2use chrono::{DateTime, Datelike, Utc};
3use derivative::Derivative;
4use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath};
5use smallvec::{smallvec, SmallVec};
6use std::borrow::Cow;
7use syslog_loose::{IncompleteDate, Message, ProcId, Protocol, Variant};
8use vector_config::configurable_component;
9use vector_core::config::{LegacyKey, LogNamespace};
10use vector_core::{
11 config::{log_schema, DataType},
12 event::{Event, LogEvent, ObjectMap, Value},
13 schema,
14};
15use vrl::value::{kind::Collection, Kind};
16
17use super::{default_lossy, Deserializer};
18
#[configurable_component]
/// Config used to build a `SyslogDeserializer`.
#[derive(Debug, Clone, Default)]
pub struct SyslogDeserializerConfig {
    /// Name of the source using this deserializer, if any.
    ///
    /// Never (de)serialized. When set and the Vector log namespace is in use, the built
    /// deserializer stores parsed syslog fields as source metadata instead of event fields.
    #[serde(skip)]
    source: Option<&'static str>,

    /// Syslog-specific decoding options.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub syslog: SyslogDeserializerOptions,
}
30
31impl SyslogDeserializerConfig {
32 pub fn new(options: SyslogDeserializerOptions) -> Self {
34 Self {
35 source: None,
36 syslog: options,
37 }
38 }
39
40 pub fn from_source(source: &'static str) -> Self {
42 Self {
43 source: Some(source),
44 ..Default::default()
45 }
46 }
47
48 pub const fn build(&self) -> SyslogDeserializer {
50 SyslogDeserializer {
51 source: self.source,
52 lossy: self.syslog.lossy,
53 }
54 }
55
56 pub fn output_type(&self) -> DataType {
58 DataType::Log
59 }
60
61 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
63 match (log_namespace, self.source) {
64 (LogNamespace::Legacy, _) => {
65 let mut definition = schema::Definition::empty_legacy_namespace()
66 .with_event_field(
69 log_schema().message_key().expect("valid message key"),
70 Kind::bytes(),
71 Some("message"),
72 );
73
74 if let Some(timestamp_key) = log_schema().timestamp_key() {
75 definition = definition.optional_field(
77 timestamp_key,
78 Kind::timestamp(),
79 Some("timestamp"),
80 )
81 }
82
83 definition = definition
84 .optional_field(&owned_value_path!("hostname"), Kind::bytes(), Some("host"))
85 .optional_field(
86 &owned_value_path!("severity"),
87 Kind::bytes(),
88 Some("severity"),
89 )
90 .optional_field(&owned_value_path!("facility"), Kind::bytes(), None)
91 .optional_field(&owned_value_path!("version"), Kind::integer(), None)
92 .optional_field(
93 &owned_value_path!("appname"),
94 Kind::bytes(),
95 Some("service"),
96 )
97 .optional_field(&owned_value_path!("msgid"), Kind::bytes(), None)
98 .optional_field(
99 &owned_value_path!("procid"),
100 Kind::integer().or_bytes(),
101 None,
102 )
103 .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())));
105
106 if self.source.is_some() {
107 definition.optional_field(&owned_value_path!("source_ip"), Kind::bytes(), None)
110 } else {
111 definition
112 }
113 }
114 (LogNamespace::Vector, None) => {
115 schema::Definition::new_with_default_metadata(
116 Kind::object(Collection::empty()),
117 [log_namespace],
118 )
119 .with_event_field(
120 &owned_value_path!("message"),
121 Kind::bytes(),
122 Some("message"),
123 )
124 .optional_field(
125 &owned_value_path!("timestamp"),
126 Kind::timestamp(),
127 Some("timestamp"),
128 )
129 .optional_field(&owned_value_path!("hostname"), Kind::bytes(), Some("host"))
130 .optional_field(
131 &owned_value_path!("severity"),
132 Kind::bytes(),
133 Some("severity"),
134 )
135 .optional_field(&owned_value_path!("facility"), Kind::bytes(), None)
136 .optional_field(&owned_value_path!("version"), Kind::integer(), None)
137 .optional_field(
138 &owned_value_path!("appname"),
139 Kind::bytes(),
140 Some("service"),
141 )
142 .optional_field(&owned_value_path!("msgid"), Kind::bytes(), None)
143 .optional_field(
144 &owned_value_path!("procid"),
145 Kind::integer().or_bytes(),
146 None,
147 )
148 .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
150 }
151 (LogNamespace::Vector, Some(source)) => {
152 schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
153 .with_meaning(OwnedTargetPath::event_root(), "message")
154 .with_source_metadata(
155 source,
156 None,
157 &owned_value_path!("timestamp"),
158 Kind::timestamp(),
159 Some("timestamp"),
160 )
161 .with_source_metadata(
162 source,
163 None,
164 &owned_value_path!("hostname"),
165 Kind::bytes().or_undefined(),
166 Some("host"),
167 )
168 .with_source_metadata(
169 source,
170 None,
171 &owned_value_path!("source_ip"),
172 Kind::bytes().or_undefined(),
173 None,
174 )
175 .with_source_metadata(
176 source,
177 None,
178 &owned_value_path!("severity"),
179 Kind::bytes().or_undefined(),
180 Some("severity"),
181 )
182 .with_source_metadata(
183 source,
184 None,
185 &owned_value_path!("facility"),
186 Kind::bytes().or_undefined(),
187 None,
188 )
189 .with_source_metadata(
190 source,
191 None,
192 &owned_value_path!("version"),
193 Kind::integer().or_undefined(),
194 None,
195 )
196 .with_source_metadata(
197 source,
198 None,
199 &owned_value_path!("appname"),
200 Kind::bytes().or_undefined(),
201 Some("service"),
202 )
203 .with_source_metadata(
204 source,
205 None,
206 &owned_value_path!("msgid"),
207 Kind::bytes().or_undefined(),
208 None,
209 )
210 .with_source_metadata(
211 source,
212 None,
213 &owned_value_path!("procid"),
214 Kind::integer().or_bytes().or_undefined(),
215 None,
216 )
217 .with_source_metadata(
218 source,
219 None,
220 &owned_value_path!("structured_data"),
221 Kind::object(Collection::from_unknown(Kind::object(
222 Collection::from_unknown(Kind::bytes()),
223 ))),
224 None,
225 )
226 .with_source_metadata(
227 source,
228 None,
229 &owned_value_path!("tls_client_metadata"),
230 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
231 .or_undefined(),
232 None,
233 )
234 }
235 }
236 }
237}
238
#[configurable_component]
/// Syslog-specific decoding options.
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct SyslogDeserializerOptions {
    /// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
    ///
    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
    ///
    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,
}
256
/// Deserializer that builds an `Event` from a byte frame containing a syslog message.
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct SyslogDeserializer {
    /// Name of the source using this deserializer, if any.
    ///
    /// When set and the Vector log namespace is used, the event is the message bytes and the
    /// other parsed syslog fields are stored as metadata of this source. Otherwise all
    /// parsed fields are written into the event object.
    pub source: Option<&'static str>,
    /// When true, invalid UTF-8 in the input is replaced (lossy decoding) instead of
    /// causing a parse error.
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,
}
269
270impl Deserializer for SyslogDeserializer {
271 fn parse(
272 &self,
273 bytes: Bytes,
274 log_namespace: LogNamespace,
275 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
276 let line: Cow<str> = match self.lossy {
277 true => String::from_utf8_lossy(&bytes),
278 false => Cow::from(std::str::from_utf8(&bytes)?),
279 };
280 let line = line.trim();
281 let parsed =
282 syslog_loose::parse_message_with_year_exact(line, resolve_year, Variant::Either)?;
283
284 let log = match (self.source, log_namespace) {
285 (Some(source), LogNamespace::Vector) => {
286 let mut log = LogEvent::from(Value::Bytes(Bytes::from(parsed.msg.to_string())));
287 insert_metadata_fields_from_syslog(&mut log, source, parsed, log_namespace);
288 log
289 }
290 _ => {
291 let mut log = LogEvent::from(Value::Object(ObjectMap::new()));
292 insert_fields_from_syslog(&mut log, parsed, log_namespace);
293 log
294 }
295 };
296
297 Ok(smallvec![Event::from(log)])
298 }
299}
300
301fn resolve_year((month, _date, _hour, _min, _sec): IncompleteDate) -> i32 {
309 let now = Utc::now();
310 if now.month() == 1 && month == 12 {
311 now.year() - 1
312 } else {
313 now.year()
314 }
315}
316
317fn insert_metadata_fields_from_syslog(
318 log: &mut LogEvent,
319 source: &'static str,
320 parsed: Message<&str>,
321 log_namespace: LogNamespace,
322) {
323 if let Some(timestamp) = parsed.timestamp {
324 let timestamp = DateTime::<Utc>::from(timestamp);
325 log_namespace.insert_source_metadata(
326 source,
327 log,
328 None::<LegacyKey<&OwnedValuePath>>,
329 &owned_value_path!("timestamp"),
330 timestamp,
331 );
332 }
333 if let Some(host) = parsed.hostname {
334 log_namespace.insert_source_metadata(
335 source,
336 log,
337 None::<LegacyKey<&OwnedValuePath>>,
338 &owned_value_path!("hostname"),
339 host.to_string(),
340 );
341 }
342 if let Some(severity) = parsed.severity {
343 log_namespace.insert_source_metadata(
344 source,
345 log,
346 None::<LegacyKey<&OwnedValuePath>>,
347 &owned_value_path!("severity"),
348 severity.as_str().to_owned(),
349 );
350 }
351 if let Some(facility) = parsed.facility {
352 log_namespace.insert_source_metadata(
353 source,
354 log,
355 None::<LegacyKey<&OwnedValuePath>>,
356 &owned_value_path!("facility"),
357 facility.as_str().to_owned(),
358 );
359 }
360 if let Protocol::RFC5424(version) = parsed.protocol {
361 log_namespace.insert_source_metadata(
362 source,
363 log,
364 None::<LegacyKey<&OwnedValuePath>>,
365 &owned_value_path!("version"),
366 version as i64,
367 );
368 }
369 if let Some(app_name) = parsed.appname {
370 log_namespace.insert_source_metadata(
371 source,
372 log,
373 None::<LegacyKey<&OwnedValuePath>>,
374 &owned_value_path!("appname"),
375 app_name.to_owned(),
376 );
377 }
378 if let Some(msg_id) = parsed.msgid {
379 log_namespace.insert_source_metadata(
380 source,
381 log,
382 None::<LegacyKey<&OwnedValuePath>>,
383 &owned_value_path!("msgid"),
384 msg_id.to_owned(),
385 );
386 }
387 if let Some(procid) = parsed.procid {
388 let value: Value = match procid {
389 ProcId::PID(pid) => pid.into(),
390 ProcId::Name(name) => name.to_string().into(),
391 };
392 log_namespace.insert_source_metadata(
393 source,
394 log,
395 None::<LegacyKey<&OwnedValuePath>>,
396 &owned_value_path!("procid"),
397 value,
398 );
399 }
400
401 let mut sdata = ObjectMap::new();
402 for element in parsed.structured_data.into_iter() {
403 let mut data = ObjectMap::new();
404
405 for (name, value) in element.params() {
406 data.insert(name.to_string().into(), value.into());
407 }
408
409 sdata.insert(element.id.into(), data.into());
410 }
411
412 log_namespace.insert_source_metadata(
413 source,
414 log,
415 None::<LegacyKey<&OwnedValuePath>>,
416 &owned_value_path!("structured_data"),
417 sdata,
418 );
419}
420
421fn insert_fields_from_syslog(
422 log: &mut LogEvent,
423 parsed: Message<&str>,
424 log_namespace: LogNamespace,
425) {
426 match log_namespace {
427 LogNamespace::Legacy => {
428 log.maybe_insert(log_schema().message_key_target_path(), parsed.msg);
429 }
430 LogNamespace::Vector => {
431 log.insert(event_path!("message"), parsed.msg);
432 }
433 }
434
435 if let Some(timestamp) = parsed.timestamp {
436 let timestamp = DateTime::<Utc>::from(timestamp);
437 match log_namespace {
438 LogNamespace::Legacy => {
439 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
440 }
441 LogNamespace::Vector => {
442 log.insert(event_path!("timestamp"), timestamp);
443 }
444 };
445 }
446 if let Some(host) = parsed.hostname {
447 log.insert(event_path!("hostname"), host.to_string());
448 }
449 if let Some(severity) = parsed.severity {
450 log.insert(event_path!("severity"), severity.as_str().to_owned());
451 }
452 if let Some(facility) = parsed.facility {
453 log.insert(event_path!("facility"), facility.as_str().to_owned());
454 }
455 if let Protocol::RFC5424(version) = parsed.protocol {
456 log.insert(event_path!("version"), version as i64);
457 }
458 if let Some(app_name) = parsed.appname {
459 log.insert(event_path!("appname"), app_name.to_owned());
460 }
461 if let Some(msg_id) = parsed.msgid {
462 log.insert(event_path!("msgid"), msg_id.to_owned());
463 }
464 if let Some(procid) = parsed.procid {
465 let value: Value = match procid {
466 ProcId::PID(pid) => pid.into(),
467 ProcId::Name(name) => name.to_string().into(),
468 };
469 log.insert(event_path!("procid"), value);
470 }
471
472 for element in parsed.structured_data.into_iter() {
473 let mut sdata = ObjectMap::new();
474 for (name, value) in element.params() {
475 sdata.insert(name.to_string().into(), value.into());
476 }
477 log.insert(event_path!(element.id), sdata);
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use vector_core::config::{init_log_schema, log_schema, LogSchema};

    #[test]
    fn deserialize_syslog_legacy_namespace() {
        init();

        let input =
            Bytes::from("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - MSG");
        let deserializer = SyslogDeserializer::default();

        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "MSG".into()
        );
        assert!(
            events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
        );
    }

    #[test]
    fn deserialize_syslog_vector_namespace() {
        init();

        let input =
            Bytes::from("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - MSG");
        let deserializer = SyslogDeserializer::default();

        let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_log()["message"], "MSG".into());
        assert!(events[0].as_log()["timestamp"].is_timestamp());
    }

    /// Installs a global log schema with non-default message and timestamp keys, so the
    /// legacy-namespace test reads and writes through configured keys rather than defaults.
    fn init() {
        let mut schema = LogSchema::default();
        schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
            "legacy_message"
        ))));
        // This must set the *timestamp* key. A second `set_message_key` call would overwrite
        // the message key and leave the timestamp key at its default.
        schema.set_timestamp_key(Some(OwnedTargetPath::event(owned_value_path!(
            "legacy_timestamp"
        ))));
        init_log_schema(schema, false);
    }
}