vector/sources/
syslog.rs

1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use chrono::Utc;
7use futures::StreamExt;
8use listenfd::ListenFd;
9use smallvec::SmallVec;
10use tokio_util::udp::UdpFramed;
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{
14        BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
15        decoding::{Deserializer, Framer},
16    },
17    config::{LegacyKey, LogNamespace},
18    configurable::configurable_component,
19    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
20    ipallowlist::IpAllowlistConfig,
21    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
22};
23use vrl::event_path;
24
25#[cfg(unix)]
26use crate::sources::util::build_unix_stream_source;
27use crate::{
28    SourceSender,
29    codecs::Decoder,
30    config::{
31        DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
32    },
33    event::Event,
34    internal_events::{
35        SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
36    },
37    net,
38    shutdown::ShutdownSignal,
39    sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
40    tcp::TcpKeepaliveConfig,
41    tls::{MaybeTlsSettings, TlsSourceConfig},
42};
43
/// Configuration for the `syslog` source.
#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
#[derive(Clone, Debug)]
pub struct SyslogConfig {
    // Flattened so that the `mode` tag and the mode-specific options sit at the top level of the
    // source configuration rather than under a nested `mode` table.
    #[serde(flatten)]
    mode: Mode,

    /// The maximum buffer size of incoming messages, in bytes.
    ///
    /// Messages larger than this are truncated.
    ///
    /// This limit is applied in the TCP and Unix stream modes. The UDP mode does not currently use it.
    #[serde(default = "crate::serde::default_max_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_length: usize,

    /// Overrides the name of the log field used to add the peer host to each event.
    ///
    /// The value is the hostname parsed from the syslog message, when one is present. Otherwise, if using TCP or UDP,
    /// the value is the peer host's IP address, without the port. For example, `1.2.3.4`. If using UDS, the value is
    /// the socket path itself.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    host_key: Option<OptionalValuePath>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
73
/// Listener mode for the `syslog` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)]
pub enum Mode {
    /// Listen on TCP.
    Tcp {
        #[configurable(derived)]
        address: SocketListenAddr,

        #[configurable(derived)]
        keepalive: Option<TcpKeepaliveConfig>,

        #[configurable(derived)]
        permit_origin: Option<IpAllowlistConfig>,

        #[configurable(derived)]
        tls: Option<TlsSourceConfig>,

        /// The size of the receive buffer used for each connection.
        ///
        /// This should not typically need to be changed.
        #[configurable(metadata(docs::type_unit = "bytes"))]
        receive_buffer_bytes: Option<usize>,

        /// The maximum number of TCP connections that are allowed at any given time.
        connection_limit: Option<u32>,
    },

    /// Listen on UDP.
    Udp {
        #[configurable(derived)]
        address: SocketListenAddr,

        /// The size of the receive buffer used for the listening socket.
        ///
        /// This should not typically need to be changed.
        #[configurable(metadata(docs::type_unit = "bytes"))]
        receive_buffer_bytes: Option<usize>,
    },

