vector/sources/
syslog.rs

1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use chrono::Utc;
7use futures::StreamExt;
8use listenfd::ListenFd;
9use smallvec::SmallVec;
10use tokio_util::udp::UdpFramed;
11use vector_lib::{
12    codecs::{
13        BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
14        decoding::{Deserializer, Framer},
15    },
16    config::{LegacyKey, LogNamespace},
17    configurable::configurable_component,
18    ipallowlist::IpAllowlistConfig,
19    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
20};
21use vrl::event_path;
22
23#[cfg(unix)]
24use crate::sources::util::build_unix_stream_source;
25use crate::{
26    SourceSender,
27    codecs::Decoder,
28    config::{
29        DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
30    },
31    event::Event,
32    internal_events::{SocketBindError, SocketMode, SocketReceiveError, StreamClosedError},
33    net,
34    shutdown::ShutdownSignal,
35    sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
36    tcp::TcpKeepaliveConfig,
37    tls::{MaybeTlsSettings, TlsSourceConfig},
38};
39
/// Configuration for the `syslog` source.
#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
#[derive(Clone, Debug)]
pub struct SyslogConfig {
    // Flattened: the `mode` tag and the options of the selected mode are read
    // from the same level as the other fields of this struct.
    #[serde(flatten)]
    mode: Mode,

    /// The maximum buffer size of incoming messages, in bytes.
    ///
    /// Messages larger than this are truncated.
    // Handed to the octet-counting framer in TCP and Unix modes; the UDP path
    // receives the value but does not currently use it.
    #[serde(default = "crate::serde::default_max_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_length: usize,

    /// Overrides the name of the log field used to add the peer host to each event.
    ///
    /// If using TCP or UDP, the value is the peer host's address, including the port. For example, `1.2.3.4:9000`. If using
    /// UDS, the value is the socket path itself.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    // When this is unset, or set without a path, `build` falls back to the
    // host key of the global log schema.
    host_key: Option<OptionalValuePath>,

    /// The namespace to use for logs. This overrides the global setting.
    // `None` defers to the namespace supplied by the source context / global
    // configuration (see `build` and `outputs`).
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
69
/// Listener mode for the `syslog` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
// NOTE(review): presumably the `Tcp` variant is considerably larger than the
// others, which is what this lint reports — confirm before removing the allow.
#[allow(clippy::large_enum_variant)]
pub enum Mode {
    /// Listen on TCP.
    Tcp {
        #[configurable(derived)]
        address: SocketListenAddr,

        #[configurable(derived)]
        keepalive: Option<TcpKeepaliveConfig>,

        #[configurable(derived)]
        permit_origin: Option<IpAllowlistConfig>,

        #[configurable(derived)]
        tls: Option<TlsSourceConfig>,

        /// The size of the receive buffer used for each connection.
        ///
        /// This should not typically need to be changed.
        #[configurable(metadata(docs::type_unit = "bytes"))]
        receive_buffer_bytes: Option<usize>,

        /// The maximum number of TCP connections that are allowed at any given time.
        connection_limit: Option<u32>,
    },

    /// Listen on UDP.
    Udp {
        #[configurable(derived)]
        address: SocketListenAddr,

        /// The size of the receive buffer used for the listening socket.
        ///
        /// This should not typically need to be changed.
        #[configurable(metadata(docs::type_unit = "bytes"))]
        receive_buffer_bytes: Option<usize>,
    },

