vector/sources/
syslog.rs

1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4use vector_lib::ipallowlist::IpAllowlistConfig;
5
6use bytes::Bytes;
7use chrono::Utc;
8use futures::StreamExt;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use tokio_util::udp::UdpFramed;
12use vector_lib::codecs::{
13    decoding::{Deserializer, Framer},
14    BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
15};
16use vector_lib::config::{LegacyKey, LogNamespace};
17use vector_lib::configurable::configurable_component;
18use vector_lib::lookup::{lookup_v2::OptionalValuePath, path, OwnedValuePath};
19use vrl::event_path;
20
21#[cfg(unix)]
22use crate::sources::util::build_unix_stream_source;
23use crate::{
24    codecs::Decoder,
25    config::{
26        log_schema, DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput,
27    },
28    event::Event,
29    internal_events::StreamClosedError,
30    internal_events::{SocketBindError, SocketMode, SocketReceiveError},
31    net,
32    shutdown::ShutdownSignal,
33    sources::util::net::{try_bind_udp_socket, SocketListenAddr, TcpNullAcker, TcpSource},
34    tcp::TcpKeepaliveConfig,
35    tls::{MaybeTlsSettings, TlsSourceConfig},
36    SourceSender,
37};
38
39/// Configuration for the `syslog` source.
40#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
41#[derive(Clone, Debug)]
42pub struct SyslogConfig {
43    #[serde(flatten)]
44    mode: Mode,
45
46    /// The maximum buffer size of incoming messages, in bytes.
47    ///
48    /// Messages larger than this are truncated.
49    #[serde(default = "crate::serde::default_max_length")]
50    #[configurable(metadata(docs::type_unit = "bytes"))]
51    max_length: usize,
52
53    /// Overrides the name of the log field used to add the peer host to each event.
54    ///
55    /// If using TCP or UDP, the value is the peer host's address, including the port. For example, `1.2.3.4:9000`. If using
56    /// UDS, the value is the socket path itself.
57    ///
58    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
59    ///
60    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
61    host_key: Option<OptionalValuePath>,
62
63    /// The namespace to use for logs. This overrides the global setting.
64    #[configurable(metadata(docs::hidden))]
65    #[serde(default)]
66    pub log_namespace: Option<bool>,
67}
68
69/// Listener mode for the `syslog` source.
70#[configurable_component]
71#[derive(Clone, Debug)]
72#[serde(tag = "mode", rename_all = "snake_case")]
73#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
74#[allow(clippy::large_enum_variant)]
75pub enum Mode {
76    /// Listen on TCP.
77    Tcp {
78        #[configurable(derived)]
79        address: SocketListenAddr,
80
81        #[configurable(derived)]
82        keepalive: Option<TcpKeepaliveConfig>,
83
84        #[configurable(derived)]
85        permit_origin: Option<IpAllowlistConfig>,
86
87        #[configurable(derived)]
88        tls: Option<TlsSourceConfig>,
89
90        /// The size of the receive buffer used for each connection.
91        ///
92        /// This should not typically needed to be changed.
93        #[configurable(metadata(docs::type_unit = "bytes"))]
94        receive_buffer_bytes: Option<usize>,
95
96        /// The maximum number of TCP connections that are allowed at any given time.
97        connection_limit: Option<u32>,
98    },
99
100    /// Listen on UDP.
101    Udp {
102        #[configurable(derived)]
103        address: SocketListenAddr,
104
105        /// The size of the receive buffer used for the listening socket.
106        ///
107        /// This should not typically needed to be changed.
108        #[configurable(metadata(docs::type_unit = "bytes"))]
109        receive_buffer_bytes: Option<usize>,
110    },
111
112    /// Listen on UDS (Unix domain socket). This only supports Unix stream sockets.
113    ///
114    /// For Unix datagram sockets, use the `socket` source instead.
115    #[cfg(unix)]
116    Unix {
117        /// The Unix socket path.
118        ///
119        /// This should be an absolute path.
120        #[configurable(metadata(docs::examples = "/path/to/socket"))]
121        path: PathBuf,
122
123        /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
124        ///
125        /// The file mode value can be specified in any numeric format supported by your configuration
126        /// language, but it is most intuitive to use an octal number.
127        socket_file_mode: Option<u32>,
128    },
129}
130
131impl SyslogConfig {
132    #[cfg(test)]
133    pub fn from_mode(mode: Mode) -> Self {
134        Self {
135            mode,
136            host_key: None,
137            max_length: crate::serde::default_max_length(),
138            log_namespace: None,
139        }
140    }
141}
142
143impl Default for SyslogConfig {
144    fn default() -> Self {
145        Self {
146            mode: Mode::Tcp {
147                address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
148                keepalive: None,
149                permit_origin: None,
150                tls: None,
151                receive_buffer_bytes: None,
152                connection_limit: None,
153            },
154            host_key: None,
155            max_length: crate::serde::default_max_length(),
156            log_namespace: None,
157        }
158    }
159}
160
161impl GenerateConfig for SyslogConfig {
162    fn generate_config() -> toml::Value {
163        toml::Value::try_from(SyslogConfig::default()).unwrap()
164    }
165}
166
167#[async_trait::async_trait]
168#[typetag::serde(name = "syslog")]
169impl SourceConfig for SyslogConfig {
170    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
171        let log_namespace = cx.log_namespace(self.log_namespace);
172        let host_key = self
173            .host_key
174            .clone()
175            .and_then(|k| k.path)
176            .or(log_schema().host_key().cloned());
177
178        match self.mode.clone() {
179            Mode::Tcp {
180                address,
181                keepalive,
182                permit_origin,
183                tls,
184                receive_buffer_bytes,
185                connection_limit,
186            } => {
187                let source = SyslogTcpSource {
188                    max_length: self.max_length,
189                    host_key,
190                    log_namespace,
191                };
192                let shutdown_secs = Duration::from_secs(30);
193                let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
194                let tls_client_metadata_key = tls
195                    .as_ref()
196                    .and_then(|tls| tls.client_metadata_key.clone())
197                    .and_then(|k| k.path);
198                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
199                source.run(
200                    address,
201                    keepalive,
202                    shutdown_secs,
203                    tls,
204                    tls_client_metadata_key,
205                    receive_buffer_bytes,
206                    None,
207                    cx,
208                    false.into(),
209                    connection_limit,
210                    permit_origin.map(Into::into),
211                    SyslogConfig::NAME,
212                    log_namespace,
213                )
214            }
215            Mode::Udp {
216                address,
217                receive_buffer_bytes,
218            } => Ok(udp(
219                address,
220                self.max_length,
221                host_key,
222                receive_buffer_bytes,
223                cx.shutdown,
224                log_namespace,
225                cx.out,
226            )),
227            #[cfg(unix)]
228            Mode::Unix {
229                path,
230                socket_file_mode,
231            } => {
232                let decoder = Decoder::new(
233                    Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
234                        self.max_length,
235                    )),
236                    Deserializer::Syslog(
237                        SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
238                    ),
239                );
240
241                build_unix_stream_source(
242                    path,
243                    socket_file_mode,
244                    decoder,
245                    move |events, host| handle_events(events, &host_key, host, log_namespace),
246                    cx.shutdown,
247                    cx.out,
248                )
249            }
250        }
251    }
252
253    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
254        let log_namespace = global_log_namespace.merge(self.log_namespace);
255        let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
256            .schema_definition(log_namespace)
257            .with_standard_vector_source_metadata();
258
259        vec![SourceOutput::new_maybe_logs(
260            DataType::Log,
261            schema_definition,
262        )]
263    }
264
265    fn resources(&self) -> Vec<Resource> {
266        match self.mode.clone() {
267            Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
268            Mode::Udp { address, .. } => vec![address.as_udp_resource()],
269            #[cfg(unix)]
270            Mode::Unix { .. } => vec![],
271        }
272    }
273
274    fn can_acknowledge(&self) -> bool {
275        false
276    }
277}
278
279#[derive(Debug, Clone)]
280struct SyslogTcpSource {
281    max_length: usize,
282    host_key: Option<OwnedValuePath>,
283    log_namespace: LogNamespace,
284}
285
286impl TcpSource for SyslogTcpSource {
287    type Error = vector_lib::codecs::decoding::Error;
288    type Item = SmallVec<[Event; 1]>;
289    type Decoder = Decoder;
290    type Acker = TcpNullAcker;
291
292    fn decoder(&self) -> Self::Decoder {
293        Decoder::new(
294            Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
295            Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
296        )
297    }
298
299    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
300        handle_events(
301            events,
302            &self.host_key,
303            Some(host.ip().to_string().into()),
304            self.log_namespace,
305        );
306    }
307
308    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
309        TcpNullAcker
310    }
311}
312
313pub fn udp(
314    addr: SocketListenAddr,
315    _max_length: usize,
316    host_key: Option<OwnedValuePath>,
317    receive_buffer_bytes: Option<usize>,
318    shutdown: ShutdownSignal,
319    log_namespace: LogNamespace,
320    mut out: SourceSender,
321) -> super::Source {
322    Box::pin(async move {
323        let listenfd = ListenFd::from_env();
324        let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
325            emit!(SocketBindError {
326                mode: SocketMode::Udp,
327                error: &error,
328            })
329        })?;
330
331        if let Some(receive_buffer_bytes) = receive_buffer_bytes {
332            if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
333                warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
334            }
335        }
336
337        info!(
338            message = "Listening.",
339            addr = %addr,
340            r#type = "udp"
341        );
342
343        let mut stream = UdpFramed::new(
344            socket,
345            Decoder::new(
346                Framer::Bytes(BytesDecoder::new()),
347                Deserializer::Syslog(
348                    SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
349                ),
350            ),
351        )
352        .take_until(shutdown)
353        .filter_map(|frame| {
354            let host_key = host_key.clone();
355            async move {
356                match frame {
357                    Ok(((mut events, _byte_size), received_from)) => {
358                        let received_from = received_from.ip().to_string().into();
359                        handle_events(&mut events, &host_key, Some(received_from), log_namespace);
360                        Some(events.remove(0))
361                    }
362                    Err(error) => {
363                        emit!(SocketReceiveError {
364                            mode: SocketMode::Udp,
365                            error: &error,
366                        });
367                        None
368                    }
369                }
370            }
371        })
372        .boxed();
373
374        match out.send_event_stream(&mut stream).await {
375            Ok(()) => {
376                debug!("Finished sending.");
377                Ok(())
378            }
379            Err(_) => {
380                let (count, _) = stream.size_hint();
381                emit!(StreamClosedError { count });
382                Err(())
383            }
384        }
385    })
386}
387
388fn handle_events(
389    events: &mut [Event],
390    host_key: &Option<OwnedValuePath>,
391    default_host: Option<Bytes>,
392    log_namespace: LogNamespace,
393) {
394    for event in events {
395        enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
396    }
397}
398
399fn enrich_syslog_event(
400    event: &mut Event,
401    host_key: &Option<OwnedValuePath>,
402    default_host: Option<Bytes>,
403    log_namespace: LogNamespace,
404) {
405    let log = event.as_mut_log();
406
407    if let Some(default_host) = &default_host {
408        log_namespace.insert_source_metadata(
409            SyslogConfig::NAME,
410            log,
411            Some(LegacyKey::Overwrite(path!("source_ip"))),
412            path!("source_ip"),
413            default_host.clone(),
414        );
415    }
416
417    let parsed_hostname = log
418        .get(event_path!("hostname"))
419        .map(|hostname| hostname.coerce_to_bytes());
420
421    if let Some(parsed_host) = parsed_hostname.or(default_host) {
422        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
423
424        log_namespace.insert_source_metadata(
425            SyslogConfig::NAME,
426            log,
427            legacy_host_key,
428            path!("host"),
429            parsed_host,
430        );
431    }
432
433    log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
434
435    if log_namespace == LogNamespace::Legacy {
436        let timestamp = log
437            .get(event_path!("timestamp"))
438            .and_then(|timestamp| timestamp.as_timestamp().cloned())
439            .unwrap_or_else(Utc::now);
440        log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
441    }
442
443    trace!(
444        message = "Processing one event.",
445        event = ?event
446    );
447}
448
449#[cfg(test)]
450mod test {
451    use std::{collections::HashMap, fmt, str::FromStr};
452    use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath};
453
454    use chrono::prelude::*;
455    use rand::{rng, Rng};
456    use serde::Deserialize;
457    use tokio::time::{sleep, Duration, Instant};
458    use tokio_util::codec::BytesCodec;
459    use vector_lib::assert_event_data_eq;
460    use vector_lib::codecs::decoding::format::Deserializer;
461    use vector_lib::lookup::PathPrefix;
462    use vector_lib::{config::ComponentKey, schema::Definition};
463    use vrl::value::{kind::Collection, Kind, ObjectMap, Value};
464
465    use super::*;
466    use crate::{
467        config::log_schema,
468        event::{Event, LogEvent},
469        test_util::{
470            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
471            next_addr, random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
472            CountReceiver,
473        },
474    };
475
476    fn event_from_bytes(
477        host_key: &str,
478        default_host: Option<Bytes>,
479        bytes: Bytes,
480        log_namespace: LogNamespace,
481    ) -> Option<Event> {
482        let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
483        let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
484        handle_events(
485            &mut events,
486            &Some(owned_value_path!(host_key)),
487            default_host,
488            log_namespace,
489        );
490        Some(events.remove(0))
491    }
492
493    #[test]
494    fn generate_config() {
495        crate::test_util::test_generate_config::<SyslogConfig>();
496    }
497
498    #[test]
499    fn output_schema_definition_vector_namespace() {
500        let config = SyslogConfig {
501            log_namespace: Some(true),
502            ..Default::default()
503        };
504
505        let definitions = config
506            .outputs(LogNamespace::Vector)
507            .remove(0)
508            .schema_definition(true);
509
510        let expected_definition =
511            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
512                .with_meaning(OwnedTargetPath::event_root(), "message")
513                .with_metadata_field(
514                    &owned_value_path!("vector", "source_type"),
515                    Kind::bytes(),
516                    None,
517                )
518                .with_metadata_field(
519                    &owned_value_path!("vector", "ingest_timestamp"),
520                    Kind::timestamp(),
521                    None,
522                )
523                .with_metadata_field(
524                    &owned_value_path!("syslog", "timestamp"),
525                    Kind::timestamp(),
526                    Some("timestamp"),
527                )
528                .with_metadata_field(
529                    &owned_value_path!("syslog", "hostname"),
530                    Kind::bytes().or_undefined(),
531                    Some("host"),
532                )
533                .with_metadata_field(
534                    &owned_value_path!("syslog", "source_ip"),
535                    Kind::bytes().or_undefined(),
536                    None,
537                )
538                .with_metadata_field(
539                    &owned_value_path!("syslog", "severity"),
540                    Kind::bytes().or_undefined(),
541                    Some("severity"),
542                )
543                .with_metadata_field(
544                    &owned_value_path!("syslog", "facility"),
545                    Kind::bytes().or_undefined(),
546                    None,
547                )
548                .with_metadata_field(
549                    &owned_value_path!("syslog", "version"),
550                    Kind::integer().or_undefined(),
551                    None,
552                )
553                .with_metadata_field(
554                    &owned_value_path!("syslog", "appname"),
555                    Kind::bytes().or_undefined(),
556                    Some("service"),
557                )
558                .with_metadata_field(
559                    &owned_value_path!("syslog", "msgid"),
560                    Kind::bytes().or_undefined(),
561                    None,
562                )
563                .with_metadata_field(
564                    &owned_value_path!("syslog", "procid"),
565                    Kind::integer().or_bytes().or_undefined(),
566                    None,
567                )
568                .with_metadata_field(
569                    &owned_value_path!("syslog", "structured_data"),
570                    Kind::object(Collection::from_unknown(Kind::object(
571                        Collection::from_unknown(Kind::bytes()),
572                    ))),
573                    None,
574                )
575                .with_metadata_field(
576                    &owned_value_path!("syslog", "tls_client_metadata"),
577                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
578                    None,
579                );
580
581        assert_eq!(definitions, Some(expected_definition));
582    }
583
584    #[test]
585    fn output_schema_definition_legacy_namespace() {
586        let config = SyslogConfig::default();
587
588        let definitions = config
589            .outputs(LogNamespace::Legacy)
590            .remove(0)
591            .schema_definition(true);
592
593        let expected_definition = Definition::new_with_default_metadata(
594            Kind::object(Collection::empty()),
595            [LogNamespace::Legacy],
596        )
597        .with_event_field(
598            &owned_value_path!("message"),
599            Kind::bytes(),
600            Some("message"),
601        )
602        .with_event_field(
603            &owned_value_path!("timestamp"),
604            Kind::timestamp(),
605            Some("timestamp"),
606        )
607        .with_event_field(
608            &owned_value_path!("hostname"),
609            Kind::bytes().or_undefined(),
610            Some("host"),
611        )
612        .with_event_field(
613            &owned_value_path!("source_ip"),
614            Kind::bytes().or_undefined(),
615            None,
616        )
617        .with_event_field(
618            &owned_value_path!("severity"),
619            Kind::bytes().or_undefined(),
620            Some("severity"),
621        )
622        .with_event_field(
623            &owned_value_path!("facility"),
624            Kind::bytes().or_undefined(),
625            None,
626        )
627        .with_event_field(
628            &owned_value_path!("version"),
629            Kind::integer().or_undefined(),
630            None,
631        )
632        .with_event_field(
633            &owned_value_path!("appname"),
634            Kind::bytes().or_undefined(),
635            Some("service"),
636        )
637        .with_event_field(
638            &owned_value_path!("msgid"),
639            Kind::bytes().or_undefined(),
640            None,
641        )
642        .with_event_field(
643            &owned_value_path!("procid"),
644            Kind::integer().or_bytes().or_undefined(),
645            None,
646        )
647        .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
648        .with_standard_vector_source_metadata();
649
650        assert_eq!(definitions, Some(expected_definition));
651    }
652
653    #[test]
654    fn config_tcp() {
655        let config: SyslogConfig = toml::from_str(
656            r#"
657            mode = "tcp"
658            address = "127.0.0.1:1235"
659          "#,
660        )
661        .unwrap();
662        assert!(matches!(config.mode, Mode::Tcp { .. }));
663    }
664
665    #[test]
666    fn config_tcp_with_receive_buffer_size() {
667        let config: SyslogConfig = toml::from_str(
668            r#"
669            mode = "tcp"
670            address = "127.0.0.1:1235"
671            receive_buffer_bytes = 256
672          "#,
673        )
674        .unwrap();
675
676        let receive_buffer_bytes = match config.mode {
677            Mode::Tcp {
678                receive_buffer_bytes,
679                ..
680            } => receive_buffer_bytes,
681            _ => panic!("expected Mode::Tcp"),
682        };
683
684        assert_eq!(receive_buffer_bytes, Some(256));
685    }
686
687    #[test]
688    fn config_tcp_keepalive_empty() {
689        let config: SyslogConfig = toml::from_str(
690            r#"
691            mode = "tcp"
692            address = "127.0.0.1:1235"
693          "#,
694        )
695        .unwrap();
696
697        let keepalive = match config.mode {
698            Mode::Tcp { keepalive, .. } => keepalive,
699            _ => panic!("expected Mode::Tcp"),
700        };
701
702        assert_eq!(keepalive, None);
703    }
704
705    #[test]
706    fn config_tcp_keepalive_full() {
707        let config: SyslogConfig = toml::from_str(
708            r#"
709            mode = "tcp"
710            address = "127.0.0.1:1235"
711            keepalive.time_secs = 7200
712          "#,
713        )
714        .unwrap();
715
716        let keepalive = match config.mode {
717            Mode::Tcp { keepalive, .. } => keepalive,
718            _ => panic!("expected Mode::Tcp"),
719        };
720
721        let keepalive = keepalive.expect("keepalive config not set");
722
723        assert_eq!(keepalive.time_secs, Some(7200));
724    }
725
726    #[test]
727    fn config_udp() {
728        let config: SyslogConfig = toml::from_str(
729            r#"
730            mode = "udp"
731            address = "127.0.0.1:1235"
732            max_length = 32187
733          "#,
734        )
735        .unwrap();
736        assert!(matches!(config.mode, Mode::Udp { .. }));
737    }
738
739    #[test]
740    fn config_udp_with_receive_buffer_size() {
741        let config: SyslogConfig = toml::from_str(
742            r#"
743            mode = "udp"
744            address = "127.0.0.1:1235"
745            max_length = 32187
746            receive_buffer_bytes = 256
747          "#,
748        )
749        .unwrap();
750
751        let receive_buffer_bytes = match config.mode {
752            Mode::Udp {
753                receive_buffer_bytes,
754                ..
755            } => receive_buffer_bytes,
756            _ => panic!("expected Mode::Udp"),
757        };
758
759        assert_eq!(receive_buffer_bytes, Some(256));
760    }
761
762    #[cfg(unix)]
763    #[test]
764    fn config_unix() {
765        let config: SyslogConfig = toml::from_str(
766            r#"
767            mode = "unix"
768            path = "127.0.0.1:1235"
769          "#,
770        )
771        .unwrap();
772        assert!(matches!(config.mode, Mode::Unix { .. }));
773    }
774
775    #[cfg(unix)]
776    #[test]
777    fn config_unix_permissions() {
778        let config: SyslogConfig = toml::from_str(
779            r#"
780            mode = "unix"
781            path = "127.0.0.1:1235"
782            socket_file_mode = 0o777
783          "#,
784        )
785        .unwrap();
786        let socket_file_mode = match config.mode {
787            Mode::Unix {
788                path: _,
789                socket_file_mode,
790            } => socket_file_mode,
791            _ => panic!("expected Mode::Unix"),
792        };
793
794        assert_eq!(socket_file_mode, Some(0o777));
795    }
796
797    #[test]
798    fn syslog_ng_network_syslog_protocol() {
799        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
800        let msg = "i am foobar";
801        let raw = format!(
802            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
803            r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
804            r#"[origin ip="192.168.0.1" software="test"]"#,
805            msg
806        );
807
808        let mut expected = Event::Log(LogEvent::from(msg));
809
810        {
811            let expected = expected.as_mut_log();
812            expected.insert(
813                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
814                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
815                    .single()
816                    .expect("invalid timestamp"),
817            );
818            expected.insert(
819                log_schema().source_type_key_target_path().unwrap(),
820                "syslog",
821            );
822            expected.insert("host", "74794bfb6795");
823            expected.insert("hostname", "74794bfb6795");
824
825            expected.insert("meta.sequenceId", "1");
826            expected.insert("meta.sysUpTime", "37");
827            expected.insert("meta.language", "EN");
828            expected.insert("origin.software", "test");
829            expected.insert("origin.ip", "192.168.0.1");
830
831            expected.insert("severity", "notice");
832            expected.insert("facility", "user");
833            expected.insert("version", 1);
834            expected.insert("appname", "root");
835            expected.insert("procid", 8449);
836            expected.insert("source_ip", "192.168.0.254");
837        }
838
839        assert_event_data_eq!(
840            event_from_bytes(
841                "host",
842                Some(Bytes::from("192.168.0.254")),
843                raw.into(),
844                LogNamespace::Legacy
845            )
846            .unwrap(),
847            expected
848        );
849    }
850
851    #[test]
852    fn handles_incorrect_sd_element() {
853        let msg = "qwerty";
854        let raw = format!(
855            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
856            r"[incorrect x]", msg
857        );
858
859        let mut expected = Event::Log(LogEvent::from(msg));
860        {
861            let expected = expected.as_mut_log();
862            expected.insert(
863                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
864                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
865                    .single()
866                    .expect("invalid timestamp"),
867            );
868            expected.insert(
869                log_schema().host_key().unwrap().to_string().as_str(),
870                "74794bfb6795",
871            );
872            expected.insert("hostname", "74794bfb6795");
873            expected.insert(
874                log_schema().source_type_key_target_path().unwrap(),
875                "syslog",
876            );
877            expected.insert("severity", "notice");
878            expected.insert("facility", "user");
879            expected.insert("version", 1);
880            expected.insert("appname", "root");
881            expected.insert("procid", 8449);
882            expected.insert("source_ip", "192.168.0.254");
883        }
884
885        let event = event_from_bytes(
886            "host",
887            Some(Bytes::from("192.168.0.254")),
888            raw.into(),
889            LogNamespace::Legacy,
890        )
891        .unwrap();
892        assert_event_data_eq!(event, expected);
893
894        let raw = format!(
895            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
896            r"[incorrect x=]", msg
897        );
898
899        let event = event_from_bytes(
900            "host",
901            Some(Bytes::from("192.168.0.254")),
902            raw.into(),
903            LogNamespace::Legacy,
904        )
905        .unwrap();
906        assert_event_data_eq!(event, expected);
907    }
908
909    #[test]
910    fn handles_empty_sd_element() {
911        fn there_is_map_called_empty(event: Event) -> bool {
912            event
913                .as_log()
914                .get("empty")
915                .expect("empty exists")
916                .is_object()
917        }
918
919        let msg = format!(
920            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
921            r"[empty]"
922        );
923
924        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
925        assert!(there_is_map_called_empty(event));
926
927        let msg = format!(
928            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
929            r#"[non_empty x="1"][empty]"#
930        );
931
932        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
933        assert!(there_is_map_called_empty(event));
934
935        let msg = format!(
936            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
937            r#"[empty][non_empty x="1"]"#
938        );
939
940        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
941        assert!(there_is_map_called_empty(event));
942
943        let msg = format!(
944            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
945            r#"[empty not_really="testing the test"]"#
946        );
947
948        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
949        assert!(there_is_map_called_empty(event));
950    }
951
952    #[test]
953    fn handles_weird_whitespace() {
954        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
955        let raw = r#"
956            <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
957            "#;
958        let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
959
960        assert_event_data_eq!(
961            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
962            event_from_bytes(
963                "host",
964                None,
965                cleaned.to_owned().into(),
966                LogNamespace::Legacy
967            )
968            .unwrap()
969        );
970    }
971
972    #[test]
973    fn handles_dots_in_sdata() {
974        let raw =
975            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin foo.bar="baz"] hello"#;
976        let event =
977            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
978        assert_eq!(
979            event.as_log().get(r#"origin."foo.bar""#),
980            Some(&Value::from("baz"))
981        );
982    }
983
984    #[test]
985    fn syslog_ng_default_network() {
986        let msg = "i am foobar";
987        let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
988        let event = event_from_bytes(
989            "host",
990            Some(Bytes::from("192.168.0.254")),
991            raw.into(),
992            LogNamespace::Legacy,
993        )
994        .unwrap();
995
996        let mut expected = Event::Log(LogEvent::from(msg));
997        {
998            let value = event.as_log().get("timestamp").unwrap();
999            let year = value.as_timestamp().unwrap().naive_local().year();
1000
1001            let expected = expected.as_mut_log();
1002            let expected_date: DateTime<Utc> = Local
1003                .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1004                .single()
1005                .expect("invalid timestamp")
1006                .into();
1007
1008            expected.insert(
1009                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1010                expected_date,
1011            );
1012            expected.insert(
1013                log_schema().host_key().unwrap().to_string().as_str(),
1014                "74794bfb6795",
1015            );
1016            expected.insert(
1017                log_schema().source_type_key_target_path().unwrap(),
1018                "syslog",
1019            );
1020            expected.insert("hostname", "74794bfb6795");
1021            expected.insert("severity", "notice");
1022            expected.insert("facility", "user");
1023            expected.insert("appname", "root");
1024            expected.insert("procid", 8539);
1025            expected.insert("source_ip", "192.168.0.254");
1026        }
1027
1028        assert_event_data_eq!(event, expected);
1029    }
1030
1031    #[test]
1032    fn rsyslog_omfwd_tcp_default() {
1033        let msg = "start";
1034        let raw = format!(
1035            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1036        );
1037        let event = event_from_bytes(
1038            "host",
1039            Some(Bytes::from("192.168.0.254")),
1040            raw.into(),
1041            LogNamespace::Legacy,
1042        )
1043        .unwrap();
1044
1045        let mut expected = Event::Log(LogEvent::from(msg));
1046        {
1047            let value = event.as_log().get("timestamp").unwrap();
1048            let year = value.as_timestamp().unwrap().naive_local().year();
1049
1050            let expected = expected.as_mut_log();
1051            let expected_date: DateTime<Utc> = Local
1052                .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1053                .single()
1054                .expect("invalid timestamp")
1055                .into();
1056            expected.insert(
1057                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1058                expected_date,
1059            );
1060            expected.insert(
1061                log_schema().source_type_key_target_path().unwrap(),
1062                "syslog",
1063            );
1064            expected.insert("host", "74794bfb6795");
1065            expected.insert("hostname", "74794bfb6795");
1066            expected.insert("severity", "info");
1067            expected.insert("facility", "local7");
1068            expected.insert("appname", "liblogging-stdlog");
1069            expected.insert("origin.software", "rsyslogd");
1070            expected.insert("origin.swVersion", "8.24.0");
1071            expected.insert("source_ip", "192.168.0.254");
1072            expected.insert(event_path!("origin", "x-pid"), "8979");
1073            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1074        }
1075
1076        assert_event_data_eq!(event, expected);
1077    }
1078
1079    #[test]
1080    fn rsyslog_omfwd_tcp_forward_format() {
1081        let msg = "start";
1082        let raw = format!(
1083            r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1084        );
1085
1086        let mut expected = Event::Log(LogEvent::from(msg));
1087        {
1088            let expected = expected.as_mut_log();
1089            expected.insert(
1090                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1091                Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1092                    .single()
1093                    .and_then(|t| t.with_nanosecond(605_850 * 1000))
1094                    .expect("invalid timestamp"),
1095            );
1096            expected.insert(
1097                log_schema().source_type_key_target_path().unwrap(),
1098                "syslog",
1099            );
1100            expected.insert("host", "74794bfb6795");
1101            expected.insert("hostname", "74794bfb6795");
1102            expected.insert("severity", "info");
1103            expected.insert("facility", "local7");
1104            expected.insert("appname", "liblogging-stdlog");
1105            expected.insert("origin.software", "rsyslogd");
1106            expected.insert("origin.swVersion", "8.24.0");
1107            expected.insert(event_path!("origin", "x-pid"), "9043");
1108            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1109        }
1110
1111        assert_event_data_eq!(
1112            event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1113            expected
1114        );
1115    }
1116
1117    #[tokio::test]
1118    async fn test_tcp_syslog() {
1119        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1120            let num_messages: usize = 10000;
1121            let in_addr = next_addr();
1122
1123            // Create and spawn the source.
1124            let config = SyslogConfig::from_mode(Mode::Tcp {
1125                address: in_addr.into(),
1126                permit_origin: None,
1127                keepalive: None,
1128                tls: None,
1129                receive_buffer_bytes: None,
1130                connection_limit: None,
1131            });
1132
1133            let key = ComponentKey::from("in");
1134            let (tx, rx) = SourceSender::new_test();
1135            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1136            let shutdown_complete = shutdown.shutdown_tripwire();
1137
1138            let source = config
1139                .build(context)
1140                .await
1141                .expect("source should not fail to build");
1142            tokio::spawn(source);
1143
1144            // Wait for source to become ready to accept traffic.
1145            wait_for_tcp(in_addr).await;
1146
1147            let output_events = CountReceiver::receive_events(rx);
1148
1149            // Now craft and send syslog messages to the source, and collect them on the other side.
1150            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1151                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1152                .collect();
1153
1154            let input_lines: Vec<String> =
1155                input_messages.iter().map(|msg| msg.to_string()).collect();
1156
1157            send_lines(in_addr, input_lines).await.unwrap();
1158
1159            // Wait a short period of time to ensure the messages get sent.
1160            sleep(Duration::from_secs(2)).await;
1161
1162            // Shutdown the source, and make sure we've got all the messages we sent in.
1163            shutdown
1164                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1165                .await;
1166            shutdown_complete.await;
1167
1168            let output_events = output_events.await;
1169            assert_eq!(output_events.len(), num_messages);
1170
1171            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1172                .into_iter()
1173                .map(|mut e| {
1174                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1175                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1176                    e.into()
1177                })
1178                .collect();
1179            assert_eq!(output_messages, input_messages);
1180        })
1181        .await;
1182    }
1183
1184    #[cfg(unix)]
1185    #[tokio::test]
1186    async fn test_unix_stream_syslog() {
1187        use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1188        use futures_util::{stream, SinkExt};
1189        use std::os::unix::net::UnixStream as StdUnixStream;
1190        use tokio::io::AsyncWriteExt;
1191        use tokio::net::UnixStream;
1192        use tokio_util::codec::{FramedWrite, LinesCodec};
1193
1194        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1195            let num_messages: usize = 1;
1196            let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1197
1198            // Create and spawn the source.
1199            let config = SyslogConfig::from_mode(Mode::Unix {
1200                path: in_path.clone(),
1201                socket_file_mode: None,
1202            });
1203
1204            let key = ComponentKey::from("in");
1205            let (tx, rx) = SourceSender::new_test();
1206            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1207            let shutdown_complete = shutdown.shutdown_tripwire();
1208
1209            let source = config
1210                .build(context)
1211                .await
1212                .expect("source should not fail to build");
1213            tokio::spawn(source);
1214
1215            // Wait for source to become ready to accept traffic.
1216            while StdUnixStream::connect(&in_path).is_err() {
1217                tokio::task::yield_now().await;
1218            }
1219
1220            let output_events = CountReceiver::receive_events(rx);
1221
1222            // Now craft and send syslog messages to the source, and collect them on the other side.
1223            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1224                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1225                .collect();
1226
1227            let stream = UnixStream::connect(&in_path).await.unwrap();
1228            let mut sink = FramedWrite::new(stream, LinesCodec::new());
1229
1230            let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1231            let mut lines = stream::iter(lines).map(Ok);
1232            sink.send_all(&mut lines).await.unwrap();
1233
1234            let stream = sink.get_mut();
1235            stream.shutdown().await.unwrap();
1236
1237            // Wait a short period of time to ensure the messages get sent.
1238            sleep(Duration::from_secs(1)).await;
1239
1240            shutdown
1241                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1242                .await;
1243            shutdown_complete.await;
1244
1245            let output_events = output_events.await;
1246            assert_eq!(output_events.len(), num_messages);
1247
1248            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1249                .into_iter()
1250                .map(|mut e| {
1251                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1252                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1253                    e.into()
1254                })
1255                .collect();
1256            assert_eq!(output_messages, input_messages);
1257        })
1258        .await;
1259    }
1260
1261    #[tokio::test]
1262    async fn test_octet_counting_syslog() {
1263        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1264            let num_messages: usize = 10000;
1265            let in_addr = next_addr();
1266
1267            // Create and spawn the source.
1268            let config = SyslogConfig::from_mode(Mode::Tcp {
1269                address: in_addr.into(),
1270                permit_origin: None,
1271                keepalive: None,
1272                tls: None,
1273                receive_buffer_bytes: None,
1274                connection_limit: None,
1275            });
1276
1277            let key = ComponentKey::from("in");
1278            let (tx, rx) = SourceSender::new_test();
1279            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1280            let shutdown_complete = shutdown.shutdown_tripwire();
1281
1282            let source = config
1283                .build(context)
1284                .await
1285                .expect("source should not fail to build");
1286            tokio::spawn(source);
1287
1288            // Wait for source to become ready to accept traffic.
1289            wait_for_tcp(in_addr).await;
1290
1291            let output_events = CountReceiver::receive_events(rx);
1292
1293            // Now craft and send syslog messages to the source, and collect them on the other side.
1294            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1295                .map(|i| {
1296                    let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1297                    msg.message.push('\n');
1298                    msg.message.push_str(&random_string(30));
1299                    msg
1300                })
1301                .collect();
1302
1303            let codec = BytesCodec::new();
1304            let input_lines: Vec<Bytes> = input_messages
1305                .iter()
1306                .map(|msg| {
1307                    let s = msg.to_string();
1308                    format!("{} {}", s.len(), s).into()
1309                })
1310                .collect();
1311
1312            send_encodable(in_addr, codec, input_lines).await.unwrap();
1313
1314            // Wait a short period of time to ensure the messages get sent.
1315            sleep(Duration::from_secs(2)).await;
1316
1317            // Shutdown the source, and make sure we've got all the messages we sent in.
1318            shutdown
1319                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1320                .await;
1321            shutdown_complete.await;
1322
1323            let output_events = output_events.await;
1324            assert_eq!(output_events.len(), num_messages);
1325
1326            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1327                .into_iter()
1328                .map(|mut e| {
1329                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1330                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1331                    e.into()
1332                })
1333                .collect();
1334            assert_eq!(output_messages, input_messages);
1335        })
1336        .await;
1337    }
1338
1339    #[derive(Deserialize, PartialEq, Clone, Debug)]
1340    struct SyslogMessageRfc5424 {
1341        msgid: String,
1342        severity: Severity,
1343        facility: Facility,
1344        version: u8,
1345        timestamp: String,
1346        host: String,
1347        source_type: String,
1348        appname: String,
1349        procid: usize,
1350        message: String,
1351        #[serde(flatten)]
1352        structured_data: StructuredData,
1353    }
1354
1355    impl SyslogMessageRfc5424 {
1356        fn random(
1357            id: usize,
1358            msg_len: usize,
1359            field_len: usize,
1360            max_map_size: usize,
1361            max_children: usize,
1362        ) -> Self {
1363            let msg = random_string(msg_len);
1364            let structured_data = random_structured_data(max_map_size, max_children, field_len);
1365
1366            let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1367            //"secfrac" can contain up to 6 digits, but TCP sinks uses `AutoSi`
1368
1369            Self {
1370                msgid: format!("test{id}"),
1371                severity: Severity::LOG_INFO,
1372                facility: Facility::LOG_USER,
1373                version: 1,
1374                timestamp,
1375                host: "hogwarts".to_owned(),
1376                source_type: "syslog".to_owned(),
1377                appname: "harry".to_owned(),
1378                procid: rng().random_range(0..32768),
1379                structured_data,
1380                message: msg,
1381            }
1382        }
1383    }
1384
1385    impl fmt::Display for SyslogMessageRfc5424 {
1386        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1387            write!(
1388                f,
1389                "<{}>{} {} {} {} {} {} {} {}",
1390                encode_priority(self.severity, self.facility),
1391                self.version,
1392                self.timestamp,
1393                self.host,
1394                self.appname,
1395                self.procid,
1396                self.msgid,
1397                format_structured_data_rfc5424(&self.structured_data),
1398                self.message
1399            )
1400        }
1401    }
1402
1403    impl From<Event> for SyslogMessageRfc5424 {
1404        fn from(e: Event) -> Self {
1405            let (value, _) = e.into_log().into_parts();
1406            let mut fields = value.into_object().unwrap();
1407
1408            Self {
1409                msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1410                severity: fields
1411                    .remove("severity")
1412                    .map(value_to_string)
1413                    .and_then(|s| Severity::from_str(s.as_str()))
1414                    .unwrap(),
1415                facility: fields
1416                    .remove("facility")
1417                    .map(value_to_string)
1418                    .and_then(|s| Facility::from_str(s.as_str()))
1419                    .unwrap(),
1420                version: fields
1421                    .remove("version")
1422                    .map(value_to_string)
1423                    .map(|s| u8::from_str(s.as_str()).unwrap())
1424                    .unwrap(),
1425                timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1426                host: fields.remove("host").map(value_to_string).unwrap(),
1427                source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1428                appname: fields.remove("appname").map(value_to_string).unwrap(),
1429                procid: fields
1430                    .remove("procid")
1431                    .map(value_to_string)
1432                    .map(|s| usize::from_str(s.as_str()).unwrap())
1433                    .unwrap(),
1434                message: fields.remove("message").map(value_to_string).unwrap(),
1435                structured_data: structured_data_from_fields(fields),
1436            }
1437        }
1438    }
1439
1440    fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1441        let mut structured_data = StructuredData::default();
1442
1443        for (key, value) in fields.into_iter() {
1444            let subfields = value
1445                .into_object()
1446                .unwrap()
1447                .into_iter()
1448                .map(|(k, v)| (k.into(), value_to_string(v)))
1449                .collect();
1450
1451            structured_data.insert(key.into(), subfields);
1452        }
1453
1454        structured_data
1455    }
1456
1457    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1458    #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
1459    pub enum Severity {
1460        #[serde(rename(deserialize = "emergency"))]
1461        LOG_EMERG,
1462        #[serde(rename(deserialize = "alert"))]
1463        LOG_ALERT,
1464        #[serde(rename(deserialize = "critical"))]
1465        LOG_CRIT,
1466        #[serde(rename(deserialize = "error"))]
1467        LOG_ERR,
1468        #[serde(rename(deserialize = "warn"))]
1469        LOG_WARNING,
1470        #[serde(rename(deserialize = "notice"))]
1471        LOG_NOTICE,
1472        #[serde(rename(deserialize = "info"))]
1473        LOG_INFO,
1474        #[serde(rename(deserialize = "debug"))]
1475        LOG_DEBUG,
1476    }
1477
1478    impl Severity {
1479        fn from_str(s: &str) -> Option<Self> {
1480            match s {
1481                "emergency" => Some(Self::LOG_EMERG),
1482                "alert" => Some(Self::LOG_ALERT),
1483                "critical" => Some(Self::LOG_CRIT),
1484                "error" => Some(Self::LOG_ERR),
1485                "warn" => Some(Self::LOG_WARNING),
1486                "notice" => Some(Self::LOG_NOTICE),
1487                "info" => Some(Self::LOG_INFO),
1488                "debug" => Some(Self::LOG_DEBUG),
1489
1490                x => {
1491                    #[allow(clippy::print_stdout)]
1492                    {
1493                        println!("converting severity str, got {x}");
1494                    }
1495                    None
1496                }
1497            }
1498        }
1499    }
1500
1501    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1502    #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
1503    pub enum Facility {
1504        #[serde(rename(deserialize = "kernel"))]
1505        LOG_KERN = 0 << 3,
1506        #[serde(rename(deserialize = "user"))]
1507        LOG_USER = 1 << 3,
1508        #[serde(rename(deserialize = "mail"))]
1509        LOG_MAIL = 2 << 3,
1510        #[serde(rename(deserialize = "daemon"))]
1511        LOG_DAEMON = 3 << 3,
1512        #[serde(rename(deserialize = "auth"))]
1513        LOG_AUTH = 4 << 3,
1514        #[serde(rename(deserialize = "syslog"))]
1515        LOG_SYSLOG = 5 << 3,
1516    }
1517
1518    impl Facility {
1519        fn from_str(s: &str) -> Option<Self> {
1520            match s {
1521                "kernel" => Some(Self::LOG_KERN),
1522                "user" => Some(Self::LOG_USER),
1523                "mail" => Some(Self::LOG_MAIL),
1524                "daemon" => Some(Self::LOG_DAEMON),
1525                "auth" => Some(Self::LOG_AUTH),
1526                "syslog" => Some(Self::LOG_SYSLOG),
1527                _ => None,
1528            }
1529        }
1530    }
1531
1532    type StructuredData = HashMap<String, HashMap<String, String>>;
1533
1534    fn random_structured_data(
1535        max_map_size: usize,
1536        max_children: usize,
1537        field_len: usize,
1538    ) -> StructuredData {
1539        let amount = rng().random_range(0..max_children);
1540
1541        random_maps(max_map_size, field_len)
1542            .filter(|m| !m.is_empty()) //syslog_rfc5424 ignores empty maps, tested separately
1543            .take(amount)
1544            .enumerate()
1545            .map(|(i, map)| (format!("id{i}"), map))
1546            .collect()
1547    }
1548
1549    fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1550        if data.is_empty() {
1551            "-".to_string()
1552        } else {
1553            let mut res = String::new();
1554            for (id, params) in data {
1555                res = res + "[" + id;
1556                for (name, value) in params {
1557                    res = res + " " + name + "=\"" + value + "\"";
1558                }
1559                res += "]";
1560            }
1561
1562            res
1563        }
1564    }
1565
1566    const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1567        facility as u8 | severity as u8
1568    }
1569
1570    fn value_to_string(v: Value) -> String {
1571        if v.is_bytes() {
1572            let buf = v.as_bytes().unwrap();
1573            String::from_utf8_lossy(buf).to_string()
1574        } else if v.is_timestamp() {
1575            let ts = v.as_timestamp().unwrap();
1576            ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1577        } else {
1578            v.to_string()
1579        }
1580    }
1581}