    /// Listen on UDS (Unix domain socket). This only supports Unix stream sockets.
    ///
    /// For Unix datagram sockets, use the `socket` source instead.
    #[cfg(unix)]
    Unix {
        /// The Unix socket path.
        ///
        /// This should be an absolute path.
        #[configurable(metadata(docs::examples = "/path/to/socket"))]
        path: PathBuf,

        /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
        ///
        /// The file mode value can be specified in any numeric format supported by your configuration
        /// language, but it is most intuitive to use an octal number.
        socket_file_mode: Option<u32>,
    },
}
135
136impl SyslogConfig {
137    #[cfg(test)]
138    pub fn from_mode(mode: Mode) -> Self {
139        Self {
140            mode,
141            host_key: None,
142            max_length: crate::serde::default_max_length(),
143            log_namespace: None,
144        }
145    }
146}
147
148impl Default for SyslogConfig {
149    fn default() -> Self {
150        Self {
151            mode: Mode::Tcp {
152                address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
153                keepalive: None,
154                permit_origin: None,
155                tls: None,
156                receive_buffer_bytes: None,
157                connection_limit: None,
158            },
159            host_key: None,
160            max_length: crate::serde::default_max_length(),
161            log_namespace: None,
162        }
163    }
164}
165
166impl GenerateConfig for SyslogConfig {
167    fn generate_config() -> toml::Value {
168        toml::Value::try_from(SyslogConfig::default()).unwrap()
169    }
170}
171
172#[async_trait::async_trait]
173#[typetag::serde(name = "syslog")]
174impl SourceConfig for SyslogConfig {
175    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
176        let log_namespace = cx.log_namespace(self.log_namespace);
177        let host_key = self
178            .host_key
179            .clone()
180            .and_then(|k| k.path)
181            .or(log_schema().host_key().cloned());
182
183        match self.mode.clone() {
184            Mode::Tcp {
185                address,
186                keepalive,
187                permit_origin,
188                tls,
189                receive_buffer_bytes,
190                connection_limit,
191            } => {
192                let source = SyslogTcpSource {
193                    max_length: self.max_length,
194                    host_key,
195                    log_namespace,
196                };
197                let shutdown_secs = Duration::from_secs(30);
198                let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
199                let tls_client_metadata_key = tls
200                    .as_ref()
201                    .and_then(|tls| tls.client_metadata_key.clone())
202                    .and_then(|k| k.path);
203                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
204                source.run(
205                    address,
206                    keepalive,
207                    shutdown_secs,
208                    tls,
209                    tls_client_metadata_key,
210                    receive_buffer_bytes,
211                    None,
212                    cx,
213                    false.into(),
214                    connection_limit,
215                    permit_origin.map(Into::into),
216                    SyslogConfig::NAME,
217                    log_namespace,
218                )
219            }
220            Mode::Udp {
221                address,
222                receive_buffer_bytes,
223            } => Ok(udp(
224                address,
225                self.max_length,
226                host_key,
227                receive_buffer_bytes,
228                cx.shutdown,
229                log_namespace,
230                cx.out,
231            )),
232            #[cfg(unix)]
233            Mode::Unix {
234                path,
235                socket_file_mode,
236            } => {
237                let decoder = Decoder::new(
238                    Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
239                        self.max_length,
240                    )),
241                    Deserializer::Syslog(
242                        SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
243                    ),
244                );
245
246                build_unix_stream_source(
247                    path,
248                    socket_file_mode,
249                    decoder,
250                    move |events, host| handle_events(events, &host_key, host, log_namespace),
251                    cx.shutdown,
252                    cx.out,
253                )
254            }
255        }
256    }
257
258    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
259        let log_namespace = global_log_namespace.merge(self.log_namespace);
260        let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
261            .schema_definition(log_namespace)
262            .with_standard_vector_source_metadata();
263
264        vec![SourceOutput::new_maybe_logs(
265            DataType::Log,
266            schema_definition,
267        )]
268    }
269
270    fn resources(&self) -> Vec<Resource> {
271        match self.mode.clone() {
272            Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
273            Mode::Udp { address, .. } => vec![address.as_udp_resource()],
274            #[cfg(unix)]
275            Mode::Unix { .. } => vec![],
276        }
277    }
278
279    fn can_acknowledge(&self) -> bool {
280        false
281    }
282}
283
/// State shared by every TCP connection handled by the `syslog` source.
#[derive(Debug, Clone)]
struct SyslogTcpSource {
    // Upper bound handed to the octet-counting framer built in `decoder`.
    max_length: usize,
    // Legacy-namespace field that receives the host value; `None` leaves no legacy host key.
    host_key: Option<OwnedValuePath>,
    // Namespace used when attaching source metadata to events.
    log_namespace: LogNamespace,
}
290
291impl TcpSource for SyslogTcpSource {
292    type Error = vector_lib::codecs::decoding::Error;
293    type Item = SmallVec<[Event; 1]>;
294    type Decoder = Decoder;
295    type Acker = TcpNullAcker;
296
297    fn decoder(&self) -> Self::Decoder {
298        Decoder::new(
299            Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
300            Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
301        )
302    }
303
304    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
305        handle_events(
306            events,
307            &self.host_key,
308            Some(host.ip().to_string().into()),
309            self.log_namespace,
310        );
311    }
312
313    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
314        TcpNullAcker
315    }
316}
317
318pub fn udp(
319    addr: SocketListenAddr,
320    _max_length: usize,
321    host_key: Option<OwnedValuePath>,
322    receive_buffer_bytes: Option<usize>,
323    shutdown: ShutdownSignal,
324    log_namespace: LogNamespace,
325    mut out: SourceSender,
326) -> super::Source {
327    Box::pin(async move {
328        let listenfd = ListenFd::from_env();
329        let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
330            emit!(SocketBindError {
331                mode: SocketMode::Udp,
332                error: &error,
333            })
334        })?;
335
336        if let Some(receive_buffer_bytes) = receive_buffer_bytes
337            && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
338        {
339            warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
340        }
341
342        info!(
343            message = "Listening.",
344            addr = %addr,
345            r#type = "udp"
346        );
347
348        let bytes_received = register!(BytesReceived::from(Protocol::UDP));
349
350        let mut stream = UdpFramed::new(
351            socket,
352            Decoder::new(
353                Framer::Bytes(BytesDecoder::new()),
354                Deserializer::Syslog(
355                    SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
356                ),
357            ),
358        )
359        .take_until(shutdown)
360        .filter_map(|frame| {
361            let host_key = host_key.clone();
362            let bytes_received = bytes_received.clone();
363            async move {
364                match frame {
365                    Ok(((mut events, byte_size), received_from)) => {
366                        let count = events.len();
367                        bytes_received.emit(ByteSize(byte_size));
368                        emit!(SocketEventsReceived {
369                            mode: SocketMode::Udp,
370                            byte_size: events.estimated_json_encoded_size_of(),
371                            count,
372                        });
373                        let received_from = received_from.ip().to_string().into();
374                        handle_events(&mut events, &host_key, Some(received_from), log_namespace);
375                        Some(events.remove(0))
376                    }
377                    Err(error) => {
378                        emit!(SocketReceiveError {
379                            mode: SocketMode::Udp,
380                            error: &error,
381                        });
382                        None
383                    }
384                }
385            }
386        })
387        .boxed();
388
389        match out.send_event_stream(&mut stream).await {
390            Ok(()) => {
391                debug!("Finished sending.");
392                Ok(())
393            }
394            Err(_) => {
395                let (count, _) = stream.size_hint();
396                emit!(StreamClosedError { count });
397                Err(())
398            }
399        }
400    })
401}
402
403fn handle_events(
404    events: &mut [Event],
405    host_key: &Option<OwnedValuePath>,
406    default_host: Option<Bytes>,
407    log_namespace: LogNamespace,
408) {
409    for event in events {
410        enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
411    }
412}
413
414fn enrich_syslog_event(
415    event: &mut Event,
416    host_key: &Option<OwnedValuePath>,
417    default_host: Option<Bytes>,
418    log_namespace: LogNamespace,
419) {
420    let log = event.as_mut_log();
421
422    if let Some(default_host) = &default_host {
423        log_namespace.insert_source_metadata(
424            SyslogConfig::NAME,
425            log,
426            Some(LegacyKey::Overwrite(path!("source_ip"))),
427            path!("source_ip"),
428            default_host.clone(),
429        );
430    }
431
432    let parsed_hostname = log
433        .get(event_path!("hostname"))
434        .map(|hostname| hostname.coerce_to_bytes());
435
436    if let Some(parsed_host) = parsed_hostname.or(default_host) {
437        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
438
439        log_namespace.insert_source_metadata(
440            SyslogConfig::NAME,
441            log,
442            legacy_host_key,
443            path!("host"),
444            parsed_host,
445        );
446    }
447
448    log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
449
450    if log_namespace == LogNamespace::Legacy {
451        let timestamp = log
452            .get(event_path!("timestamp"))
453            .and_then(|timestamp| timestamp.as_timestamp().cloned())
454            .unwrap_or_else(Utc::now);
455        log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
456    }
457
458    trace!(
459        message = "Processing one event.",
460        event = ?event
461    );
462}
463
464#[cfg(test)]
465mod test {
466    use std::{collections::HashMap, fmt, str::FromStr};
467
468    use chrono::prelude::*;
469    use rand::{Rng, rng};
470    use serde::Deserialize;
471    use tokio::time::{Duration, Instant, sleep};
472    use tokio_util::codec::BytesCodec;
473    use vector_lib::{
474        assert_event_data_eq,
475        codecs::decoding::format::Deserializer,
476        config::ComponentKey,
477        lookup::{OwnedTargetPath, PathPrefix, event_path, owned_value_path},
478        schema::Definition,
479    };
480    use vrl::value::{Kind, ObjectMap, Value, kind::Collection};
481
482    use super::*;
483    use crate::{
484        config::log_schema,
485        event::{Event, LogEvent},
486        test_util::{
487            CountReceiver,
488            addr::next_addr,
489            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
490            random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
491        },
492    };
493
494    fn event_from_bytes(
495        host_key: &str,
496        default_host: Option<Bytes>,
497        bytes: Bytes,
498        log_namespace: LogNamespace,
499    ) -> Option<Event> {
500        let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
501        let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
502        handle_events(
503            &mut events,
504            &Some(owned_value_path!(host_key)),
505            default_host,
506            log_namespace,
507        );
508        Some(events.remove(0))
509    }
510
    // Runs the shared `test_generate_config` check against `SyslogConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SyslogConfig>();
    }