    /// Listen on UDS (Unix domain socket). This only supports Unix stream sockets.
    ///
    /// For Unix datagram sockets, use the `socket` source instead.
    #[cfg(unix)]
    Unix {
        /// The Unix socket path.
        ///
        /// This should be an absolute path.
        #[configurable(metadata(docs::examples = "/path/to/socket"))]
        path: PathBuf,

        /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
        ///
        /// The file mode value can be specified in any numeric format supported by your configuration
        /// language, but it is most intuitive to use an octal number.
        socket_file_mode: Option<u32>,
    },
}
131
132impl SyslogConfig {
133    #[cfg(test)]
134    pub fn from_mode(mode: Mode) -> Self {
135        Self {
136            mode,
137            host_key: None,
138            max_length: crate::serde::default_max_length(),
139            log_namespace: None,
140        }
141    }
142}
143
144impl Default for SyslogConfig {
145    fn default() -> Self {
146        Self {
147            mode: Mode::Tcp {
148                address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
149                keepalive: None,
150                permit_origin: None,
151                tls: None,
152                receive_buffer_bytes: None,
153                connection_limit: None,
154            },
155            host_key: None,
156            max_length: crate::serde::default_max_length(),
157            log_namespace: None,
158        }
159    }
160}
161
162impl GenerateConfig for SyslogConfig {
163    fn generate_config() -> toml::Value {
164        toml::Value::try_from(SyslogConfig::default()).unwrap()
165    }
166}
167
168#[async_trait::async_trait]
169#[typetag::serde(name = "syslog")]
170impl SourceConfig for SyslogConfig {
171    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
172        let log_namespace = cx.log_namespace(self.log_namespace);
173        let host_key = self
174            .host_key
175            .clone()
176            .and_then(|k| k.path)
177            .or(log_schema().host_key().cloned());
178
179        match self.mode.clone() {
180            Mode::Tcp {
181                address,
182                keepalive,
183                permit_origin,
184                tls,
185                receive_buffer_bytes,
186                connection_limit,
187            } => {
188                let source = SyslogTcpSource {
189                    max_length: self.max_length,
190                    host_key,
191                    log_namespace,
192                };
193                let shutdown_secs = Duration::from_secs(30);
194                let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
195                let tls_client_metadata_key = tls
196                    .as_ref()
197                    .and_then(|tls| tls.client_metadata_key.clone())
198                    .and_then(|k| k.path);
199                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
200                source.run(
201                    address,
202                    keepalive,
203                    shutdown_secs,
204                    tls,
205                    tls_client_metadata_key,
206                    receive_buffer_bytes,
207                    None,
208                    cx,
209                    false.into(),
210                    connection_limit,
211                    permit_origin.map(Into::into),
212                    SyslogConfig::NAME,
213                    log_namespace,
214                )
215            }
216            Mode::Udp {
217                address,
218                receive_buffer_bytes,
219            } => Ok(udp(
220                address,
221                self.max_length,
222                host_key,
223                receive_buffer_bytes,
224                cx.shutdown,
225                log_namespace,
226                cx.out,
227            )),
228            #[cfg(unix)]
229            Mode::Unix {
230                path,
231                socket_file_mode,
232            } => {
233                let decoder = Decoder::new(
234                    Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
235                        self.max_length,
236                    )),
237                    Deserializer::Syslog(
238                        SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
239                    ),
240                );
241
242                build_unix_stream_source(
243                    path,
244                    socket_file_mode,
245                    decoder,
246                    move |events, host| handle_events(events, &host_key, host, log_namespace),
247                    cx.shutdown,
248                    cx.out,
249                )
250            }
251        }
252    }
253
254    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
255        let log_namespace = global_log_namespace.merge(self.log_namespace);
256        let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
257            .schema_definition(log_namespace)
258            .with_standard_vector_source_metadata();
259
260        vec![SourceOutput::new_maybe_logs(
261            DataType::Log,
262            schema_definition,
263        )]
264    }
265
266    fn resources(&self) -> Vec<Resource> {
267        match self.mode.clone() {
268            Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
269            Mode::Udp { address, .. } => vec![address.as_udp_resource()],
270            #[cfg(unix)]
271            Mode::Unix { .. } => vec![],
272        }
273    }
274
275    fn can_acknowledge(&self) -> bool {
276        false
277    }
278}
279
/// State for the TCP listener mode; the `TcpSource` implementation for this
/// type supplies the decoder and the per-event enrichment.
#[derive(Debug, Clone)]
struct SyslogTcpSource {
    /// Maximum length handed to the octet-counting framer.
    max_length: usize,
    /// Legacy-namespace path under which the host is stored, if any.
    host_key: Option<OwnedValuePath>,
    /// Namespace used when enriching decoded events.
    log_namespace: LogNamespace,
}
286
287impl TcpSource for SyslogTcpSource {
288    type Error = vector_lib::codecs::decoding::Error;
289    type Item = SmallVec<[Event; 1]>;
290    type Decoder = Decoder;
291    type Acker = TcpNullAcker;
292
293    fn decoder(&self) -> Self::Decoder {
294        Decoder::new(
295            Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
296            Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
297        )
298    }
299
300    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
301        handle_events(
302            events,
303            &self.host_key,
304            Some(host.ip().to_string().into()),
305            self.log_namespace,
306        );
307    }
308
309    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
310        TcpNullAcker
311    }
312}
313
314pub fn udp(
315    addr: SocketListenAddr,
316    _max_length: usize,
317    host_key: Option<OwnedValuePath>,
318    receive_buffer_bytes: Option<usize>,
319    shutdown: ShutdownSignal,
320    log_namespace: LogNamespace,
321    mut out: SourceSender,
322) -> super::Source {
323    Box::pin(async move {
324        let listenfd = ListenFd::from_env();
325        let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
326            emit!(SocketBindError {
327                mode: SocketMode::Udp,
328                error: &error,
329            })
330        })?;
331
332        if let Some(receive_buffer_bytes) = receive_buffer_bytes
333            && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
334        {
335            warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
336        }
337
338        info!(
339            message = "Listening.",
340            addr = %addr,
341            r#type = "udp"
342        );
343
344        let mut stream = UdpFramed::new(
345            socket,
346            Decoder::new(
347                Framer::Bytes(BytesDecoder::new()),
348                Deserializer::Syslog(
349                    SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
350                ),
351            ),
352        )
353        .take_until(shutdown)
354        .filter_map(|frame| {
355            let host_key = host_key.clone();
356            async move {
357                match frame {
358                    Ok(((mut events, _byte_size), received_from)) => {
359                        let received_from = received_from.ip().to_string().into();
360                        handle_events(&mut events, &host_key, Some(received_from), log_namespace);
361                        Some(events.remove(0))
362                    }
363                    Err(error) => {
364                        emit!(SocketReceiveError {
365                            mode: SocketMode::Udp,
366                            error: &error,
367                        });
368                        None
369                    }
370                }
371            }
372        })
373        .boxed();
374
375        match out.send_event_stream(&mut stream).await {
376            Ok(()) => {
377                debug!("Finished sending.");
378                Ok(())
379            }
380            Err(_) => {
381                let (count, _) = stream.size_hint();
382                emit!(StreamClosedError { count });
383                Err(())
384            }
385        }
386    })
387}
388
389fn handle_events(
390    events: &mut [Event],
391    host_key: &Option<OwnedValuePath>,
392    default_host: Option<Bytes>,
393    log_namespace: LogNamespace,
394) {
395    for event in events {
396        enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
397    }
398}
399
400fn enrich_syslog_event(
401    event: &mut Event,
402    host_key: &Option<OwnedValuePath>,
403    default_host: Option<Bytes>,
404    log_namespace: LogNamespace,
405) {
406    let log = event.as_mut_log();
407
408    if let Some(default_host) = &default_host {
409        log_namespace.insert_source_metadata(
410            SyslogConfig::NAME,
411            log,
412            Some(LegacyKey::Overwrite(path!("source_ip"))),
413            path!("source_ip"),
414            default_host.clone(),
415        );
416    }
417
418    let parsed_hostname = log
419        .get(event_path!("hostname"))
420        .map(|hostname| hostname.coerce_to_bytes());
421
422    if let Some(parsed_host) = parsed_hostname.or(default_host) {
423        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
424
425        log_namespace.insert_source_metadata(
426            SyslogConfig::NAME,
427            log,
428            legacy_host_key,
429            path!("host"),
430            parsed_host,
431        );
432    }
433
434    log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
435
436    if log_namespace == LogNamespace::Legacy {
437        let timestamp = log
438            .get(event_path!("timestamp"))
439            .and_then(|timestamp| timestamp.as_timestamp().cloned())
440            .unwrap_or_else(Utc::now);
441        log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
442    }
443
444    trace!(
445        message = "Processing one event.",
446        event = ?event
447    );
448}
449
450#[cfg(test)]
451mod test {
452    use std::{collections::HashMap, fmt, str::FromStr};
453
454    use chrono::prelude::*;
455    use rand::{Rng, rng};
456    use serde::Deserialize;
457    use tokio::time::{Duration, Instant, sleep};
458    use tokio_util::codec::BytesCodec;
459    use vector_lib::{
460        assert_event_data_eq,
461        codecs::decoding::format::Deserializer,
462        config::ComponentKey,
463        lookup::{OwnedTargetPath, PathPrefix, event_path, owned_value_path},
464        schema::Definition,
465    };
466    use vrl::value::{Kind, ObjectMap, Value, kind::Collection};
467
468    use super::*;
469    use crate::{
470        config::log_schema,
471        event::{Event, LogEvent},
472        test_util::{
473            CountReceiver,
474            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
475            next_addr, random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
476        },
477    };
478
479    fn event_from_bytes(
480        host_key: &str,
481        default_host: Option<Bytes>,
482        bytes: Bytes,
483        log_namespace: LogNamespace,
484    ) -> Option<Event> {
485        let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
486        let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
487        handle_events(
488            &mut events,
489            &Some(owned_value_path!(host_key)),
490            default_host,
491            log_namespace,
492        );
493        Some(events.remove(0))
494    }
495
    // Runs the shared generated-config check against `SyslogConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SyslogConfig>();
    }
