1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use chrono::Utc;
7use futures::StreamExt;
8use listenfd::ListenFd;
9use smallvec::SmallVec;
10use tokio_util::udp::UdpFramed;
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{
14 BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
15 decoding::{Deserializer, Framer},
16 },
17 config::{LegacyKey, LogNamespace},
18 configurable::configurable_component,
19 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
20 ipallowlist::IpAllowlistConfig,
21 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
22};
23use vrl::event_path;
24
25#[cfg(unix)]
26use crate::sources::util::build_unix_stream_source;
27use crate::{
28 SourceSender,
29 codecs::Decoder,
30 config::{
31 DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
32 },
33 event::Event,
34 internal_events::{
35 SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
36 },
37 net,
38 shutdown::ShutdownSignal,
39 sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
40 tcp::TcpKeepaliveConfig,
41 tls::{MaybeTlsSettings, TlsSourceConfig},
42};
43
44#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
46#[derive(Clone, Debug)]
47pub struct SyslogConfig {
48 #[serde(flatten)]
49 mode: Mode,
50
51 #[serde(default = "crate::serde::default_max_length")]
55 #[configurable(metadata(docs::type_unit = "bytes"))]
56 max_length: usize,
57
58 host_key: Option<OptionalValuePath>,
67
68 #[configurable(metadata(docs::hidden))]
70 #[serde(default)]
71 pub log_namespace: Option<bool>,
72}
73
74#[configurable_component]
76#[derive(Clone, Debug)]
77#[serde(tag = "mode", rename_all = "snake_case")]
78#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
79#[allow(clippy::large_enum_variant)]
80pub enum Mode {
81 Tcp {
83 #[configurable(derived)]
84 address: SocketListenAddr,
85
86 #[configurable(derived)]
87 keepalive: Option<TcpKeepaliveConfig>,
88
89 #[configurable(derived)]
90 permit_origin: Option<IpAllowlistConfig>,
91
92 #[configurable(derived)]
93 tls: Option<TlsSourceConfig>,
94
95 #[configurable(metadata(docs::type_unit = "bytes"))]
99 receive_buffer_bytes: Option<usize>,
100
101 connection_limit: Option<u32>,
103 },
104
105 Udp {
107 #[configurable(derived)]
108 address: SocketListenAddr,
109
110 #[configurable(metadata(docs::type_unit = "bytes"))]
114 receive_buffer_bytes: Option<usize>,
115 },
116
117 #[cfg(unix)]
121 Unix {
122 #[configurable(metadata(docs::examples = "/path/to/socket"))]
126 path: PathBuf,
127
128 socket_file_mode: Option<u32>,
133 },
134}
135
136impl SyslogConfig {
137 #[cfg(test)]
138 pub fn from_mode(mode: Mode) -> Self {
139 Self {
140 mode,
141 host_key: None,
142 max_length: crate::serde::default_max_length(),
143 log_namespace: None,
144 }
145 }
146}
147
148impl Default for SyslogConfig {
149 fn default() -> Self {
150 Self {
151 mode: Mode::Tcp {
152 address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
153 keepalive: None,
154 permit_origin: None,
155 tls: None,
156 receive_buffer_bytes: None,
157 connection_limit: None,
158 },
159 host_key: None,
160 max_length: crate::serde::default_max_length(),
161 log_namespace: None,
162 }
163 }
164}
165
166impl GenerateConfig for SyslogConfig {
167 fn generate_config() -> toml::Value {
168 toml::Value::try_from(SyslogConfig::default()).unwrap()
169 }
170}
171
172#[async_trait::async_trait]
173#[typetag::serde(name = "syslog")]
174impl SourceConfig for SyslogConfig {
175 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
176 let log_namespace = cx.log_namespace(self.log_namespace);
177 let host_key = self
178 .host_key
179 .clone()
180 .and_then(|k| k.path)
181 .or(log_schema().host_key().cloned());
182
183 match self.mode.clone() {
184 Mode::Tcp {
185 address,
186 keepalive,
187 permit_origin,
188 tls,
189 receive_buffer_bytes,
190 connection_limit,
191 } => {
192 let source = SyslogTcpSource {
193 max_length: self.max_length,
194 host_key,
195 log_namespace,
196 };
197 let shutdown_secs = Duration::from_secs(30);
198 let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
199 let tls_client_metadata_key = tls
200 .as_ref()
201 .and_then(|tls| tls.client_metadata_key.clone())
202 .and_then(|k| k.path);
203 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
204 source.run(
205 address,
206 keepalive,
207 shutdown_secs,
208 tls,
209 tls_client_metadata_key,
210 receive_buffer_bytes,
211 None,
212 cx,
213 false.into(),
214 connection_limit,
215 permit_origin.map(Into::into),
216 SyslogConfig::NAME,
217 log_namespace,
218 )
219 }
220 Mode::Udp {
221 address,
222 receive_buffer_bytes,
223 } => Ok(udp(
224 address,
225 self.max_length,
226 host_key,
227 receive_buffer_bytes,
228 cx.shutdown,
229 log_namespace,
230 cx.out,
231 )),
232 #[cfg(unix)]
233 Mode::Unix {
234 path,
235 socket_file_mode,
236 } => {
237 let decoder = Decoder::new(
238 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
239 self.max_length,
240 )),
241 Deserializer::Syslog(
242 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
243 ),
244 );
245
246 build_unix_stream_source(
247 path,
248 socket_file_mode,
249 decoder,
250 move |events, host| handle_events(events, &host_key, host, log_namespace),
251 cx.shutdown,
252 cx.out,
253 )
254 }
255 }
256 }
257
258 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
259 let log_namespace = global_log_namespace.merge(self.log_namespace);
260 let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
261 .schema_definition(log_namespace)
262 .with_standard_vector_source_metadata();
263
264 vec![SourceOutput::new_maybe_logs(
265 DataType::Log,
266 schema_definition,
267 )]
268 }
269
270 fn resources(&self) -> Vec<Resource> {
271 match self.mode.clone() {
272 Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
273 Mode::Udp { address, .. } => vec![address.as_udp_resource()],
274 #[cfg(unix)]
275 Mode::Unix { .. } => vec![],
276 }
277 }
278
279 fn can_acknowledge(&self) -> bool {
280 false
281 }
282}
283
284#[derive(Debug, Clone)]
285struct SyslogTcpSource {
286 max_length: usize,
287 host_key: Option<OwnedValuePath>,
288 log_namespace: LogNamespace,
289}
290
291impl TcpSource for SyslogTcpSource {
292 type Error = vector_lib::codecs::decoding::Error;
293 type Item = SmallVec<[Event; 1]>;
294 type Decoder = Decoder;
295 type Acker = TcpNullAcker;
296
297 fn decoder(&self) -> Self::Decoder {
298 Decoder::new(
299 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
300 Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
301 )
302 }
303
304 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
305 handle_events(
306 events,
307 &self.host_key,
308 Some(host.ip().to_string().into()),
309 self.log_namespace,
310 );
311 }
312
313 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
314 TcpNullAcker
315 }
316}
317
318pub fn udp(
319 addr: SocketListenAddr,
320 _max_length: usize,
321 host_key: Option<OwnedValuePath>,
322 receive_buffer_bytes: Option<usize>,
323 shutdown: ShutdownSignal,
324 log_namespace: LogNamespace,
325 mut out: SourceSender,
326) -> super::Source {
327 Box::pin(async move {
328 let listenfd = ListenFd::from_env();
329 let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
330 emit!(SocketBindError {
331 mode: SocketMode::Udp,
332 error: &error,
333 })
334 })?;
335
336 if let Some(receive_buffer_bytes) = receive_buffer_bytes
337 && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
338 {
339 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
340 }
341
342 info!(
343 message = "Listening.",
344 addr = %addr,
345 r#type = "udp"
346 );
347
348 let bytes_received = register!(BytesReceived::from(Protocol::UDP));
349
350 let mut stream = UdpFramed::new(
351 socket,
352 Decoder::new(
353 Framer::Bytes(BytesDecoder::new()),
354 Deserializer::Syslog(
355 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
356 ),
357 ),
358 )
359 .take_until(shutdown)
360 .filter_map(|frame| {
361 let host_key = host_key.clone();
362 let bytes_received = bytes_received.clone();
363 async move {
364 match frame {
365 Ok(((mut events, byte_size), received_from)) => {
366 let count = events.len();
367 bytes_received.emit(ByteSize(byte_size));
368 emit!(SocketEventsReceived {
369 mode: SocketMode::Udp,
370 byte_size: events.estimated_json_encoded_size_of(),
371 count,
372 });
373 let received_from = received_from.ip().to_string().into();
374 handle_events(&mut events, &host_key, Some(received_from), log_namespace);
375 Some(events.remove(0))
376 }
377 Err(error) => {
378 emit!(SocketReceiveError {
379 mode: SocketMode::Udp,
380 error: &error,
381 });
382 None
383 }
384 }
385 }
386 })
387 .boxed();
388
389 match out.send_event_stream(&mut stream).await {
390 Ok(()) => {
391 debug!("Finished sending.");
392 Ok(())
393 }
394 Err(_) => {
395 let (count, _) = stream.size_hint();
396 emit!(StreamClosedError { count });
397 Err(())
398 }
399 }
400 })
401}
402
403fn handle_events(
404 events: &mut [Event],
405 host_key: &Option<OwnedValuePath>,
406 default_host: Option<Bytes>,
407 log_namespace: LogNamespace,
408) {
409 for event in events {
410 enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
411 }
412}
413
414fn enrich_syslog_event(
415 event: &mut Event,
416 host_key: &Option<OwnedValuePath>,
417 default_host: Option<Bytes>,
418 log_namespace: LogNamespace,
419) {
420 let log = event.as_mut_log();
421
422 if let Some(default_host) = &default_host {
423 log_namespace.insert_source_metadata(
424 SyslogConfig::NAME,
425 log,
426 Some(LegacyKey::Overwrite(path!("source_ip"))),
427 path!("source_ip"),
428 default_host.clone(),
429 );
430 }
431
432 let parsed_hostname = log
433 .get(event_path!("hostname"))
434 .map(|hostname| hostname.coerce_to_bytes());
435
436 if let Some(parsed_host) = parsed_hostname.or(default_host) {
437 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
438
439 log_namespace.insert_source_metadata(
440 SyslogConfig::NAME,
441 log,
442 legacy_host_key,
443 path!("host"),
444 parsed_host,
445 );
446 }
447
448 log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
449
450 if log_namespace == LogNamespace::Legacy {
451 let timestamp = log
452 .get(event_path!("timestamp"))
453 .and_then(|timestamp| timestamp.as_timestamp().cloned())
454 .unwrap_or_else(Utc::now);
455 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
456 }
457
458 trace!(
459 message = "Processing one event.",
460 event = ?event
461 );
462}
463
464#[cfg(test)]
465mod test {
466 use std::{collections::HashMap, fmt, str::FromStr};
467
468 use chrono::prelude::*;
469 use rand::{Rng, rng};
470 use serde::Deserialize;
471 use tokio::time::{Duration, Instant, sleep};
472 use tokio_util::codec::BytesCodec;
473 use vector_lib::{
474 assert_event_data_eq,
475 codecs::decoding::format::Deserializer,
476 config::ComponentKey,
477 lookup::{OwnedTargetPath, PathPrefix, event_path, owned_value_path},
478 schema::Definition,
479 };
480 use vrl::value::{Kind, ObjectMap, Value, kind::Collection};
481
482 use super::*;
483 use crate::{
484 config::log_schema,
485 event::{Event, LogEvent},
486 test_util::{
487 CountReceiver,
488 addr::next_addr,
489 components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
490 random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
491 },
492 };
493
494 fn event_from_bytes(
495 host_key: &str,
496 default_host: Option<Bytes>,
497 bytes: Bytes,
498 log_namespace: LogNamespace,
499 ) -> Option<Event> {
500 let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
501 let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
502 handle_events(
503 &mut events,
504 &Some(owned_value_path!(host_key)),
505 default_host,
506 log_namespace,
507 );
508 Some(events.remove(0))
509 }
510
511 #[test]
512 fn generate_config() {
513 crate::test_util::test_generate_config::<SyslogConfig>();
514 }
515
516 #[test]
517 fn output_schema_definition_vector_namespace() {
518 let config = SyslogConfig {
519 log_namespace: Some(true),
520 ..Default::default()
521 };
522
523 let definitions = config
524 .outputs(LogNamespace::Vector)
525 .remove(0)
526 .schema_definition(true);
527
528 let expected_definition =
529 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
530 .with_meaning(OwnedTargetPath::event_root(), "message")
531 .with_metadata_field(
532 &owned_value_path!("vector", "source_type"),
533 Kind::bytes(),
534 None,
535 )
536 .with_metadata_field(
537 &owned_value_path!("vector", "ingest_timestamp"),
538 Kind::timestamp(),
539 None,
540 )
541 .with_metadata_field(
542 &owned_value_path!("syslog", "timestamp"),
543 Kind::timestamp(),
544 Some("timestamp"),
545 )
546 .with_metadata_field(
547 &owned_value_path!("syslog", "hostname"),
548 Kind::bytes().or_undefined(),
549 Some("host"),
550 )
551 .with_metadata_field(
552 &owned_value_path!("syslog", "source_ip"),
553 Kind::bytes().or_undefined(),
554 None,
555 )
556 .with_metadata_field(
557 &owned_value_path!("syslog", "severity"),
558 Kind::bytes().or_undefined(),
559 Some("severity"),
560 )
561 .with_metadata_field(
562 &owned_value_path!("syslog", "facility"),
563 Kind::bytes().or_undefined(),
564 None,
565 )
566 .with_metadata_field(
567 &owned_value_path!("syslog", "version"),
568 Kind::integer().or_undefined(),
569 None,
570 )
571 .with_metadata_field(
572 &owned_value_path!("syslog", "appname"),
573 Kind::bytes().or_undefined(),
574 Some("service"),
575 )
576 .with_metadata_field(
577 &owned_value_path!("syslog", "msgid"),
578 Kind::bytes().or_undefined(),
579 None,
580 )
581 .with_metadata_field(
582 &owned_value_path!("syslog", "procid"),
583 Kind::integer().or_bytes().or_undefined(),
584 None,
585 )
586 .with_metadata_field(
587 &owned_value_path!("syslog", "structured_data"),
588 Kind::object(Collection::from_unknown(Kind::object(
589 Collection::from_unknown(Kind::bytes()),
590 ))),
591 None,
592 )
593 .with_metadata_field(
594 &owned_value_path!("syslog", "tls_client_metadata"),
595 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
596 None,
597 );
598
599 assert_eq!(definitions, Some(expected_definition));
600 }
601
602 #[test]
603 fn output_schema_definition_legacy_namespace() {
604 let config = SyslogConfig::default();
605
606 let definitions = config
607 .outputs(LogNamespace::Legacy)
608 .remove(0)
609 .schema_definition(true);
610
611 let expected_definition = Definition::new_with_default_metadata(
612 Kind::object(Collection::empty()),
613 [LogNamespace::Legacy],
614 )
615 .with_event_field(
616 &owned_value_path!("message"),
617 Kind::bytes(),
618 Some("message"),
619 )
620 .with_event_field(
621 &owned_value_path!("timestamp"),
622 Kind::timestamp(),
623 Some("timestamp"),
624 )
625 .with_event_field(
626 &owned_value_path!("hostname"),
627 Kind::bytes().or_undefined(),
628 Some("host"),
629 )
630 .with_event_field(
631 &owned_value_path!("source_ip"),
632 Kind::bytes().or_undefined(),
633 None,
634 )
635 .with_event_field(
636 &owned_value_path!("severity"),
637 Kind::bytes().or_undefined(),
638 Some("severity"),
639 )
640 .with_event_field(
641 &owned_value_path!("facility"),
642 Kind::bytes().or_undefined(),
643 None,
644 )
645 .with_event_field(
646 &owned_value_path!("version"),
647 Kind::integer().or_undefined(),
648 None,
649 )
650 .with_event_field(
651 &owned_value_path!("appname"),
652 Kind::bytes().or_undefined(),
653 Some("service"),
654 )
655 .with_event_field(
656 &owned_value_path!("msgid"),
657 Kind::bytes().or_undefined(),
658 None,
659 )
660 .with_event_field(
661 &owned_value_path!("procid"),
662 Kind::integer().or_bytes().or_undefined(),
663 None,
664 )
665 .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
666 .with_standard_vector_source_metadata();
667
668 assert_eq!(definitions, Some(expected_definition));
669 }
670
671 #[test]
672 fn config_tcp() {
673 let config: SyslogConfig = toml::from_str(
674 r#"
675 mode = "tcp"
676 address = "127.0.0.1:1235"
677 "#,
678 )
679 .unwrap();
680 assert!(matches!(config.mode, Mode::Tcp { .. }));
681 }
682
683 #[test]
684 fn config_tcp_with_receive_buffer_size() {
685 let config: SyslogConfig = toml::from_str(
686 r#"
687 mode = "tcp"
688 address = "127.0.0.1:1235"
689 receive_buffer_bytes = 256
690 "#,
691 )
692 .unwrap();
693
694 let receive_buffer_bytes = match config.mode {
695 Mode::Tcp {
696 receive_buffer_bytes,
697 ..
698 } => receive_buffer_bytes,
699 _ => panic!("expected Mode::Tcp"),
700 };
701
702 assert_eq!(receive_buffer_bytes, Some(256));
703 }
704
705 #[test]
706 fn config_tcp_keepalive_empty() {
707 let config: SyslogConfig = toml::from_str(
708 r#"
709 mode = "tcp"
710 address = "127.0.0.1:1235"
711 "#,
712 )
713 .unwrap();
714
715 let keepalive = match config.mode {
716 Mode::Tcp { keepalive, .. } => keepalive,
717 _ => panic!("expected Mode::Tcp"),
718 };
719
720 assert_eq!(keepalive, None);
721 }
722
723 #[test]
724 fn config_tcp_keepalive_full() {
725 let config: SyslogConfig = toml::from_str(
726 r#"
727 mode = "tcp"
728 address = "127.0.0.1:1235"
729 keepalive.time_secs = 7200
730 "#,
731 )
732 .unwrap();
733
734 let keepalive = match config.mode {
735 Mode::Tcp { keepalive, .. } => keepalive,
736 _ => panic!("expected Mode::Tcp"),
737 };
738
739 let keepalive = keepalive.expect("keepalive config not set");
740
741 assert_eq!(keepalive.time_secs, Some(7200));
742 }
743
744 #[test]
745 fn config_udp() {
746 let config: SyslogConfig = toml::from_str(
747 r#"
748 mode = "udp"
749 address = "127.0.0.1:1235"
750 max_length = 32187
751 "#,
752 )
753 .unwrap();
754 assert!(matches!(config.mode, Mode::Udp { .. }));
755 }
756
757 #[test]
758 fn config_udp_with_receive_buffer_size() {
759 let config: SyslogConfig = toml::from_str(
760 r#"
761 mode = "udp"
762 address = "127.0.0.1:1235"
763 max_length = 32187
764 receive_buffer_bytes = 256
765 "#,
766 )
767 .unwrap();
768
769 let receive_buffer_bytes = match config.mode {
770 Mode::Udp {
771 receive_buffer_bytes,
772 ..
773 } => receive_buffer_bytes,
774 _ => panic!("expected Mode::Udp"),
775 };
776
777 assert_eq!(receive_buffer_bytes, Some(256));
778 }
779
780 #[cfg(unix)]
781 #[test]
782 fn config_unix() {
783 let config: SyslogConfig = toml::from_str(
784 r#"
785 mode = "unix"
786 path = "127.0.0.1:1235"
787 "#,
788 )
789 .unwrap();
790 assert!(matches!(config.mode, Mode::Unix { .. }));
791 }
792
793 #[cfg(unix)]
794 #[test]
795 fn config_unix_permissions() {
796 let config: SyslogConfig = toml::from_str(
797 r#"
798 mode = "unix"
799 path = "127.0.0.1:1235"
800 socket_file_mode = 0o777
801 "#,
802 )
803 .unwrap();
804 let socket_file_mode = match config.mode {
805 Mode::Unix {
806 path: _,
807 socket_file_mode,
808 } => socket_file_mode,
809 _ => panic!("expected Mode::Unix"),
810 };
811
812 assert_eq!(socket_file_mode, Some(0o777));
813 }
814
815 #[test]
816 fn syslog_ng_network_syslog_protocol() {
817 let msg = "i am foobar";
819 let raw = format!(
820 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
821 r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
822 r#"[origin ip="192.168.0.1" software="test"]"#,
823 msg
824 );
825
826 let mut expected = Event::Log(LogEvent::from(msg));
827
828 {
829 let expected = expected.as_mut_log();
830 expected.insert(
831 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
832 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
833 .single()
834 .expect("invalid timestamp"),
835 );
836 expected.insert(
837 log_schema().source_type_key_target_path().unwrap(),
838 "syslog",
839 );
840 expected.insert("host", "74794bfb6795");
841 expected.insert("hostname", "74794bfb6795");
842
843 expected.insert("meta.sequenceId", "1");
844 expected.insert("meta.sysUpTime", "37");
845 expected.insert("meta.language", "EN");
846 expected.insert("origin.software", "test");
847 expected.insert("origin.ip", "192.168.0.1");
848
849 expected.insert("severity", "notice");
850 expected.insert("facility", "user");
851 expected.insert("version", 1);
852 expected.insert("appname", "root");
853 expected.insert("procid", 8449);
854 expected.insert("source_ip", "192.168.0.254");
855 }
856
857 assert_event_data_eq!(
858 event_from_bytes(
859 "host",
860 Some(Bytes::from("192.168.0.254")),
861 raw.into(),
862 LogNamespace::Legacy
863 )
864 .unwrap(),
865 expected
866 );
867 }
868
869 #[test]
870 fn handles_incorrect_sd_element() {
871 let msg = "qwerty";
872 let raw = format!(
873 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
874 r"[incorrect x]", msg
875 );
876
877 let mut expected = Event::Log(LogEvent::from(msg));
878 {
879 let expected = expected.as_mut_log();
880 expected.insert(
881 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
882 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
883 .single()
884 .expect("invalid timestamp"),
885 );
886 expected.insert(
887 log_schema().host_key().unwrap().to_string().as_str(),
888 "74794bfb6795",
889 );
890 expected.insert("hostname", "74794bfb6795");
891 expected.insert(
892 log_schema().source_type_key_target_path().unwrap(),
893 "syslog",
894 );
895 expected.insert("severity", "notice");
896 expected.insert("facility", "user");
897 expected.insert("version", 1);
898 expected.insert("appname", "root");
899 expected.insert("procid", 8449);
900 expected.insert("source_ip", "192.168.0.254");
901 }
902
903 let event = event_from_bytes(
904 "host",
905 Some(Bytes::from("192.168.0.254")),
906 raw.into(),
907 LogNamespace::Legacy,
908 )
909 .unwrap();
910 assert_event_data_eq!(event, expected);
911
912 let raw = format!(
913 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
914 r"[incorrect x=]", msg
915 );
916
917 let event = event_from_bytes(
918 "host",
919 Some(Bytes::from("192.168.0.254")),
920 raw.into(),
921 LogNamespace::Legacy,
922 )
923 .unwrap();
924 assert_event_data_eq!(event, expected);
925 }
926
927 #[test]
928 fn handles_empty_sd_element() {
929 fn there_is_map_called_empty(event: Event) -> bool {
930 event
931 .as_log()
932 .get("empty")
933 .expect("empty exists")
934 .is_object()
935 }
936
937 let msg = format!(
938 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
939 r"[empty]"
940 );
941
942 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
943 assert!(there_is_map_called_empty(event));
944
945 let msg = format!(
946 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
947 r#"[non_empty x="1"][empty]"#
948 );
949
950 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
951 assert!(there_is_map_called_empty(event));
952
953 let msg = format!(
954 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
955 r#"[empty][non_empty x="1"]"#
956 );
957
958 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
959 assert!(there_is_map_called_empty(event));
960
961 let msg = format!(
962 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
963 r#"[empty not_really="testing the test"]"#
964 );
965
966 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
967 assert!(there_is_map_called_empty(event));
968 }
969
970 #[test]
971 fn handles_weird_whitespace() {
972 let raw = r#"
974 <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
975 "#;
976 let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
977
978 assert_event_data_eq!(
979 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
980 event_from_bytes(
981 "host",
982 None,
983 cleaned.to_owned().into(),
984 LogNamespace::Legacy
985 )
986 .unwrap()
987 );
988 }
989
990 #[test]
991 fn handles_dots_in_sdata() {
992 let raw =
993 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin foo.bar="baz"] hello"#;
994 let event =
995 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
996 assert_eq!(
997 event.as_log().get(r#"origin."foo.bar""#),
998 Some(&Value::from("baz"))
999 );
1000 }
1001
1002 #[test]
1003 fn syslog_ng_default_network() {
1004 let msg = "i am foobar";
1005 let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
1006 let event = event_from_bytes(
1007 "host",
1008 Some(Bytes::from("192.168.0.254")),
1009 raw.into(),
1010 LogNamespace::Legacy,
1011 )
1012 .unwrap();
1013
1014 let mut expected = Event::Log(LogEvent::from(msg));
1015 {
1016 let value = event.as_log().get("timestamp").unwrap();
1017 let year = value.as_timestamp().unwrap().naive_local().year();
1018
1019 let expected = expected.as_mut_log();
1020 let expected_date: DateTime<Utc> = Local
1021 .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1022 .single()
1023 .expect("invalid timestamp")
1024 .into();
1025
1026 expected.insert(
1027 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1028 expected_date,
1029 );
1030 expected.insert(
1031 log_schema().host_key().unwrap().to_string().as_str(),
1032 "74794bfb6795",
1033 );
1034 expected.insert(
1035 log_schema().source_type_key_target_path().unwrap(),
1036 "syslog",
1037 );
1038 expected.insert("hostname", "74794bfb6795");
1039 expected.insert("severity", "notice");
1040 expected.insert("facility", "user");
1041 expected.insert("appname", "root");
1042 expected.insert("procid", 8539);
1043 expected.insert("source_ip", "192.168.0.254");
1044 }
1045
1046 assert_event_data_eq!(event, expected);
1047 }
1048
1049 #[test]
1050 fn rsyslog_omfwd_tcp_default() {
1051 let msg = "start";
1052 let raw = format!(
1053 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1054 );
1055 let event = event_from_bytes(
1056 "host",
1057 Some(Bytes::from("192.168.0.254")),
1058 raw.into(),
1059 LogNamespace::Legacy,
1060 )
1061 .unwrap();
1062
1063 let mut expected = Event::Log(LogEvent::from(msg));
1064 {
1065 let value = event.as_log().get("timestamp").unwrap();
1066 let year = value.as_timestamp().unwrap().naive_local().year();
1067
1068 let expected = expected.as_mut_log();
1069 let expected_date: DateTime<Utc> = Local
1070 .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1071 .single()
1072 .expect("invalid timestamp")
1073 .into();
1074 expected.insert(
1075 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1076 expected_date,
1077 );
1078 expected.insert(
1079 log_schema().source_type_key_target_path().unwrap(),
1080 "syslog",
1081 );
1082 expected.insert("host", "74794bfb6795");
1083 expected.insert("hostname", "74794bfb6795");
1084 expected.insert("severity", "info");
1085 expected.insert("facility", "local7");
1086 expected.insert("appname", "liblogging-stdlog");
1087 expected.insert("origin.software", "rsyslogd");
1088 expected.insert("origin.swVersion", "8.24.0");
1089 expected.insert("source_ip", "192.168.0.254");
1090 expected.insert(event_path!("origin", "x-pid"), "8979");
1091 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1092 }
1093
1094 assert_event_data_eq!(event, expected);
1095 }
1096
1097 #[test]
1098 fn rsyslog_omfwd_tcp_forward_format() {
1099 let msg = "start";
1100 let raw = format!(
1101 r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1102 );
1103
1104 let mut expected = Event::Log(LogEvent::from(msg));
1105 {
1106 let expected = expected.as_mut_log();
1107 expected.insert(
1108 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1109 Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1110 .single()
1111 .and_then(|t| t.with_nanosecond(605_850 * 1000))
1112 .expect("invalid timestamp"),
1113 );
1114 expected.insert(
1115 log_schema().source_type_key_target_path().unwrap(),
1116 "syslog",
1117 );
1118 expected.insert("host", "74794bfb6795");
1119 expected.insert("hostname", "74794bfb6795");
1120 expected.insert("severity", "info");
1121 expected.insert("facility", "local7");
1122 expected.insert("appname", "liblogging-stdlog");
1123 expected.insert("origin.software", "rsyslogd");
1124 expected.insert("origin.swVersion", "8.24.0");
1125 expected.insert(event_path!("origin", "x-pid"), "9043");
1126 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1127 }
1128
1129 assert_event_data_eq!(
1130 event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1131 expected
1132 );
1133 }
1134
1135 #[tokio::test]
1136 async fn test_tcp_syslog() {
1137 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1138 let num_messages: usize = 10000;
1139 let (_guard, in_addr) = next_addr();
1140
1141 let config = SyslogConfig::from_mode(Mode::Tcp {
1143 address: in_addr.into(),
1144 permit_origin: None,
1145 keepalive: None,
1146 tls: None,
1147 receive_buffer_bytes: None,
1148 connection_limit: None,
1149 });
1150
1151 let key = ComponentKey::from("in");
1152 let (tx, rx) = SourceSender::new_test();
1153 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1154 let shutdown_complete = shutdown.shutdown_tripwire();
1155
1156 let source = config
1157 .build(context)
1158 .await
1159 .expect("source should not fail to build");
1160 tokio::spawn(source);
1161
1162 wait_for_tcp(in_addr).await;
1164
1165 let output_events = CountReceiver::receive_events(rx);
1166
1167 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1169 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1170 .collect();
1171
1172 let input_lines: Vec<String> =
1173 input_messages.iter().map(|msg| msg.to_string()).collect();
1174
1175 send_lines(in_addr, input_lines).await.unwrap();
1176
1177 sleep(Duration::from_secs(2)).await;
1179
1180 shutdown
1182 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1183 .await;
1184 shutdown_complete.await;
1185
1186 let output_events = output_events.await;
1187 assert_eq!(output_events.len(), num_messages);
1188
1189 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1190 .into_iter()
1191 .map(|mut e| {
1192 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1195 })
1196 .collect();
1197 assert_eq!(output_messages, input_messages);
1198 })
1199 .await;
1200 }
1201
1202 #[tokio::test]
1203 async fn test_udp_syslog() {
1204 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1205 let num_messages: usize = 1000;
1206 let (_guard, in_addr) = next_addr();
1207
1208 let config = SyslogConfig::from_mode(Mode::Udp {
1210 address: in_addr.into(),
1211 receive_buffer_bytes: None,
1212 });
1213
1214 let key = ComponentKey::from("in");
1215 let (tx, rx) = SourceSender::new_test();
1216 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1217 let shutdown_complete = shutdown.shutdown_tripwire();
1218
1219 let source = config
1220 .build(context)
1221 .await
1222 .expect("source should not fail to build");
1223 tokio::spawn(source);
1224
1225 sleep(Duration::from_millis(150)).await;
1227
1228 let output_events = CountReceiver::receive_events(rx);
1229
1230 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1232 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1233 .collect();
1234
1235 let input_lines: Vec<String> =
1236 input_messages.iter().map(|msg| msg.to_string()).collect();
1237
1238 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
1239 for line in input_lines {
1240 socket.send_to(line.as_bytes(), in_addr).await.unwrap();
1241 }
1242
1243 sleep(Duration::from_secs(2)).await;
1245
1246 shutdown
1248 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1249 .await;
1250 shutdown_complete.await;
1251
1252 let output_events = output_events.await;
1253 assert_eq!(output_events.len(), num_messages);
1254
1255 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1256 .into_iter()
1257 .map(|mut e| {
1258 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1261 })
1262 .collect();
1263 assert_eq!(output_messages, input_messages);
1264 })
1265 .await;
1266 }
1267
1268 #[cfg(unix)]
1269 #[tokio::test]
1270 async fn test_unix_stream_syslog() {
1271 use std::os::unix::net::UnixStream as StdUnixStream;
1272
1273 use futures_util::{SinkExt, stream};
1274 use tokio::{io::AsyncWriteExt, net::UnixStream};
1275 use tokio_util::codec::{FramedWrite, LinesCodec};
1276
1277 use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1278
1279 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1280 let num_messages: usize = 1;
1281 let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1282
1283 let config = SyslogConfig::from_mode(Mode::Unix {
1285 path: in_path.clone(),
1286 socket_file_mode: None,
1287 });
1288
1289 let key = ComponentKey::from("in");
1290 let (tx, rx) = SourceSender::new_test();
1291 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1292 let shutdown_complete = shutdown.shutdown_tripwire();
1293
1294 let source = config
1295 .build(context)
1296 .await
1297 .expect("source should not fail to build");
1298 tokio::spawn(source);
1299
1300 while StdUnixStream::connect(&in_path).is_err() {
1302 tokio::task::yield_now().await;
1303 }
1304
1305 let output_events = CountReceiver::receive_events(rx);
1306
1307 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1309 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1310 .collect();
1311
1312 let stream = UnixStream::connect(&in_path).await.unwrap();
1313 let mut sink = FramedWrite::new(stream, LinesCodec::new());
1314
1315 let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1316 let mut lines = stream::iter(lines).map(Ok);
1317 sink.send_all(&mut lines).await.unwrap();
1318
1319 let stream = sink.get_mut();
1320 stream.shutdown().await.unwrap();
1321
1322 sleep(Duration::from_secs(1)).await;
1324
1325 shutdown
1326 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1327 .await;
1328 shutdown_complete.await;
1329
1330 let output_events = output_events.await;
1331 assert_eq!(output_events.len(), num_messages);
1332
1333 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1334 .into_iter()
1335 .map(|mut e| {
1336 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1339 })
1340 .collect();
1341 assert_eq!(output_messages, input_messages);
1342 })
1343 .await;
1344 }
1345
1346 #[tokio::test]
1347 async fn test_octet_counting_syslog() {
1348 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1349 let num_messages: usize = 10000;
1350 let (_guard, in_addr) = next_addr();
1351
1352 let config = SyslogConfig::from_mode(Mode::Tcp {
1354 address: in_addr.into(),
1355 permit_origin: None,
1356 keepalive: None,
1357 tls: None,
1358 receive_buffer_bytes: None,
1359 connection_limit: None,
1360 });
1361
1362 let key = ComponentKey::from("in");
1363 let (tx, rx) = SourceSender::new_test();
1364 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1365 let shutdown_complete = shutdown.shutdown_tripwire();
1366
1367 let source = config
1368 .build(context)
1369 .await
1370 .expect("source should not fail to build");
1371 tokio::spawn(source);
1372
1373 wait_for_tcp(in_addr).await;
1375
1376 let output_events = CountReceiver::receive_events(rx);
1377
1378 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1380 .map(|i| {
1381 let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1382 msg.message.push('\n');
1383 msg.message.push_str(&random_string(30));
1384 msg
1385 })
1386 .collect();
1387
1388 let codec = BytesCodec::new();
1389 let input_lines: Vec<Bytes> = input_messages
1390 .iter()
1391 .map(|msg| {
1392 let s = msg.to_string();
1393 format!("{} {}", s.len(), s).into()
1394 })
1395 .collect();
1396
1397 send_encodable(in_addr, codec, input_lines).await.unwrap();
1398
1399 sleep(Duration::from_secs(2)).await;
1401
1402 shutdown
1404 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1405 .await;
1406 shutdown_complete.await;
1407
1408 let output_events = output_events.await;
1409 assert_eq!(output_events.len(), num_messages);
1410
1411 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1412 .into_iter()
1413 .map(|mut e| {
1414 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1417 })
1418 .collect();
1419 assert_eq!(output_messages, input_messages);
1420 })
1421 .await;
1422 }
1423
1424 #[derive(Deserialize, PartialEq, Clone, Debug)]
1425 struct SyslogMessageRfc5424 {
1426 msgid: String,
1427 severity: Severity,
1428 facility: Facility,
1429 version: u8,
1430 timestamp: String,
1431 host: String,
1432 source_type: String,
1433 appname: String,
1434 procid: usize,
1435 message: String,
1436 #[serde(flatten)]
1437 structured_data: StructuredData,
1438 }
1439
1440 impl SyslogMessageRfc5424 {
1441 fn random(
1442 id: usize,
1443 msg_len: usize,
1444 field_len: usize,
1445 max_map_size: usize,
1446 max_children: usize,
1447 ) -> Self {
1448 let msg = random_string(msg_len);
1449 let structured_data = random_structured_data(max_map_size, max_children, field_len);
1450
1451 let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1452 Self {
1455 msgid: format!("test{id}"),
1456 severity: Severity::LOG_INFO,
1457 facility: Facility::LOG_USER,
1458 version: 1,
1459 timestamp,
1460 host: "hogwarts".to_owned(),
1461 source_type: "syslog".to_owned(),
1462 appname: "harry".to_owned(),
1463 procid: rng().random_range(0..32768),
1464 structured_data,
1465 message: msg,
1466 }
1467 }
1468 }
1469
1470 impl fmt::Display for SyslogMessageRfc5424 {
1471 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1472 write!(
1473 f,
1474 "<{}>{} {} {} {} {} {} {} {}",
1475 encode_priority(self.severity, self.facility),
1476 self.version,
1477 self.timestamp,
1478 self.host,
1479 self.appname,
1480 self.procid,
1481 self.msgid,
1482 format_structured_data_rfc5424(&self.structured_data),
1483 self.message
1484 )
1485 }
1486 }
1487
1488 impl From<Event> for SyslogMessageRfc5424 {
1489 fn from(e: Event) -> Self {
1490 let (value, _) = e.into_log().into_parts();
1491 let mut fields = value.into_object().unwrap();
1492
1493 Self {
1494 msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1495 severity: fields
1496 .remove("severity")
1497 .map(value_to_string)
1498 .and_then(|s| Severity::from_str(s.as_str()))
1499 .unwrap(),
1500 facility: fields
1501 .remove("facility")
1502 .map(value_to_string)
1503 .and_then(|s| Facility::from_str(s.as_str()))
1504 .unwrap(),
1505 version: fields
1506 .remove("version")
1507 .map(value_to_string)
1508 .map(|s| u8::from_str(s.as_str()).unwrap())
1509 .unwrap(),
1510 timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1511 host: fields.remove("host").map(value_to_string).unwrap(),
1512 source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1513 appname: fields.remove("appname").map(value_to_string).unwrap(),
1514 procid: fields
1515 .remove("procid")
1516 .map(value_to_string)
1517 .map(|s| usize::from_str(s.as_str()).unwrap())
1518 .unwrap(),
1519 message: fields.remove("message").map(value_to_string).unwrap(),
1520 structured_data: structured_data_from_fields(fields),
1521 }
1522 }
1523 }
1524
1525 fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1526 let mut structured_data = StructuredData::default();
1527
1528 for (key, value) in fields.into_iter() {
1529 let subfields = value
1530 .into_object()
1531 .unwrap()
1532 .into_iter()
1533 .map(|(k, v)| (k.into(), value_to_string(v)))
1534 .collect();
1535
1536 structured_data.insert(key.into(), subfields);
1537 }
1538
1539 structured_data
1540 }
1541
1542 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1543 #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
1544 pub enum Severity {
1545 #[serde(rename(deserialize = "emergency"))]
1546 LOG_EMERG,
1547 #[serde(rename(deserialize = "alert"))]
1548 LOG_ALERT,
1549 #[serde(rename(deserialize = "critical"))]
1550 LOG_CRIT,
1551 #[serde(rename(deserialize = "error"))]
1552 LOG_ERR,
1553 #[serde(rename(deserialize = "warn"))]
1554 LOG_WARNING,
1555 #[serde(rename(deserialize = "notice"))]
1556 LOG_NOTICE,
1557 #[serde(rename(deserialize = "info"))]
1558 LOG_INFO,
1559 #[serde(rename(deserialize = "debug"))]
1560 LOG_DEBUG,
1561 }
1562
1563 impl Severity {
1564 fn from_str(s: &str) -> Option<Self> {
1565 match s {
1566 "emergency" => Some(Self::LOG_EMERG),
1567 "alert" => Some(Self::LOG_ALERT),
1568 "critical" => Some(Self::LOG_CRIT),
1569 "error" => Some(Self::LOG_ERR),
1570 "warn" => Some(Self::LOG_WARNING),
1571 "notice" => Some(Self::LOG_NOTICE),
1572 "info" => Some(Self::LOG_INFO),
1573 "debug" => Some(Self::LOG_DEBUG),
1574
1575 x => {
1576 #[allow(clippy::print_stdout)]
1577 {
1578 println!("converting severity str, got {x}");
1579 }
1580 None
1581 }
1582 }
1583 }
1584 }
1585
1586 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1587 #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
1588 pub enum Facility {
1589 #[serde(rename(deserialize = "kernel"))]
1590 LOG_KERN = 0 << 3,
1591 #[serde(rename(deserialize = "user"))]
1592 LOG_USER = 1 << 3,
1593 #[serde(rename(deserialize = "mail"))]
1594 LOG_MAIL = 2 << 3,
1595 #[serde(rename(deserialize = "daemon"))]
1596 LOG_DAEMON = 3 << 3,
1597 #[serde(rename(deserialize = "auth"))]
1598 LOG_AUTH = 4 << 3,
1599 #[serde(rename(deserialize = "syslog"))]
1600 LOG_SYSLOG = 5 << 3,
1601 }
1602
1603 impl Facility {
1604 fn from_str(s: &str) -> Option<Self> {
1605 match s {
1606 "kernel" => Some(Self::LOG_KERN),
1607 "user" => Some(Self::LOG_USER),
1608 "mail" => Some(Self::LOG_MAIL),
1609 "daemon" => Some(Self::LOG_DAEMON),
1610 "auth" => Some(Self::LOG_AUTH),
1611 "syslog" => Some(Self::LOG_SYSLOG),
1612 _ => None,
1613 }
1614 }
1615 }
1616
1617 type StructuredData = HashMap<String, HashMap<String, String>>;
1618
1619 fn random_structured_data(
1620 max_map_size: usize,
1621 max_children: usize,
1622 field_len: usize,
1623 ) -> StructuredData {
1624 let amount = rng().random_range(0..max_children);
1625
1626 random_maps(max_map_size, field_len)
1627 .filter(|m| !m.is_empty()) .take(amount)
1629 .enumerate()
1630 .map(|(i, map)| (format!("id{i}"), map))
1631 .collect()
1632 }
1633
1634 fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1635 if data.is_empty() {
1636 "-".to_string()
1637 } else {
1638 let mut res = String::new();
1639 for (id, params) in data {
1640 res = res + "[" + id;
1641 for (name, value) in params {
1642 res = res + " " + name + "=\"" + value + "\"";
1643 }
1644 res += "]";
1645 }
1646
1647 res
1648 }
1649 }
1650
1651 const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1652 facility as u8 | severity as u8
1653 }
1654
1655 fn value_to_string(v: Value) -> String {
1656 if v.is_bytes() {
1657 let buf = v.as_bytes().unwrap();
1658 String::from_utf8_lossy(buf).to_string()
1659 } else if v.is_timestamp() {
1660 let ts = v.as_timestamp().unwrap();
1661 ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1662 } else {
1663 v.to_string()
1664 }
1665 }
1666}