515
    // Pins the schema definition advertised by `outputs` when the Vector log namespace is
    // enabled: the event root is the message, and every syslog field is a metadata field.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = SyslogConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                // Standard Vector source metadata.
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                // Syslog-specific metadata.
                .with_metadata_field(
                    &owned_value_path!("syslog", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "hostname"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "source_ip"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "severity"),
                    Kind::bytes().or_undefined(),
                    Some("severity"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "facility"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "version"),
                    Kind::integer().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "appname"),
                    Kind::bytes().or_undefined(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "msgid"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "procid"),
                    Kind::integer().or_bytes().or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "structured_data"),
                    Kind::object(Collection::from_unknown(Kind::object(
                        Collection::from_unknown(Kind::bytes()),
                    ))),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("syslog", "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition));
    }
601
    // Pins the schema definition advertised by `outputs` in the legacy log namespace: the
    // syslog fields are event fields on an object root, and unknown fields are allowed.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = SyslogConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp(),
            Some("timestamp"),
        )
        .with_event_field(
            &owned_value_path!("hostname"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .with_event_field(
            &owned_value_path!("source_ip"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("severity"),
            Kind::bytes().or_undefined(),
            Some("severity"),
        )
        .with_event_field(
            &owned_value_path!("facility"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("version"),
            Kind::integer().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("appname"),
            Kind::bytes().or_undefined(),
            Some("service"),
        )
        .with_event_field(
            &owned_value_path!("msgid"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_event_field(
            &owned_value_path!("procid"),
            Kind::integer().or_bytes().or_undefined(),
            None,
        )
        // Fields not listed above are declared as objects with bytes values.
        .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
        .with_standard_vector_source_metadata();

        assert_eq!(definitions, Some(expected_definition));
    }
670
671    #[test]
672    fn config_tcp() {
673        let config: SyslogConfig = toml::from_str(
674            r#"
675            mode = "tcp"
676            address = "127.0.0.1:1235"
677          "#,
678        )
679        .unwrap();
680        assert!(matches!(config.mode, Mode::Tcp { .. }));
681    }
682
683    #[test]
684    fn config_tcp_with_receive_buffer_size() {
685        let config: SyslogConfig = toml::from_str(
686            r#"
687            mode = "tcp"
688            address = "127.0.0.1:1235"
689            receive_buffer_bytes = 256
690          "#,
691        )
692        .unwrap();
693
694        let receive_buffer_bytes = match config.mode {
695            Mode::Tcp {
696                receive_buffer_bytes,
697                ..
698            } => receive_buffer_bytes,
699            _ => panic!("expected Mode::Tcp"),
700        };
701
702        assert_eq!(receive_buffer_bytes, Some(256));
703    }
704
705    #[test]
706    fn config_tcp_keepalive_empty() {
707        let config: SyslogConfig = toml::from_str(
708            r#"
709            mode = "tcp"
710            address = "127.0.0.1:1235"
711          "#,
712        )
713        .unwrap();
714
715        let keepalive = match config.mode {
716            Mode::Tcp { keepalive, .. } => keepalive,
717            _ => panic!("expected Mode::Tcp"),
718        };
719
720        assert_eq!(keepalive, None);
721    }
722
723    #[test]
724    fn config_tcp_keepalive_full() {
725        let config: SyslogConfig = toml::from_str(
726            r#"
727            mode = "tcp"
728            address = "127.0.0.1:1235"
729            keepalive.time_secs = 7200
730          "#,
731        )
732        .unwrap();
733
734        let keepalive = match config.mode {
735            Mode::Tcp { keepalive, .. } => keepalive,
736            _ => panic!("expected Mode::Tcp"),
737        };
738
739        let keepalive = keepalive.expect("keepalive config not set");
740
741        assert_eq!(keepalive.time_secs, Some(7200));
742    }
743
744    #[test]
745    fn config_udp() {
746        let config: SyslogConfig = toml::from_str(
747            r#"
748            mode = "udp"
749            address = "127.0.0.1:1235"
750            max_length = 32187
751          "#,
752        )
753        .unwrap();
754        assert!(matches!(config.mode, Mode::Udp { .. }));
755    }
756
757    #[test]
758    fn config_udp_with_receive_buffer_size() {
759        let config: SyslogConfig = toml::from_str(
760            r#"
761            mode = "udp"
762            address = "127.0.0.1:1235"
763            max_length = 32187
764            receive_buffer_bytes = 256
765          "#,
766        )
767        .unwrap();
768
769        let receive_buffer_bytes = match config.mode {
770            Mode::Udp {
771                receive_buffer_bytes,
772                ..
773            } => receive_buffer_bytes,
774            _ => panic!("expected Mode::Udp"),
775        };
776
777        assert_eq!(receive_buffer_bytes, Some(256));
778    }
779
780    #[cfg(unix)]
781    #[test]
782    fn config_unix() {
783        let config: SyslogConfig = toml::from_str(
784            r#"
785            mode = "unix"
786            path = "127.0.0.1:1235"
787          "#,
788        )
789        .unwrap();
790        assert!(matches!(config.mode, Mode::Unix { .. }));
791    }
792
793    #[cfg(unix)]
794    #[test]
795    fn config_unix_permissions() {
796        let config: SyslogConfig = toml::from_str(
797            r#"
798            mode = "unix"
799            path = "127.0.0.1:1235"
800            socket_file_mode = 0o777
801          "#,
802        )
803        .unwrap();
804        let socket_file_mode = match config.mode {
805            Mode::Unix {
806                path: _,
807                socket_file_mode,
808            } => socket_file_mode,
809            _ => panic!("expected Mode::Unix"),
810        };
811
812        assert_eq!(socket_file_mode, Some(0o777));
813    }
814
815    #[test]
816    fn syslog_ng_network_syslog_protocol() {
817        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
818        let msg = "i am foobar";
819        let raw = format!(
820            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
821            r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
822            r#"[origin ip="192.168.0.1" software="test"]"#,
823            msg
824        );
825
826        let mut expected = Event::Log(LogEvent::from(msg));
827
828        {
829            let expected = expected.as_mut_log();
830            expected.insert(
831                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
832                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
833                    .single()
834                    .expect("invalid timestamp"),
835            );
836            expected.insert(
837                log_schema().source_type_key_target_path().unwrap(),
838                "syslog",
839            );
840            expected.insert("host", "74794bfb6795");
841            expected.insert("hostname", "74794bfb6795");
842
843            expected.insert("meta.sequenceId", "1");
844            expected.insert("meta.sysUpTime", "37");
845            expected.insert("meta.language", "EN");
846            expected.insert("origin.software", "test");
847            expected.insert("origin.ip", "192.168.0.1");
848
849            expected.insert("severity", "notice");
850            expected.insert("facility", "user");
851            expected.insert("version", 1);
852            expected.insert("appname", "root");
853            expected.insert("procid", 8449);
854            expected.insert("source_ip", "192.168.0.254");
855        }
856
857        assert_event_data_eq!(
858            event_from_bytes(
859                "host",
860                Some(Bytes::from("192.168.0.254")),
861                raw.into(),
862                LogNamespace::Legacy
863            )
864            .unwrap(),
865            expected
866        );
867    }
868
869    #[test]
870    fn handles_incorrect_sd_element() {
871        let msg = "qwerty";
872        let raw = format!(
873            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
874            r"[incorrect x]", msg
875        );
876
877        let mut expected = Event::Log(LogEvent::from(msg));
878        {
879            let expected = expected.as_mut_log();
880            expected.insert(
881                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
882                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
883                    .single()
884                    .expect("invalid timestamp"),
885            );
886            expected.insert(
887                log_schema().host_key().unwrap().to_string().as_str(),
888                "74794bfb6795",
889            );
890            expected.insert("hostname", "74794bfb6795");
891            expected.insert(
892                log_schema().source_type_key_target_path().unwrap(),
893                "syslog",
894            );
895            expected.insert("severity", "notice");
896            expected.insert("facility", "user");
897            expected.insert("version", 1);
898            expected.insert("appname", "root");
899            expected.insert("procid", 8449);
900            expected.insert("source_ip", "192.168.0.254");
901        }
902
903        let event = event_from_bytes(
904            "host",
905            Some(Bytes::from("192.168.0.254")),
906            raw.into(),
907            LogNamespace::Legacy,
908        )
909        .unwrap();
910        assert_event_data_eq!(event, expected);
911
912        let raw = format!(
913            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
914            r"[incorrect x=]", msg
915        );
916
917        let event = event_from_bytes(
918            "host",
919            Some(Bytes::from("192.168.0.254")),
920            raw.into(),
921            LogNamespace::Legacy,
922        )
923        .unwrap();
924        assert_event_data_eq!(event, expected);
925    }
926
927    #[test]
928    fn handles_empty_sd_element() {
929        fn there_is_map_called_empty(event: Event) -> bool {
930            event
931                .as_log()
932                .get("empty")
933                .expect("empty exists")
934                .is_object()
935        }
936
937        let msg = format!(
938            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
939            r"[empty]"
940        );
941
942        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
943        assert!(there_is_map_called_empty(event));
944
945        let msg = format!(
946            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
947            r#"[non_empty x="1"][empty]"#
948        );
949
950        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
951        assert!(there_is_map_called_empty(event));
952
953        let msg = format!(
954            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
955            r#"[empty][non_empty x="1"]"#
956        );
957
958        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
959        assert!(there_is_map_called_empty(event));
960
961        let msg = format!(
962            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
963            r#"[empty not_really="testing the test"]"#
964        );
965
966        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
967        assert!(there_is_map_called_empty(event));
968    }
969
970    #[test]
971    fn handles_weird_whitespace() {
972        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
973        let raw = r#"
974            <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
975            "#;
976        let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
977
978        assert_event_data_eq!(
979            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
980            event_from_bytes(
981                "host",
982                None,
983                cleaned.to_owned().into(),
984                LogNamespace::Legacy
985            )
986            .unwrap()
987        );
988    }
989
990    #[test]
991    fn handles_dots_in_sdata() {
992        let raw =
993            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin foo.bar="baz"] hello"#;
994        let event =
995            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
996        assert_eq!(
997            event.as_log().get(r#"origin."foo.bar""#),
998            Some(&Value::from("baz"))
999        );
1000    }
1001
1002    #[test]
1003    fn syslog_ng_default_network() {
1004        let msg = "i am foobar";
1005        let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
1006        let event = event_from_bytes(
1007            "host",
1008            Some(Bytes::from("192.168.0.254")),
1009            raw.into(),
1010            LogNamespace::Legacy,
1011        )
1012        .unwrap();
1013
1014        let mut expected = Event::Log(LogEvent::from(msg));
1015        {
1016            let value = event.as_log().get("timestamp").unwrap();
1017            let year = value.as_timestamp().unwrap().naive_local().year();
1018
1019            let expected = expected.as_mut_log();
1020            let expected_date: DateTime<Utc> = Local
1021                .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1022                .single()
1023                .expect("invalid timestamp")
1024                .into();
1025
1026            expected.insert(
1027                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1028                expected_date,
1029            );
1030            expected.insert(
1031                log_schema().host_key().unwrap().to_string().as_str(),
1032                "74794bfb6795",
1033            );
1034            expected.insert(
1035                log_schema().source_type_key_target_path().unwrap(),
1036                "syslog",
1037            );
1038            expected.insert("hostname", "74794bfb6795");
1039            expected.insert("severity", "notice");
1040            expected.insert("facility", "user");
1041            expected.insert("appname", "root");
1042            expected.insert("procid", 8539);
1043            expected.insert("source_ip", "192.168.0.254");
1044        }
1045
1046        assert_event_data_eq!(event, expected);
1047    }
1048
1049    #[test]
1050    fn rsyslog_omfwd_tcp_default() {
1051        let msg = "start";
1052        let raw = format!(
1053            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1054        );
1055        let event = event_from_bytes(
1056            "host",
1057            Some(Bytes::from("192.168.0.254")),
1058            raw.into(),
1059            LogNamespace::Legacy,
1060        )
1061        .unwrap();
1062
1063        let mut expected = Event::Log(LogEvent::from(msg));
1064        {
1065            let value = event.as_log().get("timestamp").unwrap();
1066            let year = value.as_timestamp().unwrap().naive_local().year();
1067
1068            let expected = expected.as_mut_log();
1069            let expected_date: DateTime<Utc> = Local
1070                .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1071                .single()
1072                .expect("invalid timestamp")
1073                .into();
1074            expected.insert(
1075                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1076                expected_date,
1077            );
1078            expected.insert(
1079                log_schema().source_type_key_target_path().unwrap(),
1080                "syslog",
1081            );
1082            expected.insert("host", "74794bfb6795");
1083            expected.insert("hostname", "74794bfb6795");
1084            expected.insert("severity", "info");
1085            expected.insert("facility", "local7");
1086            expected.insert("appname", "liblogging-stdlog");
1087            expected.insert("origin.software", "rsyslogd");
1088            expected.insert("origin.swVersion", "8.24.0");
1089            expected.insert("source_ip", "192.168.0.254");
1090            expected.insert(event_path!("origin", "x-pid"), "8979");
1091            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1092        }
1093
1094        assert_event_data_eq!(event, expected);
1095    }
1096
1097    #[test]
1098    fn rsyslog_omfwd_tcp_forward_format() {
1099        let msg = "start";
1100        let raw = format!(
1101            r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1102        );
1103
1104        let mut expected = Event::Log(LogEvent::from(msg));
1105        {
1106            let expected = expected.as_mut_log();
1107            expected.insert(
1108                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1109                Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1110                    .single()
1111                    .and_then(|t| t.with_nanosecond(605_850 * 1000))
1112                    .expect("invalid timestamp"),
1113            );
1114            expected.insert(
1115                log_schema().source_type_key_target_path().unwrap(),
1116                "syslog",
1117            );
1118            expected.insert("host", "74794bfb6795");
1119            expected.insert("hostname", "74794bfb6795");
1120            expected.insert("severity", "info");
1121            expected.insert("facility", "local7");
1122            expected.insert("appname", "liblogging-stdlog");
1123            expected.insert("origin.software", "rsyslogd");
1124            expected.insert("origin.swVersion", "8.24.0");
1125            expected.insert(event_path!("origin", "x-pid"), "9043");
1126            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1127        }
1128
1129        assert_event_data_eq!(
1130            event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1131            expected
1132        );
1133    }
1134
1135    #[tokio::test]
1136    async fn test_tcp_syslog() {
1137        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1138            let num_messages: usize = 10000;
1139            let (_guard, in_addr) = next_addr();
1140
1141            // Create and spawn the source.
1142            let config = SyslogConfig::from_mode(Mode::Tcp {
1143                address: in_addr.into(),
1144                permit_origin: None,
1145                keepalive: None,
1146                tls: None,
1147                receive_buffer_bytes: None,
1148                connection_limit: None,
1149            });
1150
1151            let key = ComponentKey::from("in");
1152            let (tx, rx) = SourceSender::new_test();
1153            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1154            let shutdown_complete = shutdown.shutdown_tripwire();
1155
1156            let source = config
1157                .build(context)
1158                .await
1159                .expect("source should not fail to build");
1160            tokio::spawn(source);
1161
1162            // Wait for source to become ready to accept traffic.
1163            wait_for_tcp(in_addr).await;
1164
1165            let output_events = CountReceiver::receive_events(rx);
1166
1167            // Now craft and send syslog messages to the source, and collect them on the other side.
1168            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1169                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1170                .collect();
1171
1172            let input_lines: Vec<String> =
1173                input_messages.iter().map(|msg| msg.to_string()).collect();
1174
1175            send_lines(in_addr, input_lines).await.unwrap();
1176
1177            // Wait a short period of time to ensure the messages get sent.
1178            sleep(Duration::from_secs(2)).await;
1179
1180            // Shutdown the source, and make sure we've got all the messages we sent in.
1181            shutdown
1182                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1183                .await;
1184            shutdown_complete.await;
1185
1186            let output_events = output_events.await;
1187            assert_eq!(output_events.len(), num_messages);
1188
1189            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1190                .into_iter()
1191                .map(|mut e| {
1192                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1193                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1194                    e.into()
1195                })
1196                .collect();
1197            assert_eq!(output_messages, input_messages);
1198        })
1199        .await;
1200    }
1201
1202    #[tokio::test]
1203    async fn test_udp_syslog() {
1204        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1205            let num_messages: usize = 1000;
1206            let (_guard, in_addr) = next_addr();
1207
1208            // Create and spawn the source.
1209            let config = SyslogConfig::from_mode(Mode::Udp {
1210                address: in_addr.into(),
1211                receive_buffer_bytes: None,
1212            });
1213
1214            let key = ComponentKey::from("in");
1215            let (tx, rx) = SourceSender::new_test();
1216            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1217            let shutdown_complete = shutdown.shutdown_tripwire();
1218
1219            let source = config
1220                .build(context)
1221                .await
1222                .expect("source should not fail to build");
1223            tokio::spawn(source);
1224
1225            // Give UDP a brief moment to start listening.
1226            sleep(Duration::from_millis(150)).await;
1227
1228            let output_events = CountReceiver::receive_events(rx);
1229
1230            // Craft and send syslog messages as individual UDP datagrams.
1231            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1232                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1233                .collect();
1234
1235            let input_lines: Vec<String> =
1236                input_messages.iter().map(|msg| msg.to_string()).collect();
1237
1238            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
1239            for line in input_lines {
1240                socket.send_to(line.as_bytes(), in_addr).await.unwrap();
1241            }
1242
1243            // Wait a short period of time to ensure the messages get sent.
1244            sleep(Duration::from_secs(2)).await;
1245
1246            // Shutdown the source, and make sure we've got all the messages we sent in.
1247            shutdown
1248                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1249                .await;
1250            shutdown_complete.await;
1251
1252            let output_events = output_events.await;
1253            assert_eq!(output_events.len(), num_messages);
1254
1255            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1256                .into_iter()
1257                .map(|mut e| {
1258                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1259                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1260                    e.into()
1261                })
1262                .collect();
1263            assert_eq!(output_messages, input_messages);
1264        })
1265        .await;
1266    }
1267
1268    #[cfg(unix)]
1269    #[tokio::test]
1270    async fn test_unix_stream_syslog() {
1271        use std::os::unix::net::UnixStream as StdUnixStream;
1272
1273        use futures_util::{SinkExt, stream};
1274        use tokio::{io::AsyncWriteExt, net::UnixStream};
1275        use tokio_util::codec::{FramedWrite, LinesCodec};
1276
1277        use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1278
1279        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1280            let num_messages: usize = 1;
1281            let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1282
1283            // Create and spawn the source.
1284            let config = SyslogConfig::from_mode(Mode::Unix {
1285                path: in_path.clone(),
1286                socket_file_mode: None,
1287            });
1288
1289            let key = ComponentKey::from("in");
1290            let (tx, rx) = SourceSender::new_test();
1291            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1292            let shutdown_complete = shutdown.shutdown_tripwire();
1293
1294            let source = config
1295                .build(context)
1296                .await
1297                .expect("source should not fail to build");
1298            tokio::spawn(source);
1299
1300            // Wait for source to become ready to accept traffic.
1301            while StdUnixStream::connect(&in_path).is_err() {
1302                tokio::task::yield_now().await;
1303            }
1304
1305            let output_events = CountReceiver::receive_events(rx);
1306
1307            // Now craft and send syslog messages to the source, and collect them on the other side.
1308            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1309                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1310                .collect();
1311
1312            let stream = UnixStream::connect(&in_path).await.unwrap();
1313            let mut sink = FramedWrite::new(stream, LinesCodec::new());
1314
1315            let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1316            let mut lines = stream::iter(lines).map(Ok);
1317            sink.send_all(&mut lines).await.unwrap();
1318
1319            let stream = sink.get_mut();
1320            stream.shutdown().await.unwrap();
1321
1322            // Wait a short period of time to ensure the messages get sent.
1323            sleep(Duration::from_secs(1)).await;
1324
1325            shutdown
1326                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1327                .await;
1328            shutdown_complete.await;
1329
1330            let output_events = output_events.await;
1331            assert_eq!(output_events.len(), num_messages);
1332
1333            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1334                .into_iter()
1335                .map(|mut e| {
1336                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1337                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1338                    e.into()
1339                })
1340                .collect();
1341            assert_eq!(output_messages, input_messages);
1342        })
1343        .await;
1344    }
1345
1346    #[tokio::test]
1347    async fn test_octet_counting_syslog() {
1348        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1349            let num_messages: usize = 10000;
1350            let (_guard, in_addr) = next_addr();
1351
1352            // Create and spawn the source.
1353            let config = SyslogConfig::from_mode(Mode::Tcp {
1354                address: in_addr.into(),
1355                permit_origin: None,
1356                keepalive: None,
1357                tls: None,
1358                receive_buffer_bytes: None,
1359                connection_limit: None,
1360            });
1361
1362            let key = ComponentKey::from("in");
1363            let (tx, rx) = SourceSender::new_test();
1364            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1365            let shutdown_complete = shutdown.shutdown_tripwire();
1366
1367            let source = config
1368                .build(context)
1369                .await
1370                .expect("source should not fail to build");
1371            tokio::spawn(source);
1372
1373            // Wait for source to become ready to accept traffic.
1374            wait_for_tcp(in_addr).await;
1375
1376            let output_events = CountReceiver::receive_events(rx);
1377
1378            // Now craft and send syslog messages to the source, and collect them on the other side.
1379            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1380                .map(|i| {
1381                    let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1382                    msg.message.push('\n');
1383                    msg.message.push_str(&random_string(30));
1384                    msg
1385                })
1386                .collect();
1387
1388            let codec = BytesCodec::new();
1389            let input_lines: Vec<Bytes> = input_messages
1390                .iter()
1391                .map(|msg| {
1392                    let s = msg.to_string();
1393                    format!("{} {}", s.len(), s).into()
1394                })
1395                .collect();
1396
1397            send_encodable(in_addr, codec, input_lines).await.unwrap();
1398
1399            // Wait a short period of time to ensure the messages get sent.
1400            sleep(Duration::from_secs(2)).await;
1401
1402            // Shutdown the source, and make sure we've got all the messages we sent in.
1403            shutdown
1404                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1405                .await;
1406            shutdown_complete.await;
1407
1408            let output_events = output_events.await;
1409            assert_eq!(output_events.len(), num_messages);
1410
1411            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1412                .into_iter()
1413                .map(|mut e| {
1414                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1415                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1416                    e.into()
1417                })
1418                .collect();
1419            assert_eq!(output_messages, input_messages);
1420        })
1421        .await;
1422    }
1423
    /// An RFC 5424 syslog message as the round-trip tests in this module generate, send, and
    /// read back.
    ///
    /// `Display` renders the line that goes on the wire; `From<Event>` rebuilds a value from an
    /// event produced by the source, so what was sent and what was received can be compared
    /// with `==`.
    #[derive(Deserialize, PartialEq, Clone, Debug)]
    struct SyslogMessageRfc5424 {
        /// Message identifier, written after the process id.
        msgid: String,
        /// Severity half of the priority value.
        severity: Severity,
        /// Facility half of the priority value.
        facility: Facility,
        /// Protocol version, written directly after the priority.
        version: u8,
        /// Timestamp, held as already-formatted text.
        timestamp: String,
        /// Host name written after the timestamp.
        host: String,
        /// Not written to the wire; filled from the event's `source_type` field when a message
        /// is rebuilt from an event.
        source_type: String,
        /// Application name written after the host.
        appname: String,
        /// Process id written after the application name.
        procid: usize,
        /// Free-form message text, the last part of the line.
        message: String,
        /// Structured-data elements keyed by element id; flattened for deserialization.
        #[serde(flatten)]
        structured_data: StructuredData,
    }