500
    // With the Vector namespace enabled, the source's output schema must place
    // the message at the event root and expose the syslog fields as metadata
    // under `syslog.*`, next to the standard `vector.*` metadata.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = SyslogConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "hostname"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "source_ip"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "severity"),
                    Kind::bytes().or_undefined(),
                    Some("severity"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "facility"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "version"),
                    Kind::integer().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "appname"),
                    Kind::bytes().or_undefined(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "msgid"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "procid"),
                    Kind::integer().or_bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "structured_data"),
                    Kind::object(Collection::from_unknown(Kind::object(
                        Collection::from_unknown(Kind::bytes()),
                    ))),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition));
    }
586
    // With the legacy namespace, the source's output schema must describe the
    // syslog fields as top-level event fields, allow unknown fields holding
    // objects of bytes, and include the standard Vector source metadata.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = SyslogConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp(),
            Some("timestamp"),
        )
        .with_event_field(
            &owned_value_path!("hostname"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .with_event_field(
            &owned_value_path!("source_ip"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("severity"),
            Kind::bytes().or_undefined(),
            Some("severity"),
        )
        .with_event_field(
            &owned_value_path!("facility"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("version"),
            Kind::integer().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("appname"),
            Kind::bytes().or_undefined(),
            Some("service"),
        )
        .with_event_field(
            &owned_value_path!("msgid"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("procid"),
            Kind::integer().or_bytes().or_undefined(),
            None,
        )
        .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
        .with_standard_vector_source_metadata();

        assert_eq!(definitions, Some(expected_definition));
    }
655
656    #[test]
657    fn config_tcp() {
658        let config: SyslogConfig = toml::from_str(
659            r#"
660            mode = "tcp"
661            address = "127.0.0.1:1235"
662          "#,
663        )
664        .unwrap();
665        assert!(matches!(config.mode, Mode::Tcp { .. }));
666    }
667
668    #[test]
669    fn config_tcp_with_receive_buffer_size() {
670        let config: SyslogConfig = toml::from_str(
671            r#"
672            mode = "tcp"
673            address = "127.0.0.1:1235"
674            receive_buffer_bytes = 256
675          "#,
676        )
677        .unwrap();
678
679        let receive_buffer_bytes = match config.mode {
680            Mode::Tcp {
681                receive_buffer_bytes,
682                ..
683            } => receive_buffer_bytes,
684            _ => panic!("expected Mode::Tcp"),
685        };
686
687        assert_eq!(receive_buffer_bytes, Some(256));
688    }
689
690    #[test]
691    fn config_tcp_keepalive_empty() {
692        let config: SyslogConfig = toml::from_str(
693            r#"
694            mode = "tcp"
695            address = "127.0.0.1:1235"
696          "#,
697        )
698        .unwrap();
699
700        let keepalive = match config.mode {
701            Mode::Tcp { keepalive, .. } => keepalive,
702            _ => panic!("expected Mode::Tcp"),
703        };
704
705        assert_eq!(keepalive, None);
706    }
707
708    #[test]
709    fn config_tcp_keepalive_full() {
710        let config: SyslogConfig = toml::from_str(
711            r#"
712            mode = "tcp"
713            address = "127.0.0.1:1235"
714            keepalive.time_secs = 7200
715          "#,
716        )
717        .unwrap();
718
719        let keepalive = match config.mode {
720            Mode::Tcp { keepalive, .. } => keepalive,
721            _ => panic!("expected Mode::Tcp"),
722        };
723
724        let keepalive = keepalive.expect("keepalive config not set");
725
726        assert_eq!(keepalive.time_secs, Some(7200));
727    }
728
729    #[test]
730    fn config_udp() {
731        let config: SyslogConfig = toml::from_str(
732            r#"
733            mode = "udp"
734            address = "127.0.0.1:1235"
735            max_length = 32187
736          "#,
737        )
738        .unwrap();
739        assert!(matches!(config.mode, Mode::Udp { .. }));
740    }
741
742    #[test]
743    fn config_udp_with_receive_buffer_size() {
744        let config: SyslogConfig = toml::from_str(
745            r#"
746            mode = "udp"
747            address = "127.0.0.1:1235"
748            max_length = 32187
749            receive_buffer_bytes = 256
750          "#,
751        )
752        .unwrap();
753
754        let receive_buffer_bytes = match config.mode {
755            Mode::Udp {
756                receive_buffer_bytes,
757                ..
758            } => receive_buffer_bytes,
759            _ => panic!("expected Mode::Udp"),
760        };
761
762        assert_eq!(receive_buffer_bytes, Some(256));
763    }
764
765    #[cfg(unix)]
766    #[test]
767    fn config_unix() {
768        let config: SyslogConfig = toml::from_str(
769            r#"
770            mode = "unix"
771            path = "127.0.0.1:1235"
772          "#,
773        )
774        .unwrap();
775        assert!(matches!(config.mode, Mode::Unix { .. }));
776    }
777
778    #[cfg(unix)]
779    #[test]
780    fn config_unix_permissions() {
781        let config: SyslogConfig = toml::from_str(
782            r#"
783            mode = "unix"
784            path = "127.0.0.1:1235"
785            socket_file_mode = 0o777
786          "#,
787        )
788        .unwrap();
789        let socket_file_mode = match config.mode {
790            Mode::Unix {
791                path: _,
792                socket_file_mode,
793            } => socket_file_mode,
794            _ => panic!("expected Mode::Unix"),
795        };
796
797        assert_eq!(socket_file_mode, Some(0o777));
798    }
799
    // Parses an RFC 5424-style message carrying two structured-data elements
    // and checks the complete resulting legacy-namespace event, including the
    // `source_ip` taken from the supplied default host.
    #[test]
    fn syslog_ng_network_syslog_protocol() {
        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
        let msg = "i am foobar";
        let raw = format!(
            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
            r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
            r#"[origin ip="192.168.0.1" software="test"]"#,
            msg
        );

        let mut expected = Event::Log(LogEvent::from(msg));

        {
            let expected = expected.as_mut_log();
            expected.insert(
                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
                    .single()
                    .expect("invalid timestamp"),
            );
            expected.insert(
                log_schema().source_type_key_target_path().unwrap(),
                "syslog",
            );
            // The hostname parsed from the message takes precedence over the
            // default host for the `host` field.
            expected.insert("host", "74794bfb6795");
            expected.insert("hostname", "74794bfb6795");

            // Structured-data elements become nested fields.
            expected.insert("meta.sequenceId", "1");
            expected.insert("meta.sysUpTime", "37");
            expected.insert("meta.language", "EN");
            expected.insert("origin.software", "test");
            expected.insert("origin.ip", "192.168.0.1");

            expected.insert("severity", "notice");
            expected.insert("facility", "user");
            expected.insert("version", 1);
            expected.insert("appname", "root");
            expected.insert("procid", 8449);
            expected.insert("source_ip", "192.168.0.254");
        }

        assert_event_data_eq!(
            event_from_bytes(
                "host",
                Some(Bytes::from("192.168.0.254")),
                raw.into(),
                LogNamespace::Legacy
            )
            .unwrap(),
            expected
        );
    }
853
854    #[test]
855    fn handles_incorrect_sd_element() {
856        let msg = "qwerty";
857        let raw = format!(
858            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
859            r"[incorrect x]", msg
860        );
861
862        let mut expected = Event::Log(LogEvent::from(msg));
863        {
864            let expected = expected.as_mut_log();
865            expected.insert(
866                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
867                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
868                    .single()
869                    .expect("invalid timestamp"),
870            );
871            expected.insert(
872                log_schema().host_key().unwrap().to_string().as_str(),
873                "74794bfb6795",
874            );
875            expected.insert("hostname", "74794bfb6795");
876            expected.insert(
877                log_schema().source_type_key_target_path().unwrap(),
878                "syslog",
879            );
880            expected.insert("severity", "notice");
881            expected.insert("facility", "user");
882            expected.insert("version", 1);
883            expected.insert("appname", "root");
884            expected.insert("procid", 8449);
885            expected.insert("source_ip", "192.168.0.254");
886        }
887
888        let event = event_from_bytes(
889            "host",
890            Some(Bytes::from("192.168.0.254")),
891            raw.into(),
892            LogNamespace::Legacy,
893        )
894        .unwrap();
895        assert_event_data_eq!(event, expected);
896
897        let raw = format!(
898            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
899            r"[incorrect x=]", msg
900        );
901
902        let event = event_from_bytes(
903            "host",
904            Some(Bytes::from("192.168.0.254")),
905            raw.into(),
906            LogNamespace::Legacy,
907        )
908        .unwrap();
909        assert_event_data_eq!(event, expected);
910    }
911
912    #[test]
913    fn handles_empty_sd_element() {
914        fn there_is_map_called_empty(event: Event) -> bool {
915            event
916                .as_log()
917                .get("empty")
918                .expect("empty exists")
919                .is_object()
920        }
921
922        let msg = format!(
923            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
924            r"[empty]"
925        );
926
927        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
928        assert!(there_is_map_called_empty(event));
929
930        let msg = format!(
931            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
932            r#"[non_empty x="1"][empty]"#
933        );
934
935        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
936        assert!(there_is_map_called_empty(event));
937
938        let msg = format!(
939            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
940            r#"[empty][non_empty x="1"]"#
941        );
942
943        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
944        assert!(there_is_map_called_empty(event));
945
946        let msg = format!(
947            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
948            r#"[empty not_really="testing the test"]"#
949        );
950
951        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
952        assert!(there_is_map_called_empty(event));
953    }
954
955    #[test]
956    fn handles_weird_whitespace() {
957        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
958        let raw = r#"
959            <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
960            "#;
961        let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
962
963        assert_event_data_eq!(
964            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
965            event_from_bytes(
966                "host",
967                None,
968                cleaned.to_owned().into(),
969                LogNamespace::Legacy
970            )
971            .unwrap()
972        );
973    }
974
975    #[test]
976    fn handles_dots_in_sdata() {
977        let raw =
978            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin foo.bar="baz"] hello"#;
979        let event =
980            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
981        assert_eq!(
982            event.as_log().get(r#"origin."foo.bar""#),
983            Some(&Value::from("baz"))
984        );
985    }
986
987    #[test]
988    fn syslog_ng_default_network() {
989        let msg = "i am foobar";
990        let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
991        let event = event_from_bytes(
992            "host",
993            Some(Bytes::from("192.168.0.254")),
994            raw.into(),
995            LogNamespace::Legacy,
996        )
997        .unwrap();
998
999        let mut expected = Event::Log(LogEvent::from(msg));
1000        {
1001            let value = event.as_log().get("timestamp").unwrap();
1002            let year = value.as_timestamp().unwrap().naive_local().year();
1003
1004            let expected = expected.as_mut_log();
1005            let expected_date: DateTime<Utc> = Local
1006                .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1007                .single()
1008                .expect("invalid timestamp")
1009                .into();
1010
1011            expected.insert(
1012                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1013                expected_date,
1014            );
1015            expected.insert(
1016                log_schema().host_key().unwrap().to_string().as_str(),
1017                "74794bfb6795",
1018            );
1019            expected.insert(
1020                log_schema().source_type_key_target_path().unwrap(),
1021                "syslog",
1022            );
1023            expected.insert("hostname", "74794bfb6795");
1024            expected.insert("severity", "notice");
1025            expected.insert("facility", "user");
1026            expected.insert("appname", "root");
1027            expected.insert("procid", 8539);
1028            expected.insert("source_ip", "192.168.0.254");
1029        }
1030
1031        assert_event_data_eq!(event, expected);
1032    }
1033
1034    #[test]
1035    fn rsyslog_omfwd_tcp_default() {
1036        let msg = "start";
1037        let raw = format!(
1038            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1039        );
1040        let event = event_from_bytes(
1041            "host",
1042            Some(Bytes::from("192.168.0.254")),
1043            raw.into(),
1044            LogNamespace::Legacy,
1045        )
1046        .unwrap();
1047
1048        let mut expected = Event::Log(LogEvent::from(msg));
1049        {
1050            let value = event.as_log().get("timestamp").unwrap();
1051            let year = value.as_timestamp().unwrap().naive_local().year();
1052
1053            let expected = expected.as_mut_log();
1054            let expected_date: DateTime<Utc> = Local
1055                .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1056                .single()
1057                .expect("invalid timestamp")
1058                .into();
1059            expected.insert(
1060                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1061                expected_date,
1062            );
1063            expected.insert(
1064                log_schema().source_type_key_target_path().unwrap(),
1065                "syslog",
1066            );
1067            expected.insert("host", "74794bfb6795");
1068            expected.insert("hostname", "74794bfb6795");
1069            expected.insert("severity", "info");
1070            expected.insert("facility", "local7");
1071            expected.insert("appname", "liblogging-stdlog");
1072            expected.insert("origin.software", "rsyslogd");
1073            expected.insert("origin.swVersion", "8.24.0");
1074            expected.insert("source_ip", "192.168.0.254");
1075            expected.insert(event_path!("origin", "x-pid"), "8979");
1076            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1077        }
1078
1079        assert_event_data_eq!(event, expected);
1080    }
1081
1082    #[test]
1083    fn rsyslog_omfwd_tcp_forward_format() {
1084        let msg = "start";
1085        let raw = format!(
1086            r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1087        );
1088
1089        let mut expected = Event::Log(LogEvent::from(msg));
1090        {
1091            let expected = expected.as_mut_log();
1092            expected.insert(
1093                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1094                Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1095                    .single()
1096                    .and_then(|t| t.with_nanosecond(605_850 * 1000))
1097                    .expect("invalid timestamp"),
1098            );
1099            expected.insert(
1100                log_schema().source_type_key_target_path().unwrap(),
1101                "syslog",
1102            );
1103            expected.insert("host", "74794bfb6795");
1104            expected.insert("hostname", "74794bfb6795");
1105            expected.insert("severity", "info");
1106            expected.insert("facility", "local7");
1107            expected.insert("appname", "liblogging-stdlog");
1108            expected.insert("origin.software", "rsyslogd");
1109            expected.insert("origin.swVersion", "8.24.0");
1110            expected.insert(event_path!("origin", "x-pid"), "9043");
1111            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1112        }
1113
1114        assert_event_data_eq!(
1115            event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1116            expected
1117        );
1118    }
1119
1120    #[tokio::test]
1121    async fn test_tcp_syslog() {
1122        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1123            let num_messages: usize = 10000;
1124            let in_addr = next_addr();
1125
1126            // Create and spawn the source.
1127            let config = SyslogConfig::from_mode(Mode::Tcp {
1128                address: in_addr.into(),
1129                permit_origin: None,
1130                keepalive: None,
1131                tls: None,
1132                receive_buffer_bytes: None,
1133                connection_limit: None,
1134            });
1135
1136            let key = ComponentKey::from("in");
1137            let (tx, rx) = SourceSender::new_test();
1138            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1139            let shutdown_complete = shutdown.shutdown_tripwire();
1140
1141            let source = config
1142                .build(context)
1143                .await
1144                .expect("source should not fail to build");
1145            tokio::spawn(source);
1146
1147            // Wait for source to become ready to accept traffic.
1148            wait_for_tcp(in_addr).await;
1149
1150            let output_events = CountReceiver::receive_events(rx);
1151
1152            // Now craft and send syslog messages to the source, and collect them on the other side.
1153            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1154                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1155                .collect();
1156
1157            let input_lines: Vec<String> =
1158                input_messages.iter().map(|msg| msg.to_string()).collect();
1159
1160            send_lines(in_addr, input_lines).await.unwrap();
1161
1162            // Wait a short period of time to ensure the messages get sent.
1163            sleep(Duration::from_secs(2)).await;
1164
1165            // Shutdown the source, and make sure we've got all the messages we sent in.
1166            shutdown
1167                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1168                .await;
1169            shutdown_complete.await;
1170
1171            let output_events = output_events.await;
1172            assert_eq!(output_events.len(), num_messages);
1173
1174            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1175                .into_iter()
1176                .map(|mut e| {
1177                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1178                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1179                    e.into()
1180                })
1181                .collect();
1182            assert_eq!(output_messages, input_messages);
1183        })
1184        .await;
1185    }
1186
1187    #[cfg(unix)]
1188    #[tokio::test]
1189    async fn test_unix_stream_syslog() {
1190        use std::os::unix::net::UnixStream as StdUnixStream;
1191
1192        use futures_util::{SinkExt, stream};
1193        use tokio::{io::AsyncWriteExt, net::UnixStream};
1194        use tokio_util::codec::{FramedWrite, LinesCodec};
1195
1196        use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1197
1198        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1199            let num_messages: usize = 1;
1200            let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1201
1202            // Create and spawn the source.
1203            let config = SyslogConfig::from_mode(Mode::Unix {
1204                path: in_path.clone(),
1205                socket_file_mode: None,
1206            });
1207
1208            let key = ComponentKey::from("in");
1209            let (tx, rx) = SourceSender::new_test();
1210            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1211            let shutdown_complete = shutdown.shutdown_tripwire();
1212
1213            let source = config
1214                .build(context)
1215                .await
1216                .expect("source should not fail to build");
1217            tokio::spawn(source);
1218
1219            // Wait for source to become ready to accept traffic.
1220            while StdUnixStream::connect(&in_path).is_err() {
1221                tokio::task::yield_now().await;
1222            }
1223
1224            let output_events = CountReceiver::receive_events(rx);
1225
1226            // Now craft and send syslog messages to the source, and collect them on the other side.
1227            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1228                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1229                .collect();
1230
1231            let stream = UnixStream::connect(&in_path).await.unwrap();
1232            let mut sink = FramedWrite::new(stream, LinesCodec::new());
1233
1234            let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1235            let mut lines = stream::iter(lines).map(Ok);
1236            sink.send_all(&mut lines).await.unwrap();
1237
1238            let stream = sink.get_mut();
1239            stream.shutdown().await.unwrap();
1240
1241            // Wait a short period of time to ensure the messages get sent.
1242            sleep(Duration::from_secs(1)).await;
1243
1244            shutdown
1245                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1246                .await;
1247            shutdown_complete.await;
1248
1249            let output_events = output_events.await;
1250            assert_eq!(output_events.len(), num_messages);
1251
1252            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1253                .into_iter()
1254                .map(|mut e| {
1255                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1256                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1257                    e.into()
1258                })
1259                .collect();
1260            assert_eq!(output_messages, input_messages);
1261        })
1262        .await;
1263    }
1264
1265    #[tokio::test]
1266    async fn test_octet_counting_syslog() {
1267        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1268            let num_messages: usize = 10000;
1269            let in_addr = next_addr();
1270
1271            // Create and spawn the source.
1272            let config = SyslogConfig::from_mode(Mode::Tcp {
1273                address: in_addr.into(),
1274                permit_origin: None,
1275                keepalive: None,
1276                tls: None,
1277                receive_buffer_bytes: None,
1278                connection_limit: None,
1279            });
1280
1281            let key = ComponentKey::from("in");
1282            let (tx, rx) = SourceSender::new_test();
1283            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1284            let shutdown_complete = shutdown.shutdown_tripwire();
1285
1286            let source = config
1287                .build(context)
1288                .await
1289                .expect("source should not fail to build");
1290            tokio::spawn(source);
1291
1292            // Wait for source to become ready to accept traffic.
1293            wait_for_tcp(in_addr).await;
1294
1295            let output_events = CountReceiver::receive_events(rx);
1296
1297            // Now craft and send syslog messages to the source, and collect them on the other side.
1298            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1299                .map(|i| {
1300                    let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1301                    msg.message.push('\n');
1302                    msg.message.push_str(&random_string(30));
1303                    msg
1304                })
1305                .collect();
1306
1307            let codec = BytesCodec::new();
1308            let input_lines: Vec<Bytes> = input_messages
1309                .iter()
1310                .map(|msg| {
1311                    let s = msg.to_string();
1312                    format!("{} {}", s.len(), s).into()
1313                })
1314                .collect();
1315
1316            send_encodable(in_addr, codec, input_lines).await.unwrap();
1317
1318            // Wait a short period of time to ensure the messages get sent.
1319            sleep(Duration::from_secs(2)).await;
1320
1321            // Shutdown the source, and make sure we've got all the messages we sent in.
1322            shutdown
1323                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1324                .await;
1325            shutdown_complete.await;
1326
1327            let output_events = output_events.await;
1328            assert_eq!(output_events.len(), num_messages);
1329
1330            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1331                .into_iter()
1332                .map(|mut e| {
1333                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1334                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1335                    e.into()
1336                })
1337                .collect();
1338            assert_eq!(output_messages, input_messages);
1339        })
1340        .await;
1341    }
1342
    /// Test-only model of an RFC 5424 syslog message.
    ///
    /// Values are generated with `SyslogMessageRfc5424::random`, rendered to a wire line through
    /// the `Display` impl, and rebuilt from a parsed `Event` through the `From<Event>` impl, so
    /// that what was sent and what was received can be compared with `==`.
    #[derive(Deserialize, PartialEq, Clone, Debug)]
    struct SyslogMessageRfc5424 {
        // Message identifier written in the header.
        msgid: String,
        // Severity and facility are combined into the leading `<PRI>` value when rendered.
        severity: Severity,
        facility: Facility,
        // Protocol version written directly after `<PRI>`.
        version: u8,
        // Timestamp kept in its textual form.
        timestamp: String,
        host: String,
        // Not part of the rendered line; read back from the event's `source_type` field.
        source_type: String,
        appname: String,
        procid: usize,
        // Free-form message text that ends the rendered line.
        message: String,
        // Structured-data elements keyed by element id; flattened for deserialization.
        #[serde(flatten)]
        structured_data: StructuredData,
    }
