codecs/decoding/format/
syslog.rs

1use bytes::Bytes;
2use chrono::{DateTime, Datelike, Utc};
3use derivative::Derivative;
4use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath};
5use smallvec::{smallvec, SmallVec};
6use std::borrow::Cow;
7use syslog_loose::{IncompleteDate, Message, ProcId, Protocol, Variant};
8use vector_config::configurable_component;
9use vector_core::config::{LegacyKey, LogNamespace};
10use vector_core::{
11    config::{log_schema, DataType},
12    event::{Event, LogEvent, ObjectMap, Value},
13    schema,
14};
15use vrl::value::{kind::Collection, Kind};
16
17use super::{default_lossy, Deserializer};
18
/// Config used to build a `SyslogDeserializer`.
// NOTE(review): the `///` comments on this type and its fields are presumably consumed by
// `#[configurable_component]` for generated configuration docs — confirm before rewording.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct SyslogDeserializerConfig {
    // Name of the source this deserializer is built for. Set to `Some(..)` by `from_source`
    // and to `None` by `new`; excluded from (de)serialization via `#[serde(skip)]`.
    // `schema_definition` matches on it to choose between the source-metadata layout and the
    // event-field layout, and `build` copies it into the resulting `SyslogDeserializer`.
    #[serde(skip)]
    source: Option<&'static str>,

    /// Syslog-specific decoding options.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub syslog: SyslogDeserializerOptions,
}
30
31impl SyslogDeserializerConfig {
32    /// Creates a new `SyslogDeserializerConfig`.
33    pub fn new(options: SyslogDeserializerOptions) -> Self {
34        Self {
35            source: None,
36            syslog: options,
37        }
38    }
39
40    /// Create the `SyslogDeserializer` from the given source name.
41    pub fn from_source(source: &'static str) -> Self {
42        Self {
43            source: Some(source),
44            ..Default::default()
45        }
46    }
47
48    /// Build the `SyslogDeserializer` from this configuration.
49    pub const fn build(&self) -> SyslogDeserializer {
50        SyslogDeserializer {
51            source: self.source,
52            lossy: self.syslog.lossy,
53        }
54    }
55
56    /// Return the type of event build by this deserializer.
57    pub fn output_type(&self) -> DataType {
58        DataType::Log
59    }
60
61    /// The schema produced by the deserializer.
62    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
63        match (log_namespace, self.source) {
64            (LogNamespace::Legacy, _) => {
65                let mut definition = schema::Definition::empty_legacy_namespace()
66                    // The `message` field is always defined. If parsing fails, the entire body becomes the
67                    // message.
68                    .with_event_field(
69                        log_schema().message_key().expect("valid message key"),
70                        Kind::bytes(),
71                        Some("message"),
72                    );
73
74                if let Some(timestamp_key) = log_schema().timestamp_key() {
75                    // All other fields are optional.
76                    definition = definition.optional_field(
77                        timestamp_key,
78                        Kind::timestamp(),
79                        Some("timestamp"),
80                    )
81                }
82
83                definition = definition
84                    .optional_field(&owned_value_path!("hostname"), Kind::bytes(), Some("host"))
85                    .optional_field(
86                        &owned_value_path!("severity"),
87                        Kind::bytes(),
88                        Some("severity"),
89                    )
90                    .optional_field(&owned_value_path!("facility"), Kind::bytes(), None)
91                    .optional_field(&owned_value_path!("version"), Kind::integer(), None)
92                    .optional_field(
93                        &owned_value_path!("appname"),
94                        Kind::bytes(),
95                        Some("service"),
96                    )
97                    .optional_field(&owned_value_path!("msgid"), Kind::bytes(), None)
98                    .optional_field(
99                        &owned_value_path!("procid"),
100                        Kind::integer().or_bytes(),
101                        None,
102                    )
103                    // "structured data" is placed at the root. It will always be a map of strings
104                    .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())));
105
106                if self.source.is_some() {
107                    // This field is added by the syslog source. It will not be present if the data
108                    // is coming from the codec.
109                    definition.optional_field(&owned_value_path!("source_ip"), Kind::bytes(), None)
110                } else {
111                    definition
112                }
113            }
114            (LogNamespace::Vector, None) => {
115                schema::Definition::new_with_default_metadata(
116                    Kind::object(Collection::empty()),
117                    [log_namespace],
118                )
119                .with_event_field(
120                    &owned_value_path!("message"),
121                    Kind::bytes(),
122                    Some("message"),
123                )
124                .optional_field(
125                    &owned_value_path!("timestamp"),
126                    Kind::timestamp(),
127                    Some("timestamp"),
128                )
129                .optional_field(&owned_value_path!("hostname"), Kind::bytes(), Some("host"))
130                .optional_field(
131                    &owned_value_path!("severity"),
132                    Kind::bytes(),
133                    Some("severity"),
134                )
135                .optional_field(&owned_value_path!("facility"), Kind::bytes(), None)
136                .optional_field(&owned_value_path!("version"), Kind::integer(), None)
137                .optional_field(
138                    &owned_value_path!("appname"),
139                    Kind::bytes(),
140                    Some("service"),
141                )
142                .optional_field(&owned_value_path!("msgid"), Kind::bytes(), None)
143                .optional_field(
144                    &owned_value_path!("procid"),
145                    Kind::integer().or_bytes(),
146                    None,
147                )
148                // "structured data" is placed at the root. It will always be a map strings
149                .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
150            }
151            (LogNamespace::Vector, Some(source)) => {
152                schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
153                    .with_meaning(OwnedTargetPath::event_root(), "message")
154                    .with_source_metadata(
155                        source,
156                        None,
157                        &owned_value_path!("timestamp"),
158                        Kind::timestamp(),
159                        Some("timestamp"),
160                    )
161                    .with_source_metadata(
162                        source,
163                        None,
164                        &owned_value_path!("hostname"),
165                        Kind::bytes().or_undefined(),
166                        Some("host"),
167                    )
168                    .with_source_metadata(
169                        source,
170                        None,
171                        &owned_value_path!("source_ip"),
172                        Kind::bytes().or_undefined(),
173                        None,
174                    )
175                    .with_source_metadata(
176                        source,
177                        None,
178                        &owned_value_path!("severity"),
179                        Kind::bytes().or_undefined(),
180                        Some("severity"),
181                    )
182                    .with_source_metadata(
183                        source,
184                        None,
185                        &owned_value_path!("facility"),
186                        Kind::bytes().or_undefined(),
187                        None,
188                    )
189                    .with_source_metadata(
190                        source,
191                        None,
192                        &owned_value_path!("version"),
193                        Kind::integer().or_undefined(),
194                        None,
195                    )
196                    .with_source_metadata(
197                        source,
198                        None,
199                        &owned_value_path!("appname"),
200                        Kind::bytes().or_undefined(),
201                        Some("service"),
202                    )
203                    .with_source_metadata(
204                        source,
205                        None,
206                        &owned_value_path!("msgid"),
207                        Kind::bytes().or_undefined(),
208                        None,
209                    )
210                    .with_source_metadata(
211                        source,
212                        None,
213                        &owned_value_path!("procid"),
214                        Kind::integer().or_bytes().or_undefined(),
215                        None,
216                    )
217                    .with_source_metadata(
218                        source,
219                        None,
220                        &owned_value_path!("structured_data"),
221                        Kind::object(Collection::from_unknown(Kind::object(
222                            Collection::from_unknown(Kind::bytes()),
223                        ))),
224                        None,
225                    )
226                    .with_source_metadata(
227                        source,
228                        None,
229                        &owned_value_path!("tls_client_metadata"),
230                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
231                            .or_undefined(),
232                        None,
233                    )
234            }
235        }
236    }
237}
238
/// Syslog-specific decoding options.
// NOTE(review): the `///` comments on this type and its fields are presumably consumed by
// `#[configurable_component]` for generated configuration docs — confirm before rewording.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct SyslogDeserializerOptions {
    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
    ///
    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
    ///
    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
    // Both the serde default (field missing from the config) and the `Derivative`-generated
    // `Default` impl take their value from `default_lossy`, keeping the two defaults in sync.
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,
}
256
257/// Deserializer that builds an `Event` from a byte frame containing a syslog
258/// message.
259#[derive(Debug, Clone, Derivative)]
260#[derivative(Default)]
261pub struct SyslogDeserializer {
262    /// The syslog source needs it's own syslog deserializer separate from the
263    /// syslog codec since it needs to handle the structured of the decoded data
264    /// differently when using the Vector lognamespace.
265    pub source: Option<&'static str>,
266    #[derivative(Default(value = "default_lossy()"))]
267    lossy: bool,
268}
269
270impl Deserializer for SyslogDeserializer {
271    fn parse(
272        &self,
273        bytes: Bytes,
274        log_namespace: LogNamespace,
275    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
276        let line: Cow<str> = match self.lossy {
277            true => String::from_utf8_lossy(&bytes),
278            false => Cow::from(std::str::from_utf8(&bytes)?),
279        };
280        let line = line.trim();
281        let parsed =
282            syslog_loose::parse_message_with_year_exact(line, resolve_year, Variant::Either)?;
283
284        let log = match (self.source, log_namespace) {
285            (Some(source), LogNamespace::Vector) => {
286                let mut log = LogEvent::from(Value::Bytes(Bytes::from(parsed.msg.to_string())));
287                insert_metadata_fields_from_syslog(&mut log, source, parsed, log_namespace);
288                log
289            }
290            _ => {
291                let mut log = LogEvent::from(Value::Object(ObjectMap::new()));
292                insert_fields_from_syslog(&mut log, parsed, log_namespace);
293                log
294            }
295        };
296
297        Ok(smallvec![Event::from(log)])
298    }
299}
300
301/// Function used to resolve the year for syslog messages that don't include the
302/// year.
303///
304/// If the current month is January, and the syslog message is for December, it
305/// will take the previous year.
306///
307/// Otherwise, take the current year.
308fn resolve_year((month, _date, _hour, _min, _sec): IncompleteDate) -> i32 {
309    let now = Utc::now();
310    if now.month() == 1 && month == 12 {
311        now.year() - 1
312    } else {
313        now.year()
314    }
315}
316
317fn insert_metadata_fields_from_syslog(
318    log: &mut LogEvent,
319    source: &'static str,
320    parsed: Message<&str>,
321    log_namespace: LogNamespace,
322) {
323    if let Some(timestamp) = parsed.timestamp {
324        let timestamp = DateTime::<Utc>::from(timestamp);
325        log_namespace.insert_source_metadata(
326            source,
327            log,
328            None::<LegacyKey<&OwnedValuePath>>,
329            &owned_value_path!("timestamp"),
330            timestamp,
331        );
332    }
333    if let Some(host) = parsed.hostname {
334        log_namespace.insert_source_metadata(
335            source,
336            log,
337            None::<LegacyKey<&OwnedValuePath>>,
338            &owned_value_path!("hostname"),
339            host.to_string(),
340        );
341    }
342    if let Some(severity) = parsed.severity {
343        log_namespace.insert_source_metadata(
344            source,
345            log,
346            None::<LegacyKey<&OwnedValuePath>>,
347            &owned_value_path!("severity"),
348            severity.as_str().to_owned(),
349        );
350    }
351    if let Some(facility) = parsed.facility {
352        log_namespace.insert_source_metadata(
353            source,
354            log,
355            None::<LegacyKey<&OwnedValuePath>>,
356            &owned_value_path!("facility"),
357            facility.as_str().to_owned(),
358        );
359    }
360    if let Protocol::RFC5424(version) = parsed.protocol {
361        log_namespace.insert_source_metadata(
362            source,
363            log,
364            None::<LegacyKey<&OwnedValuePath>>,
365            &owned_value_path!("version"),
366            version as i64,
367        );
368    }
369    if let Some(app_name) = parsed.appname {
370        log_namespace.insert_source_metadata(
371            source,
372            log,
373            None::<LegacyKey<&OwnedValuePath>>,
374            &owned_value_path!("appname"),
375            app_name.to_owned(),
376        );
377    }
378    if let Some(msg_id) = parsed.msgid {
379        log_namespace.insert_source_metadata(
380            source,
381            log,
382            None::<LegacyKey<&OwnedValuePath>>,
383            &owned_value_path!("msgid"),
384            msg_id.to_owned(),
385        );
386    }
387    if let Some(procid) = parsed.procid {
388        let value: Value = match procid {
389            ProcId::PID(pid) => pid.into(),
390            ProcId::Name(name) => name.to_string().into(),
391        };
392        log_namespace.insert_source_metadata(
393            source,
394            log,
395            None::<LegacyKey<&OwnedValuePath>>,
396            &owned_value_path!("procid"),
397            value,
398        );
399    }
400
401    let mut sdata = ObjectMap::new();
402    for element in parsed.structured_data.into_iter() {
403        let mut data = ObjectMap::new();
404
405        for (name, value) in element.params() {
406            data.insert(name.to_string().into(), value.into());
407        }
408
409        sdata.insert(element.id.into(), data.into());
410    }
411
412    log_namespace.insert_source_metadata(
413        source,
414        log,
415        None::<LegacyKey<&OwnedValuePath>>,
416        &owned_value_path!("structured_data"),
417        sdata,
418    );
419}
420
421fn insert_fields_from_syslog(
422    log: &mut LogEvent,
423    parsed: Message<&str>,
424    log_namespace: LogNamespace,
425) {
426    match log_namespace {
427        LogNamespace::Legacy => {
428            log.maybe_insert(log_schema().message_key_target_path(), parsed.msg);
429        }
430        LogNamespace::Vector => {
431            log.insert(event_path!("message"), parsed.msg);
432        }
433    }
434
435    if let Some(timestamp) = parsed.timestamp {
436        let timestamp = DateTime::<Utc>::from(timestamp);
437        match log_namespace {
438            LogNamespace::Legacy => {
439                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
440            }
441            LogNamespace::Vector => {
442                log.insert(event_path!("timestamp"), timestamp);
443            }
444        };
445    }
446    if let Some(host) = parsed.hostname {
447        log.insert(event_path!("hostname"), host.to_string());
448    }
449    if let Some(severity) = parsed.severity {
450        log.insert(event_path!("severity"), severity.as_str().to_owned());
451    }
452    if let Some(facility) = parsed.facility {
453        log.insert(event_path!("facility"), facility.as_str().to_owned());
454    }
455    if let Protocol::RFC5424(version) = parsed.protocol {
456        log.insert(event_path!("version"), version as i64);
457    }
458    if let Some(app_name) = parsed.appname {
459        log.insert(event_path!("appname"), app_name.to_owned());
460    }
461    if let Some(msg_id) = parsed.msgid {
462        log.insert(event_path!("msgid"), msg_id.to_owned());
463    }
464    if let Some(procid) = parsed.procid {
465        let value: Value = match procid {
466            ProcId::PID(pid) => pid.into(),
467            ProcId::Name(name) => name.to_string().into(),
468        };
469        log.insert(event_path!("procid"), value);
470    }
471
472    for element in parsed.structured_data.into_iter() {
473        let mut sdata = ObjectMap::new();
474        for (name, value) in element.params() {
475            sdata.insert(name.to_string().into(), value.into());
476        }
477        log.insert(event_path!(element.id), sdata);
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use vector_core::config::{init_log_schema, log_schema, LogSchema};

    /// In the legacy namespace the message and timestamp must land on the keys configured in
    /// the global log schema.
    #[test]
    fn deserialize_syslog_legacy_namespace() {
        init();

        let input =
            Bytes::from("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - MSG");
        let deserializer = SyslogDeserializer::default();

        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            "MSG".into()
        );
        assert!(
            events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
        );
    }

    /// In the Vector namespace (without a source) the message and timestamp use the fixed
    /// `message` / `timestamp` paths, regardless of the global log schema.
    #[test]
    fn deserialize_syslog_vector_namespace() {
        init();

        let input =
            Bytes::from("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - MSG");
        let deserializer = SyslogDeserializer::default();

        let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_log()["message"], "MSG".into());
        assert!(events[0].as_log()["timestamp"].is_timestamp());
    }

    /// Installs a global log schema whose message and timestamp keys differ from the Vector
    /// namespace paths, so the legacy test can tell the two layouts apart.
    fn init() {
        let mut schema = LogSchema::default();
        schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
            "legacy_message"
        ))));
        // Previously this called `set_message_key` a second time, which overwrote the message
        // key with "legacy_timestamp" and left the timestamp key at its default.
        schema.set_timestamp_key(Some(OwnedTargetPath::event(owned_value_path!(
            "legacy_timestamp"
        ))));
        init_log_schema(schema, false);
    }
}