1439
1440    impl SyslogMessageRfc5424 {
1441        fn random(
1442            id: usize,
1443            msg_len: usize,
1444            field_len: usize,
1445            max_map_size: usize,
1446            max_children: usize,
1447        ) -> Self {
1448            let msg = random_string(msg_len);
1449            let structured_data = random_structured_data(max_map_size, max_children, field_len);
1450
1451            let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1452            //"secfrac" can contain up to 6 digits, but TCP sinks uses `AutoSi`
1453
1454            Self {
1455                msgid: format!("test{id}"),
1456                severity: Severity::LOG_INFO,
1457                facility: Facility::LOG_USER,
1458                version: 1,
1459                timestamp,
1460                host: "hogwarts".to_owned(),
1461                source_type: "syslog".to_owned(),
1462                appname: "harry".to_owned(),
1463                procid: rng().random_range(0..32768),
1464                structured_data,
1465                message: msg,
1466            }
1467        }
1468    }
1469
1470    impl fmt::Display for SyslogMessageRfc5424 {
1471        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1472            write!(
1473                f,
1474                "<{}>{} {} {} {} {} {} {} {}",
1475                encode_priority(self.severity, self.facility),
1476                self.version,
1477                self.timestamp,
1478                self.host,
1479                self.appname,
1480                self.procid,
1481                self.msgid,
1482                format_structured_data_rfc5424(&self.structured_data),
1483                self.message
1484            )
1485        }
1486    }
1487
1488    impl From<Event> for SyslogMessageRfc5424 {
1489        fn from(e: Event) -> Self {
1490            let (value, _) = e.into_log().into_parts();
1491            let mut fields = value.into_object().unwrap();
1492
1493            Self {
1494                msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1495                severity: fields
1496                    .remove("severity")
1497                    .map(value_to_string)
1498                    .and_then(|s| Severity::from_str(s.as_str()))
1499                    .unwrap(),
1500                facility: fields
1501                    .remove("facility")
1502                    .map(value_to_string)
1503                    .and_then(|s| Facility::from_str(s.as_str()))
1504                    .unwrap(),
1505                version: fields
1506                    .remove("version")
1507                    .map(value_to_string)
1508                    .map(|s| u8::from_str(s.as_str()).unwrap())
1509                    .unwrap(),
1510                timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1511                host: fields.remove("host").map(value_to_string).unwrap(),
1512                source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1513                appname: fields.remove("appname").map(value_to_string).unwrap(),
1514                procid: fields
1515                    .remove("procid")
1516                    .map(value_to_string)
1517                    .map(|s| usize::from_str(s.as_str()).unwrap())
1518                    .unwrap(),
1519                message: fields.remove("message").map(value_to_string).unwrap(),
1520                structured_data: structured_data_from_fields(fields),
1521            }
1522        }
1523    }
1524
1525    fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1526        let mut structured_data = StructuredData::default();
1527
1528        for (key, value) in fields.into_iter() {
1529            let subfields = value
1530                .into_object()
1531                .unwrap()
1532                .into_iter()
1533                .map(|(k, v)| (k.into(), value_to_string(v)))
1534                .collect();
1535
1536            structured_data.insert(key.into(), subfields);
1537        }
1538
1539        structured_data
1540    }
1541
    /// Syslog severity levels, declared in ascending numeric order so that the implicit
    /// discriminants run from `LOG_EMERG` = 0 to `LOG_DEBUG` = 7.
    ///
    /// The discriminant is the value `encode_priority` puts into the priority; the serde
    /// renames are the lowercase names accepted when deserializing.
    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
    #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
    pub enum Severity {
        /// Severity 0, deserialized from `emergency`.
        #[serde(rename(deserialize = "emergency"))]
        LOG_EMERG,
        /// Severity 1, deserialized from `alert`.
        #[serde(rename(deserialize = "alert"))]
        LOG_ALERT,
        /// Severity 2, deserialized from `critical`.
        #[serde(rename(deserialize = "critical"))]
        LOG_CRIT,
        /// Severity 3, deserialized from `error`.
        #[serde(rename(deserialize = "error"))]
        LOG_ERR,
        /// Severity 4, deserialized from `warn`.
        #[serde(rename(deserialize = "warn"))]
        LOG_WARNING,
        /// Severity 5, deserialized from `notice`.
        #[serde(rename(deserialize = "notice"))]
        LOG_NOTICE,
        /// Severity 6, deserialized from `info`.
        #[serde(rename(deserialize = "info"))]
        LOG_INFO,
        /// Severity 7, deserialized from `debug`.
        #[serde(rename(deserialize = "debug"))]
        LOG_DEBUG,
    }