1358
1359    impl SyslogMessageRfc5424 {
1360        fn random(
1361            id: usize,
1362            msg_len: usize,
1363            field_len: usize,
1364            max_map_size: usize,
1365            max_children: usize,
1366        ) -> Self {
1367            let msg = random_string(msg_len);
1368            let structured_data = random_structured_data(max_map_size, max_children, field_len);
1369
1370            let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1371            //"secfrac" can contain up to 6 digits, but TCP sinks uses `AutoSi`
1372
1373            Self {
1374                msgid: format!("test{id}"),
1375                severity: Severity::LOG_INFO,
1376                facility: Facility::LOG_USER,
1377                version: 1,
1378                timestamp,
1379                host: "hogwarts".to_owned(),
1380                source_type: "syslog".to_owned(),
1381                appname: "harry".to_owned(),
1382                procid: rng().random_range(0..32768),
1383                structured_data,
1384                message: msg,
1385            }
1386        }
1387    }
1388
1389    impl fmt::Display for SyslogMessageRfc5424 {
1390        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1391            write!(
1392                f,
1393                "<{}>{} {} {} {} {} {} {} {}",
1394                encode_priority(self.severity, self.facility),
1395                self.version,
1396                self.timestamp,
1397                self.host,
1398                self.appname,
1399                self.procid,
1400                self.msgid,
1401                format_structured_data_rfc5424(&self.structured_data),
1402                self.message
1403            )
1404        }
1405    }
1406
1407    impl From<Event> for SyslogMessageRfc5424 {
1408        fn from(e: Event) -> Self {
1409            let (value, _) = e.into_log().into_parts();
1410            let mut fields = value.into_object().unwrap();
1411
1412            Self {
1413                msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1414                severity: fields
1415                    .remove("severity")
1416                    .map(value_to_string)
1417                    .and_then(|s| Severity::from_str(s.as_str()))
1418                    .unwrap(),
1419                facility: fields
1420                    .remove("facility")
1421                    .map(value_to_string)
1422                    .and_then(|s| Facility::from_str(s.as_str()))
1423                    .unwrap(),
1424                version: fields
1425                    .remove("version")
1426                    .map(value_to_string)
1427                    .map(|s| u8::from_str(s.as_str()).unwrap())
1428                    .unwrap(),
1429                timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1430                host: fields.remove("host").map(value_to_string).unwrap(),
1431                source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1432                appname: fields.remove("appname").map(value_to_string).unwrap(),
1433                procid: fields
1434                    .remove("procid")
1435                    .map(value_to_string)
1436                    .map(|s| usize::from_str(s.as_str()).unwrap())
1437                    .unwrap(),
1438                message: fields.remove("message").map(value_to_string).unwrap(),
1439                structured_data: structured_data_from_fields(fields),
1440            }
1441        }
1442    }
1443
1444    fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1445        let mut structured_data = StructuredData::default();
1446
1447        for (key, value) in fields.into_iter() {
1448            let subfields = value
1449                .into_object()
1450                .unwrap()
1451                .into_iter()
1452                .map(|(k, v)| (k.into(), value_to_string(v)))
1453                .collect();
1454
1455            structured_data.insert(key.into(), subfields);
1456        }
1457
1458        structured_data
1459    }
1460
    /// Syslog severity levels, named after the classic `LOG_*` constants.
    ///
    /// Discriminants are implicit, so the variants are numbered 0 through 7 in declaration
    /// order; `encode_priority` casts that number to `u8`. Each variant deserializes from the
    /// lowercase name given in its `serde(rename)` attribute.
    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
    #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
    pub enum Severity {
        #[serde(rename(deserialize = "emergency"))]
        LOG_EMERG,
        #[serde(rename(deserialize = "alert"))]
        LOG_ALERT,
        #[serde(rename(deserialize = "critical"))]
        LOG_CRIT,
        #[serde(rename(deserialize = "error"))]
        LOG_ERR,
        #[serde(rename(deserialize = "warn"))]
        LOG_WARNING,
        #[serde(rename(deserialize = "notice"))]
        LOG_NOTICE,
        #[serde(rename(deserialize = "info"))]
        LOG_INFO,
        #[serde(rename(deserialize = "debug"))]
        LOG_DEBUG,
    }