1562
1563    impl Severity {
1564        fn from_str(s: &str) -> Option<Self> {
1565            match s {
1566                "emergency" => Some(Self::LOG_EMERG),
1567                "alert" => Some(Self::LOG_ALERT),
1568                "critical" => Some(Self::LOG_CRIT),
1569                "error" => Some(Self::LOG_ERR),
1570                "warn" => Some(Self::LOG_WARNING),
1571                "notice" => Some(Self::LOG_NOTICE),
1572                "info" => Some(Self::LOG_INFO),
1573                "debug" => Some(Self::LOG_DEBUG),
1574
1575                x => {
1576                    #[allow(clippy::print_stdout)]
1577                    {
1578                        println!("converting severity str, got {x}");
1579                    }
1580                    None
1581                }
1582            }
1583        }
1584    }
1585
    /// Syslog facilities, limited to facility codes 0 through 5.
    ///
    /// Each discriminant is the facility code already shifted left by three bits, so it can be
    /// OR-ed with a severity (see `encode_priority`) without further shifting. The serde renames
    /// are the lowercase names accepted when deserializing.
    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
    #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
    pub enum Facility {
        /// Facility code 0, deserialized from `kernel`.
        #[serde(rename(deserialize = "kernel"))]
        LOG_KERN = 0 << 3,
        /// Facility code 1, deserialized from `user`.
        #[serde(rename(deserialize = "user"))]
        LOG_USER = 1 << 3,
        /// Facility code 2, deserialized from `mail`.
        #[serde(rename(deserialize = "mail"))]
        LOG_MAIL = 2 << 3,
        /// Facility code 3, deserialized from `daemon`.
        #[serde(rename(deserialize = "daemon"))]
        LOG_DAEMON = 3 << 3,
        /// Facility code 4, deserialized from `auth`.
        #[serde(rename(deserialize = "auth"))]
        LOG_AUTH = 4 << 3,
        /// Facility code 5, deserialized from `syslog`.
        #[serde(rename(deserialize = "syslog"))]
        LOG_SYSLOG = 5 << 3,
    }
1602
1603    impl Facility {
1604        fn from_str(s: &str) -> Option<Self> {
1605            match s {
1606                "kernel" => Some(Self::LOG_KERN),
1607                "user" => Some(Self::LOG_USER),
1608                "mail" => Some(Self::LOG_MAIL),
1609                "daemon" => Some(Self::LOG_DAEMON),
1610                "auth" => Some(Self::LOG_AUTH),
1611                "syslog" => Some(Self::LOG_SYSLOG),
1612                _ => None,
1613            }
1614        }
1615    }
1616
    /// Structured data of a message: element id mapped to that element's parameters, each
    /// parameter being a name mapped to its value.
    type StructuredData = HashMap<String, HashMap<String, String>>;
1618
1619    fn random_structured_data(
1620        max_map_size: usize,
1621        max_children: usize,
1622        field_len: usize,
1623    ) -> StructuredData {
1624        let amount = rng().random_range(0..max_children);
1625
1626        random_maps(max_map_size, field_len)
1627            .filter(|m| !m.is_empty()) //syslog_rfc5424 ignores empty maps, tested separately
1628            .take(amount)
1629            .enumerate()
1630            .map(|(i, map)| (format!("id{i}"), map))
1631            .collect()
1632    }
1633
1634    fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1635        if data.is_empty() {
1636            "-".to_string()
1637        } else {
1638            let mut res = String::new();
1639            for (id, params) in data {
1640                res = res + "[" + id;
1641                for (name, value) in params {
1642                    res = res + " " + name + "=\"" + value + "\"";
1643                }
1644                res += "]";
1645            }
1646
1647            res
1648        }
1649    }
1650
1651    const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1652        facility as u8 | severity as u8
1653    }
1654
1655    fn value_to_string(v: Value) -> String {
1656        if v.is_bytes() {
1657            let buf = v.as_bytes().unwrap();
1658            String::from_utf8_lossy(buf).to_string()
1659        } else if v.is_timestamp() {
1660            let ts = v.as_timestamp().unwrap();
1661            ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1662        } else {
1663            v.to_string()
1664        }
1665    }
1666}