1481
1482    impl Severity {
1483        fn from_str(s: &str) -> Option<Self> {
1484            match s {
1485                "emergency" => Some(Self::LOG_EMERG),
1486                "alert" => Some(Self::LOG_ALERT),
1487                "critical" => Some(Self::LOG_CRIT),
1488                "error" => Some(Self::LOG_ERR),
1489                "warn" => Some(Self::LOG_WARNING),
1490                "notice" => Some(Self::LOG_NOTICE),
1491                "info" => Some(Self::LOG_INFO),
1492                "debug" => Some(Self::LOG_DEBUG),
1493
1494                x => {
1495                    #[allow(clippy::print_stdout)]
1496                    {
1497                        println!("converting severity str, got {x}");
1498                    }
1499                    None
1500                }
1501            }
1502        }
1503    }
1504
    /// Syslog facilities, named after the classic `LOG_*` constants.
    ///
    /// Each discriminant is the facility number already shifted left by three bits, so
    /// `encode_priority` can OR it directly with a severity. Each variant deserializes from the
    /// lowercase name given in its `serde(rename)` attribute.
    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
    #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
    pub enum Facility {
        #[serde(rename(deserialize = "kernel"))]
        LOG_KERN = 0 << 3,
        #[serde(rename(deserialize = "user"))]
        LOG_USER = 1 << 3,
        #[serde(rename(deserialize = "mail"))]
        LOG_MAIL = 2 << 3,
        #[serde(rename(deserialize = "daemon"))]
        LOG_DAEMON = 3 << 3,
        #[serde(rename(deserialize = "auth"))]
        LOG_AUTH = 4 << 3,
        #[serde(rename(deserialize = "syslog"))]
        LOG_SYSLOG = 5 << 3,
    }
1521
1522    impl Facility {
1523        fn from_str(s: &str) -> Option<Self> {
1524            match s {
1525                "kernel" => Some(Self::LOG_KERN),
1526                "user" => Some(Self::LOG_USER),
1527                "mail" => Some(Self::LOG_MAIL),
1528                "daemon" => Some(Self::LOG_DAEMON),
1529                "auth" => Some(Self::LOG_AUTH),
1530                "syslog" => Some(Self::LOG_SYSLOG),
1531                _ => None,
1532            }
1533        }
1534    }
1535
    /// Structured data of a message: element id mapped to that element's parameters
    /// (parameter name mapped to parameter value).
    type StructuredData = HashMap<String, HashMap<String, String>>;
1537
1538    fn random_structured_data(
1539        max_map_size: usize,
1540        max_children: usize,
1541        field_len: usize,
1542    ) -> StructuredData {
1543        let amount = rng().random_range(0..max_children);
1544
1545        random_maps(max_map_size, field_len)
1546            .filter(|m| !m.is_empty()) //syslog_rfc5424 ignores empty maps, tested separately
1547            .take(amount)
1548            .enumerate()
1549            .map(|(i, map)| (format!("id{i}"), map))
1550            .collect()
1551    }
1552
1553    fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1554        if data.is_empty() {
1555            "-".to_string()
1556        } else {
1557            let mut res = String::new();
1558            for (id, params) in data {
1559                res = res + "[" + id;
1560                for (name, value) in params {
1561                    res = res + " " + name + "=\"" + value + "\"";
1562                }
1563                res += "]";
1564            }
1565
1566            res
1567        }
1568    }
1569
1570    const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1571        facility as u8 | severity as u8
1572    }
1573
1574    fn value_to_string(v: Value) -> String {
1575        if v.is_bytes() {
1576            let buf = v.as_bytes().unwrap();
1577            String::from_utf8_lossy(buf).to_string()
1578        } else if v.is_timestamp() {
1579            let ts = v.as_timestamp().unwrap();
1580            ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1581        } else {
1582            v.to_string()
1583        }
1584    }
1585}