1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4use vector_lib::ipallowlist::IpAllowlistConfig;
5
6use bytes::Bytes;
7use chrono::Utc;
8use futures::StreamExt;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use tokio_util::udp::UdpFramed;
12use vector_lib::codecs::{
13 decoding::{Deserializer, Framer},
14 BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
15};
16use vector_lib::config::{LegacyKey, LogNamespace};
17use vector_lib::configurable::configurable_component;
18use vector_lib::lookup::{lookup_v2::OptionalValuePath, path, OwnedValuePath};
19use vrl::event_path;
20
21#[cfg(unix)]
22use crate::sources::util::build_unix_stream_source;
23use crate::{
24 codecs::Decoder,
25 config::{
26 log_schema, DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput,
27 },
28 event::Event,
29 internal_events::StreamClosedError,
30 internal_events::{SocketBindError, SocketMode, SocketReceiveError},
31 net,
32 shutdown::ShutdownSignal,
33 sources::util::net::{try_bind_udp_socket, SocketListenAddr, TcpNullAcker, TcpSource},
34 tcp::TcpKeepaliveConfig,
35 tls::{MaybeTlsSettings, TlsSourceConfig},
36 SourceSender,
37};
38
39#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
41#[derive(Clone, Debug)]
42pub struct SyslogConfig {
43 #[serde(flatten)]
44 mode: Mode,
45
46 #[serde(default = "crate::serde::default_max_length")]
50 #[configurable(metadata(docs::type_unit = "bytes"))]
51 max_length: usize,
52
53 host_key: Option<OptionalValuePath>,
62
63 #[configurable(metadata(docs::hidden))]
65 #[serde(default)]
66 pub log_namespace: Option<bool>,
67}
68
69#[configurable_component]
71#[derive(Clone, Debug)]
72#[serde(tag = "mode", rename_all = "snake_case")]
73#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
74#[allow(clippy::large_enum_variant)]
75pub enum Mode {
76 Tcp {
78 #[configurable(derived)]
79 address: SocketListenAddr,
80
81 #[configurable(derived)]
82 keepalive: Option<TcpKeepaliveConfig>,
83
84 #[configurable(derived)]
85 permit_origin: Option<IpAllowlistConfig>,
86
87 #[configurable(derived)]
88 tls: Option<TlsSourceConfig>,
89
90 #[configurable(metadata(docs::type_unit = "bytes"))]
94 receive_buffer_bytes: Option<usize>,
95
96 connection_limit: Option<u32>,
98 },
99
100 Udp {
102 #[configurable(derived)]
103 address: SocketListenAddr,
104
105 #[configurable(metadata(docs::type_unit = "bytes"))]
109 receive_buffer_bytes: Option<usize>,
110 },
111
112 #[cfg(unix)]
116 Unix {
117 #[configurable(metadata(docs::examples = "/path/to/socket"))]
121 path: PathBuf,
122
123 socket_file_mode: Option<u32>,
128 },
129}
130
131impl SyslogConfig {
132 #[cfg(test)]
133 pub fn from_mode(mode: Mode) -> Self {
134 Self {
135 mode,
136 host_key: None,
137 max_length: crate::serde::default_max_length(),
138 log_namespace: None,
139 }
140 }
141}
142
143impl Default for SyslogConfig {
144 fn default() -> Self {
145 Self {
146 mode: Mode::Tcp {
147 address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
148 keepalive: None,
149 permit_origin: None,
150 tls: None,
151 receive_buffer_bytes: None,
152 connection_limit: None,
153 },
154 host_key: None,
155 max_length: crate::serde::default_max_length(),
156 log_namespace: None,
157 }
158 }
159}
160
161impl GenerateConfig for SyslogConfig {
162 fn generate_config() -> toml::Value {
163 toml::Value::try_from(SyslogConfig::default()).unwrap()
164 }
165}
166
167#[async_trait::async_trait]
168#[typetag::serde(name = "syslog")]
169impl SourceConfig for SyslogConfig {
170 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
171 let log_namespace = cx.log_namespace(self.log_namespace);
172 let host_key = self
173 .host_key
174 .clone()
175 .and_then(|k| k.path)
176 .or(log_schema().host_key().cloned());
177
178 match self.mode.clone() {
179 Mode::Tcp {
180 address,
181 keepalive,
182 permit_origin,
183 tls,
184 receive_buffer_bytes,
185 connection_limit,
186 } => {
187 let source = SyslogTcpSource {
188 max_length: self.max_length,
189 host_key,
190 log_namespace,
191 };
192 let shutdown_secs = Duration::from_secs(30);
193 let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
194 let tls_client_metadata_key = tls
195 .as_ref()
196 .and_then(|tls| tls.client_metadata_key.clone())
197 .and_then(|k| k.path);
198 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
199 source.run(
200 address,
201 keepalive,
202 shutdown_secs,
203 tls,
204 tls_client_metadata_key,
205 receive_buffer_bytes,
206 None,
207 cx,
208 false.into(),
209 connection_limit,
210 permit_origin.map(Into::into),
211 SyslogConfig::NAME,
212 log_namespace,
213 )
214 }
215 Mode::Udp {
216 address,
217 receive_buffer_bytes,
218 } => Ok(udp(
219 address,
220 self.max_length,
221 host_key,
222 receive_buffer_bytes,
223 cx.shutdown,
224 log_namespace,
225 cx.out,
226 )),
227 #[cfg(unix)]
228 Mode::Unix {
229 path,
230 socket_file_mode,
231 } => {
232 let decoder = Decoder::new(
233 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
234 self.max_length,
235 )),
236 Deserializer::Syslog(
237 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
238 ),
239 );
240
241 build_unix_stream_source(
242 path,
243 socket_file_mode,
244 decoder,
245 move |events, host| handle_events(events, &host_key, host, log_namespace),
246 cx.shutdown,
247 cx.out,
248 )
249 }
250 }
251 }
252
253 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
254 let log_namespace = global_log_namespace.merge(self.log_namespace);
255 let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
256 .schema_definition(log_namespace)
257 .with_standard_vector_source_metadata();
258
259 vec![SourceOutput::new_maybe_logs(
260 DataType::Log,
261 schema_definition,
262 )]
263 }
264
265 fn resources(&self) -> Vec<Resource> {
266 match self.mode.clone() {
267 Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
268 Mode::Udp { address, .. } => vec![address.as_udp_resource()],
269 #[cfg(unix)]
270 Mode::Unix { .. } => vec![],
271 }
272 }
273
274 fn can_acknowledge(&self) -> bool {
275 false
276 }
277}
278
279#[derive(Debug, Clone)]
280struct SyslogTcpSource {
281 max_length: usize,
282 host_key: Option<OwnedValuePath>,
283 log_namespace: LogNamespace,
284}
285
286impl TcpSource for SyslogTcpSource {
287 type Error = vector_lib::codecs::decoding::Error;
288 type Item = SmallVec<[Event; 1]>;
289 type Decoder = Decoder;
290 type Acker = TcpNullAcker;
291
292 fn decoder(&self) -> Self::Decoder {
293 Decoder::new(
294 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
295 Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
296 )
297 }
298
299 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
300 handle_events(
301 events,
302 &self.host_key,
303 Some(host.ip().to_string().into()),
304 self.log_namespace,
305 );
306 }
307
308 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
309 TcpNullAcker
310 }
311}
312
313pub fn udp(
314 addr: SocketListenAddr,
315 _max_length: usize,
316 host_key: Option<OwnedValuePath>,
317 receive_buffer_bytes: Option<usize>,
318 shutdown: ShutdownSignal,
319 log_namespace: LogNamespace,
320 mut out: SourceSender,
321) -> super::Source {
322 Box::pin(async move {
323 let listenfd = ListenFd::from_env();
324 let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
325 emit!(SocketBindError {
326 mode: SocketMode::Udp,
327 error: &error,
328 })
329 })?;
330
331 if let Some(receive_buffer_bytes) = receive_buffer_bytes {
332 if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
333 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
334 }
335 }
336
337 info!(
338 message = "Listening.",
339 addr = %addr,
340 r#type = "udp"
341 );
342
343 let mut stream = UdpFramed::new(
344 socket,
345 Decoder::new(
346 Framer::Bytes(BytesDecoder::new()),
347 Deserializer::Syslog(
348 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
349 ),
350 ),
351 )
352 .take_until(shutdown)
353 .filter_map(|frame| {
354 let host_key = host_key.clone();
355 async move {
356 match frame {
357 Ok(((mut events, _byte_size), received_from)) => {
358 let received_from = received_from.ip().to_string().into();
359 handle_events(&mut events, &host_key, Some(received_from), log_namespace);
360 Some(events.remove(0))
361 }
362 Err(error) => {
363 emit!(SocketReceiveError {
364 mode: SocketMode::Udp,
365 error: &error,
366 });
367 None
368 }
369 }
370 }
371 })
372 .boxed();
373
374 match out.send_event_stream(&mut stream).await {
375 Ok(()) => {
376 debug!("Finished sending.");
377 Ok(())
378 }
379 Err(_) => {
380 let (count, _) = stream.size_hint();
381 emit!(StreamClosedError { count });
382 Err(())
383 }
384 }
385 })
386}
387
388fn handle_events(
389 events: &mut [Event],
390 host_key: &Option<OwnedValuePath>,
391 default_host: Option<Bytes>,
392 log_namespace: LogNamespace,
393) {
394 for event in events {
395 enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
396 }
397}
398
399fn enrich_syslog_event(
400 event: &mut Event,
401 host_key: &Option<OwnedValuePath>,
402 default_host: Option<Bytes>,
403 log_namespace: LogNamespace,
404) {
405 let log = event.as_mut_log();
406
407 if let Some(default_host) = &default_host {
408 log_namespace.insert_source_metadata(
409 SyslogConfig::NAME,
410 log,
411 Some(LegacyKey::Overwrite(path!("source_ip"))),
412 path!("source_ip"),
413 default_host.clone(),
414 );
415 }
416
417 let parsed_hostname = log
418 .get(event_path!("hostname"))
419 .map(|hostname| hostname.coerce_to_bytes());
420
421 if let Some(parsed_host) = parsed_hostname.or(default_host) {
422 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
423
424 log_namespace.insert_source_metadata(
425 SyslogConfig::NAME,
426 log,
427 legacy_host_key,
428 path!("host"),
429 parsed_host,
430 );
431 }
432
433 log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
434
435 if log_namespace == LogNamespace::Legacy {
436 let timestamp = log
437 .get(event_path!("timestamp"))
438 .and_then(|timestamp| timestamp.as_timestamp().cloned())
439 .unwrap_or_else(Utc::now);
440 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
441 }
442
443 trace!(
444 message = "Processing one event.",
445 event = ?event
446 );
447}
448
449#[cfg(test)]
450mod test {
451 use std::{collections::HashMap, fmt, str::FromStr};
452 use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath};
453
454 use chrono::prelude::*;
455 use rand::{rng, Rng};
456 use serde::Deserialize;
457 use tokio::time::{sleep, Duration, Instant};
458 use tokio_util::codec::BytesCodec;
459 use vector_lib::assert_event_data_eq;
460 use vector_lib::codecs::decoding::format::Deserializer;
461 use vector_lib::lookup::PathPrefix;
462 use vector_lib::{config::ComponentKey, schema::Definition};
463 use vrl::value::{kind::Collection, Kind, ObjectMap, Value};
464
465 use super::*;
466 use crate::{
467 config::log_schema,
468 event::{Event, LogEvent},
469 test_util::{
470 components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
471 next_addr, random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
472 CountReceiver,
473 },
474 };
475
476 fn event_from_bytes(
477 host_key: &str,
478 default_host: Option<Bytes>,
479 bytes: Bytes,
480 log_namespace: LogNamespace,
481 ) -> Option<Event> {
482 let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
483 let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
484 handle_events(
485 &mut events,
486 &Some(owned_value_path!(host_key)),
487 default_host,
488 log_namespace,
489 );
490 Some(events.remove(0))
491 }
492
493 #[test]
494 fn generate_config() {
495 crate::test_util::test_generate_config::<SyslogConfig>();
496 }
497
498 #[test]
499 fn output_schema_definition_vector_namespace() {
500 let config = SyslogConfig {
501 log_namespace: Some(true),
502 ..Default::default()
503 };
504
505 let definitions = config
506 .outputs(LogNamespace::Vector)
507 .remove(0)
508 .schema_definition(true);
509
510 let expected_definition =
511 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
512 .with_meaning(OwnedTargetPath::event_root(), "message")
513 .with_metadata_field(
514 &owned_value_path!("vector", "source_type"),
515 Kind::bytes(),
516 None,
517 )
518 .with_metadata_field(
519 &owned_value_path!("vector", "ingest_timestamp"),
520 Kind::timestamp(),
521 None,
522 )
523 .with_metadata_field(
524 &owned_value_path!("syslog", "timestamp"),
525 Kind::timestamp(),
526 Some("timestamp"),
527 )
528 .with_metadata_field(
529 &owned_value_path!("syslog", "hostname"),
530 Kind::bytes().or_undefined(),
531 Some("host"),
532 )
533 .with_metadata_field(
534 &owned_value_path!("syslog", "source_ip"),
535 Kind::bytes().or_undefined(),
536 None,
537 )
538 .with_metadata_field(
539 &owned_value_path!("syslog", "severity"),
540 Kind::bytes().or_undefined(),
541 Some("severity"),
542 )
543 .with_metadata_field(
544 &owned_value_path!("syslog", "facility"),
545 Kind::bytes().or_undefined(),
546 None,
547 )
548 .with_metadata_field(
549 &owned_value_path!("syslog", "version"),
550 Kind::integer().or_undefined(),
551 None,
552 )
553 .with_metadata_field(
554 &owned_value_path!("syslog", "appname"),
555 Kind::bytes().or_undefined(),
556 Some("service"),
557 )
558 .with_metadata_field(
559 &owned_value_path!("syslog", "msgid"),
560 Kind::bytes().or_undefined(),
561 None,
562 )
563 .with_metadata_field(
564 &owned_value_path!("syslog", "procid"),
565 Kind::integer().or_bytes().or_undefined(),
566 None,
567 )
568 .with_metadata_field(
569 &owned_value_path!("syslog", "structured_data"),
570 Kind::object(Collection::from_unknown(Kind::object(
571 Collection::from_unknown(Kind::bytes()),
572 ))),
573 None,
574 )
575 .with_metadata_field(
576 &owned_value_path!("syslog", "tls_client_metadata"),
577 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
578 None,
579 );
580
581 assert_eq!(definitions, Some(expected_definition));
582 }
583
584 #[test]
585 fn output_schema_definition_legacy_namespace() {
586 let config = SyslogConfig::default();
587
588 let definitions = config
589 .outputs(LogNamespace::Legacy)
590 .remove(0)
591 .schema_definition(true);
592
593 let expected_definition = Definition::new_with_default_metadata(
594 Kind::object(Collection::empty()),
595 [LogNamespace::Legacy],
596 )
597 .with_event_field(
598 &owned_value_path!("message"),
599 Kind::bytes(),
600 Some("message"),
601 )
602 .with_event_field(
603 &owned_value_path!("timestamp"),
604 Kind::timestamp(),
605 Some("timestamp"),
606 )
607 .with_event_field(
608 &owned_value_path!("hostname"),
609 Kind::bytes().or_undefined(),
610 Some("host"),
611 )
612 .with_event_field(
613 &owned_value_path!("source_ip"),
614 Kind::bytes().or_undefined(),
615 None,
616 )
617 .with_event_field(
618 &owned_value_path!("severity"),
619 Kind::bytes().or_undefined(),
620 Some("severity"),
621 )
622 .with_event_field(
623 &owned_value_path!("facility"),
624 Kind::bytes().or_undefined(),
625 None,
626 )
627 .with_event_field(
628 &owned_value_path!("version"),
629 Kind::integer().or_undefined(),
630 None,
631 )
632 .with_event_field(
633 &owned_value_path!("appname"),
634 Kind::bytes().or_undefined(),
635 Some("service"),
636 )
637 .with_event_field(
638 &owned_value_path!("msgid"),
639 Kind::bytes().or_undefined(),
640 None,
641 )
642 .with_event_field(
643 &owned_value_path!("procid"),
644 Kind::integer().or_bytes().or_undefined(),
645 None,
646 )
647 .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
648 .with_standard_vector_source_metadata();
649
650 assert_eq!(definitions, Some(expected_definition));
651 }
652
653 #[test]
654 fn config_tcp() {
655 let config: SyslogConfig = toml::from_str(
656 r#"
657 mode = "tcp"
658 address = "127.0.0.1:1235"
659 "#,
660 )
661 .unwrap();
662 assert!(matches!(config.mode, Mode::Tcp { .. }));
663 }
664
665 #[test]
666 fn config_tcp_with_receive_buffer_size() {
667 let config: SyslogConfig = toml::from_str(
668 r#"
669 mode = "tcp"
670 address = "127.0.0.1:1235"
671 receive_buffer_bytes = 256
672 "#,
673 )
674 .unwrap();
675
676 let receive_buffer_bytes = match config.mode {
677 Mode::Tcp {
678 receive_buffer_bytes,
679 ..
680 } => receive_buffer_bytes,
681 _ => panic!("expected Mode::Tcp"),
682 };
683
684 assert_eq!(receive_buffer_bytes, Some(256));
685 }
686
687 #[test]
688 fn config_tcp_keepalive_empty() {
689 let config: SyslogConfig = toml::from_str(
690 r#"
691 mode = "tcp"
692 address = "127.0.0.1:1235"
693 "#,
694 )
695 .unwrap();
696
697 let keepalive = match config.mode {
698 Mode::Tcp { keepalive, .. } => keepalive,
699 _ => panic!("expected Mode::Tcp"),
700 };
701
702 assert_eq!(keepalive, None);
703 }
704
705 #[test]
706 fn config_tcp_keepalive_full() {
707 let config: SyslogConfig = toml::from_str(
708 r#"
709 mode = "tcp"
710 address = "127.0.0.1:1235"
711 keepalive.time_secs = 7200
712 "#,
713 )
714 .unwrap();
715
716 let keepalive = match config.mode {
717 Mode::Tcp { keepalive, .. } => keepalive,
718 _ => panic!("expected Mode::Tcp"),
719 };
720
721 let keepalive = keepalive.expect("keepalive config not set");
722
723 assert_eq!(keepalive.time_secs, Some(7200));
724 }
725
726 #[test]
727 fn config_udp() {
728 let config: SyslogConfig = toml::from_str(
729 r#"
730 mode = "udp"
731 address = "127.0.0.1:1235"
732 max_length = 32187
733 "#,
734 )
735 .unwrap();
736 assert!(matches!(config.mode, Mode::Udp { .. }));
737 }
738
739 #[test]
740 fn config_udp_with_receive_buffer_size() {
741 let config: SyslogConfig = toml::from_str(
742 r#"
743 mode = "udp"
744 address = "127.0.0.1:1235"
745 max_length = 32187
746 receive_buffer_bytes = 256
747 "#,
748 )
749 .unwrap();
750
751 let receive_buffer_bytes = match config.mode {
752 Mode::Udp {
753 receive_buffer_bytes,
754 ..
755 } => receive_buffer_bytes,
756 _ => panic!("expected Mode::Udp"),
757 };
758
759 assert_eq!(receive_buffer_bytes, Some(256));
760 }
761
762 #[cfg(unix)]
763 #[test]
764 fn config_unix() {
765 let config: SyslogConfig = toml::from_str(
766 r#"
767 mode = "unix"
768 path = "127.0.0.1:1235"
769 "#,
770 )
771 .unwrap();
772 assert!(matches!(config.mode, Mode::Unix { .. }));
773 }
774
775 #[cfg(unix)]
776 #[test]
777 fn config_unix_permissions() {
778 let config: SyslogConfig = toml::from_str(
779 r#"
780 mode = "unix"
781 path = "127.0.0.1:1235"
782 socket_file_mode = 0o777
783 "#,
784 )
785 .unwrap();
786 let socket_file_mode = match config.mode {
787 Mode::Unix {
788 path: _,
789 socket_file_mode,
790 } => socket_file_mode,
791 _ => panic!("expected Mode::Unix"),
792 };
793
794 assert_eq!(socket_file_mode, Some(0o777));
795 }
796
797 #[test]
798 fn syslog_ng_network_syslog_protocol() {
799 let msg = "i am foobar";
801 let raw = format!(
802 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
803 r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
804 r#"[origin ip="192.168.0.1" software="test"]"#,
805 msg
806 );
807
808 let mut expected = Event::Log(LogEvent::from(msg));
809
810 {
811 let expected = expected.as_mut_log();
812 expected.insert(
813 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
814 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
815 .single()
816 .expect("invalid timestamp"),
817 );
818 expected.insert(
819 log_schema().source_type_key_target_path().unwrap(),
820 "syslog",
821 );
822 expected.insert("host", "74794bfb6795");
823 expected.insert("hostname", "74794bfb6795");
824
825 expected.insert("meta.sequenceId", "1");
826 expected.insert("meta.sysUpTime", "37");
827 expected.insert("meta.language", "EN");
828 expected.insert("origin.software", "test");
829 expected.insert("origin.ip", "192.168.0.1");
830
831 expected.insert("severity", "notice");
832 expected.insert("facility", "user");
833 expected.insert("version", 1);
834 expected.insert("appname", "root");
835 expected.insert("procid", 8449);
836 expected.insert("source_ip", "192.168.0.254");
837 }
838
839 assert_event_data_eq!(
840 event_from_bytes(
841 "host",
842 Some(Bytes::from("192.168.0.254")),
843 raw.into(),
844 LogNamespace::Legacy
845 )
846 .unwrap(),
847 expected
848 );
849 }
850
851 #[test]
852 fn handles_incorrect_sd_element() {
853 let msg = "qwerty";
854 let raw = format!(
855 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
856 r"[incorrect x]", msg
857 );
858
859 let mut expected = Event::Log(LogEvent::from(msg));
860 {
861 let expected = expected.as_mut_log();
862 expected.insert(
863 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
864 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
865 .single()
866 .expect("invalid timestamp"),
867 );
868 expected.insert(
869 log_schema().host_key().unwrap().to_string().as_str(),
870 "74794bfb6795",
871 );
872 expected.insert("hostname", "74794bfb6795");
873 expected.insert(
874 log_schema().source_type_key_target_path().unwrap(),
875 "syslog",
876 );
877 expected.insert("severity", "notice");
878 expected.insert("facility", "user");
879 expected.insert("version", 1);
880 expected.insert("appname", "root");
881 expected.insert("procid", 8449);
882 expected.insert("source_ip", "192.168.0.254");
883 }
884
885 let event = event_from_bytes(
886 "host",
887 Some(Bytes::from("192.168.0.254")),
888 raw.into(),
889 LogNamespace::Legacy,
890 )
891 .unwrap();
892 assert_event_data_eq!(event, expected);
893
894 let raw = format!(
895 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
896 r"[incorrect x=]", msg
897 );
898
899 let event = event_from_bytes(
900 "host",
901 Some(Bytes::from("192.168.0.254")),
902 raw.into(),
903 LogNamespace::Legacy,
904 )
905 .unwrap();
906 assert_event_data_eq!(event, expected);
907 }
908
909 #[test]
910 fn handles_empty_sd_element() {
911 fn there_is_map_called_empty(event: Event) -> bool {
912 event
913 .as_log()
914 .get("empty")
915 .expect("empty exists")
916 .is_object()
917 }
918
919 let msg = format!(
920 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
921 r"[empty]"
922 );
923
924 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
925 assert!(there_is_map_called_empty(event));
926
927 let msg = format!(
928 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
929 r#"[non_empty x="1"][empty]"#
930 );
931
932 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
933 assert!(there_is_map_called_empty(event));
934
935 let msg = format!(
936 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
937 r#"[empty][non_empty x="1"]"#
938 );
939
940 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
941 assert!(there_is_map_called_empty(event));
942
943 let msg = format!(
944 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
945 r#"[empty not_really="testing the test"]"#
946 );
947
948 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
949 assert!(there_is_map_called_empty(event));
950 }
951
952 #[test]
953 fn handles_weird_whitespace() {
954 let raw = r#"
956 <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
957 "#;
958 let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
959
960 assert_event_data_eq!(
961 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
962 event_from_bytes(
963 "host",
964 None,
965 cleaned.to_owned().into(),
966 LogNamespace::Legacy
967 )
968 .unwrap()
969 );
970 }
971
972 #[test]
973 fn handles_dots_in_sdata() {
974 let raw =
975 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin foo.bar="baz"] hello"#;
976 let event =
977 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
978 assert_eq!(
979 event.as_log().get(r#"origin."foo.bar""#),
980 Some(&Value::from("baz"))
981 );
982 }
983
984 #[test]
985 fn syslog_ng_default_network() {
986 let msg = "i am foobar";
987 let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
988 let event = event_from_bytes(
989 "host",
990 Some(Bytes::from("192.168.0.254")),
991 raw.into(),
992 LogNamespace::Legacy,
993 )
994 .unwrap();
995
996 let mut expected = Event::Log(LogEvent::from(msg));
997 {
998 let value = event.as_log().get("timestamp").unwrap();
999 let year = value.as_timestamp().unwrap().naive_local().year();
1000
1001 let expected = expected.as_mut_log();
1002 let expected_date: DateTime<Utc> = Local
1003 .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1004 .single()
1005 .expect("invalid timestamp")
1006 .into();
1007
1008 expected.insert(
1009 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1010 expected_date,
1011 );
1012 expected.insert(
1013 log_schema().host_key().unwrap().to_string().as_str(),
1014 "74794bfb6795",
1015 );
1016 expected.insert(
1017 log_schema().source_type_key_target_path().unwrap(),
1018 "syslog",
1019 );
1020 expected.insert("hostname", "74794bfb6795");
1021 expected.insert("severity", "notice");
1022 expected.insert("facility", "user");
1023 expected.insert("appname", "root");
1024 expected.insert("procid", 8539);
1025 expected.insert("source_ip", "192.168.0.254");
1026 }
1027
1028 assert_event_data_eq!(event, expected);
1029 }
1030
1031 #[test]
1032 fn rsyslog_omfwd_tcp_default() {
1033 let msg = "start";
1034 let raw = format!(
1035 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1036 );
1037 let event = event_from_bytes(
1038 "host",
1039 Some(Bytes::from("192.168.0.254")),
1040 raw.into(),
1041 LogNamespace::Legacy,
1042 )
1043 .unwrap();
1044
1045 let mut expected = Event::Log(LogEvent::from(msg));
1046 {
1047 let value = event.as_log().get("timestamp").unwrap();
1048 let year = value.as_timestamp().unwrap().naive_local().year();
1049
1050 let expected = expected.as_mut_log();
1051 let expected_date: DateTime<Utc> = Local
1052 .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1053 .single()
1054 .expect("invalid timestamp")
1055 .into();
1056 expected.insert(
1057 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1058 expected_date,
1059 );
1060 expected.insert(
1061 log_schema().source_type_key_target_path().unwrap(),
1062 "syslog",
1063 );
1064 expected.insert("host", "74794bfb6795");
1065 expected.insert("hostname", "74794bfb6795");
1066 expected.insert("severity", "info");
1067 expected.insert("facility", "local7");
1068 expected.insert("appname", "liblogging-stdlog");
1069 expected.insert("origin.software", "rsyslogd");
1070 expected.insert("origin.swVersion", "8.24.0");
1071 expected.insert("source_ip", "192.168.0.254");
1072 expected.insert(event_path!("origin", "x-pid"), "8979");
1073 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1074 }
1075
1076 assert_event_data_eq!(event, expected);
1077 }
1078
1079 #[test]
1080 fn rsyslog_omfwd_tcp_forward_format() {
1081 let msg = "start";
1082 let raw = format!(
1083 r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1084 );
1085
1086 let mut expected = Event::Log(LogEvent::from(msg));
1087 {
1088 let expected = expected.as_mut_log();
1089 expected.insert(
1090 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1091 Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1092 .single()
1093 .and_then(|t| t.with_nanosecond(605_850 * 1000))
1094 .expect("invalid timestamp"),
1095 );
1096 expected.insert(
1097 log_schema().source_type_key_target_path().unwrap(),
1098 "syslog",
1099 );
1100 expected.insert("host", "74794bfb6795");
1101 expected.insert("hostname", "74794bfb6795");
1102 expected.insert("severity", "info");
1103 expected.insert("facility", "local7");
1104 expected.insert("appname", "liblogging-stdlog");
1105 expected.insert("origin.software", "rsyslogd");
1106 expected.insert("origin.swVersion", "8.24.0");
1107 expected.insert(event_path!("origin", "x-pid"), "9043");
1108 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1109 }
1110
1111 assert_event_data_eq!(
1112 event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1113 expected
1114 );
1115 }
1116
1117 #[tokio::test]
1118 async fn test_tcp_syslog() {
1119 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1120 let num_messages: usize = 10000;
1121 let in_addr = next_addr();
1122
1123 let config = SyslogConfig::from_mode(Mode::Tcp {
1125 address: in_addr.into(),
1126 permit_origin: None,
1127 keepalive: None,
1128 tls: None,
1129 receive_buffer_bytes: None,
1130 connection_limit: None,
1131 });
1132
1133 let key = ComponentKey::from("in");
1134 let (tx, rx) = SourceSender::new_test();
1135 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1136 let shutdown_complete = shutdown.shutdown_tripwire();
1137
1138 let source = config
1139 .build(context)
1140 .await
1141 .expect("source should not fail to build");
1142 tokio::spawn(source);
1143
1144 wait_for_tcp(in_addr).await;
1146
1147 let output_events = CountReceiver::receive_events(rx);
1148
1149 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1151 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1152 .collect();
1153
1154 let input_lines: Vec<String> =
1155 input_messages.iter().map(|msg| msg.to_string()).collect();
1156
1157 send_lines(in_addr, input_lines).await.unwrap();
1158
1159 sleep(Duration::from_secs(2)).await;
1161
1162 shutdown
1164 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1165 .await;
1166 shutdown_complete.await;
1167
1168 let output_events = output_events.await;
1169 assert_eq!(output_events.len(), num_messages);
1170
1171 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1172 .into_iter()
1173 .map(|mut e| {
1174 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1177 })
1178 .collect();
1179 assert_eq!(output_messages, input_messages);
1180 })
1181 .await;
1182 }
1183
1184 #[cfg(unix)]
1185 #[tokio::test]
1186 async fn test_unix_stream_syslog() {
1187 use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1188 use futures_util::{stream, SinkExt};
1189 use std::os::unix::net::UnixStream as StdUnixStream;
1190 use tokio::io::AsyncWriteExt;
1191 use tokio::net::UnixStream;
1192 use tokio_util::codec::{FramedWrite, LinesCodec};
1193
1194 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1195 let num_messages: usize = 1;
1196 let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1197
1198 let config = SyslogConfig::from_mode(Mode::Unix {
1200 path: in_path.clone(),
1201 socket_file_mode: None,
1202 });
1203
1204 let key = ComponentKey::from("in");
1205 let (tx, rx) = SourceSender::new_test();
1206 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1207 let shutdown_complete = shutdown.shutdown_tripwire();
1208
1209 let source = config
1210 .build(context)
1211 .await
1212 .expect("source should not fail to build");
1213 tokio::spawn(source);
1214
1215 while StdUnixStream::connect(&in_path).is_err() {
1217 tokio::task::yield_now().await;
1218 }
1219
1220 let output_events = CountReceiver::receive_events(rx);
1221
1222 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1224 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1225 .collect();
1226
1227 let stream = UnixStream::connect(&in_path).await.unwrap();
1228 let mut sink = FramedWrite::new(stream, LinesCodec::new());
1229
1230 let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1231 let mut lines = stream::iter(lines).map(Ok);
1232 sink.send_all(&mut lines).await.unwrap();
1233
1234 let stream = sink.get_mut();
1235 stream.shutdown().await.unwrap();
1236
1237 sleep(Duration::from_secs(1)).await;
1239
1240 shutdown
1241 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1242 .await;
1243 shutdown_complete.await;
1244
1245 let output_events = output_events.await;
1246 assert_eq!(output_events.len(), num_messages);
1247
1248 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1249 .into_iter()
1250 .map(|mut e| {
1251 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1254 })
1255 .collect();
1256 assert_eq!(output_messages, input_messages);
1257 })
1258 .await;
1259 }
1260
1261 #[tokio::test]
1262 async fn test_octet_counting_syslog() {
1263 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1264 let num_messages: usize = 10000;
1265 let in_addr = next_addr();
1266
1267 let config = SyslogConfig::from_mode(Mode::Tcp {
1269 address: in_addr.into(),
1270 permit_origin: None,
1271 keepalive: None,
1272 tls: None,
1273 receive_buffer_bytes: None,
1274 connection_limit: None,
1275 });
1276
1277 let key = ComponentKey::from("in");
1278 let (tx, rx) = SourceSender::new_test();
1279 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1280 let shutdown_complete = shutdown.shutdown_tripwire();
1281
1282 let source = config
1283 .build(context)
1284 .await
1285 .expect("source should not fail to build");
1286 tokio::spawn(source);
1287
1288 wait_for_tcp(in_addr).await;
1290
1291 let output_events = CountReceiver::receive_events(rx);
1292
1293 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1295 .map(|i| {
1296 let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1297 msg.message.push('\n');
1298 msg.message.push_str(&random_string(30));
1299 msg
1300 })
1301 .collect();
1302
1303 let codec = BytesCodec::new();
1304 let input_lines: Vec<Bytes> = input_messages
1305 .iter()
1306 .map(|msg| {
1307 let s = msg.to_string();
1308 format!("{} {}", s.len(), s).into()
1309 })
1310 .collect();
1311
1312 send_encodable(in_addr, codec, input_lines).await.unwrap();
1313
1314 sleep(Duration::from_secs(2)).await;
1316
1317 shutdown
1319 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1320 .await;
1321 shutdown_complete.await;
1322
1323 let output_events = output_events.await;
1324 assert_eq!(output_events.len(), num_messages);
1325
1326 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1327 .into_iter()
1328 .map(|mut e| {
1329 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1332 })
1333 .collect();
1334 assert_eq!(output_messages, input_messages);
1335 })
1336 .await;
1337 }
1338
1339 #[derive(Deserialize, PartialEq, Clone, Debug)]
1340 struct SyslogMessageRfc5424 {
1341 msgid: String,
1342 severity: Severity,
1343 facility: Facility,
1344 version: u8,
1345 timestamp: String,
1346 host: String,
1347 source_type: String,
1348 appname: String,
1349 procid: usize,
1350 message: String,
1351 #[serde(flatten)]
1352 structured_data: StructuredData,
1353 }
1354
1355 impl SyslogMessageRfc5424 {
1356 fn random(
1357 id: usize,
1358 msg_len: usize,
1359 field_len: usize,
1360 max_map_size: usize,
1361 max_children: usize,
1362 ) -> Self {
1363 let msg = random_string(msg_len);
1364 let structured_data = random_structured_data(max_map_size, max_children, field_len);
1365
1366 let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1367 Self {
1370 msgid: format!("test{id}"),
1371 severity: Severity::LOG_INFO,
1372 facility: Facility::LOG_USER,
1373 version: 1,
1374 timestamp,
1375 host: "hogwarts".to_owned(),
1376 source_type: "syslog".to_owned(),
1377 appname: "harry".to_owned(),
1378 procid: rng().random_range(0..32768),
1379 structured_data,
1380 message: msg,
1381 }
1382 }
1383 }
1384
1385 impl fmt::Display for SyslogMessageRfc5424 {
1386 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1387 write!(
1388 f,
1389 "<{}>{} {} {} {} {} {} {} {}",
1390 encode_priority(self.severity, self.facility),
1391 self.version,
1392 self.timestamp,
1393 self.host,
1394 self.appname,
1395 self.procid,
1396 self.msgid,
1397 format_structured_data_rfc5424(&self.structured_data),
1398 self.message
1399 )
1400 }
1401 }
1402
1403 impl From<Event> for SyslogMessageRfc5424 {
1404 fn from(e: Event) -> Self {
1405 let (value, _) = e.into_log().into_parts();
1406 let mut fields = value.into_object().unwrap();
1407
1408 Self {
1409 msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1410 severity: fields
1411 .remove("severity")
1412 .map(value_to_string)
1413 .and_then(|s| Severity::from_str(s.as_str()))
1414 .unwrap(),
1415 facility: fields
1416 .remove("facility")
1417 .map(value_to_string)
1418 .and_then(|s| Facility::from_str(s.as_str()))
1419 .unwrap(),
1420 version: fields
1421 .remove("version")
1422 .map(value_to_string)
1423 .map(|s| u8::from_str(s.as_str()).unwrap())
1424 .unwrap(),
1425 timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1426 host: fields.remove("host").map(value_to_string).unwrap(),
1427 source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1428 appname: fields.remove("appname").map(value_to_string).unwrap(),
1429 procid: fields
1430 .remove("procid")
1431 .map(value_to_string)
1432 .map(|s| usize::from_str(s.as_str()).unwrap())
1433 .unwrap(),
1434 message: fields.remove("message").map(value_to_string).unwrap(),
1435 structured_data: structured_data_from_fields(fields),
1436 }
1437 }
1438 }
1439
1440 fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1441 let mut structured_data = StructuredData::default();
1442
1443 for (key, value) in fields.into_iter() {
1444 let subfields = value
1445 .into_object()
1446 .unwrap()
1447 .into_iter()
1448 .map(|(k, v)| (k.into(), value_to_string(v)))
1449 .collect();
1450
1451 structured_data.insert(key.into(), subfields);
1452 }
1453
1454 structured_data
1455 }
1456
1457 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1458 #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
1459 pub enum Severity {
1460 #[serde(rename(deserialize = "emergency"))]
1461 LOG_EMERG,
1462 #[serde(rename(deserialize = "alert"))]
1463 LOG_ALERT,
1464 #[serde(rename(deserialize = "critical"))]
1465 LOG_CRIT,
1466 #[serde(rename(deserialize = "error"))]
1467 LOG_ERR,
1468 #[serde(rename(deserialize = "warn"))]
1469 LOG_WARNING,
1470 #[serde(rename(deserialize = "notice"))]
1471 LOG_NOTICE,
1472 #[serde(rename(deserialize = "info"))]
1473 LOG_INFO,
1474 #[serde(rename(deserialize = "debug"))]
1475 LOG_DEBUG,
1476 }
1477
1478 impl Severity {
1479 fn from_str(s: &str) -> Option<Self> {
1480 match s {
1481 "emergency" => Some(Self::LOG_EMERG),
1482 "alert" => Some(Self::LOG_ALERT),
1483 "critical" => Some(Self::LOG_CRIT),
1484 "error" => Some(Self::LOG_ERR),
1485 "warn" => Some(Self::LOG_WARNING),
1486 "notice" => Some(Self::LOG_NOTICE),
1487 "info" => Some(Self::LOG_INFO),
1488 "debug" => Some(Self::LOG_DEBUG),
1489
1490 x => {
1491 #[allow(clippy::print_stdout)]
1492 {
1493 println!("converting severity str, got {x}");
1494 }
1495 None
1496 }
1497 }
1498 }
1499 }
1500
1501 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1502 #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
1503 pub enum Facility {
1504 #[serde(rename(deserialize = "kernel"))]
1505 LOG_KERN = 0 << 3,
1506 #[serde(rename(deserialize = "user"))]
1507 LOG_USER = 1 << 3,
1508 #[serde(rename(deserialize = "mail"))]
1509 LOG_MAIL = 2 << 3,
1510 #[serde(rename(deserialize = "daemon"))]
1511 LOG_DAEMON = 3 << 3,
1512 #[serde(rename(deserialize = "auth"))]
1513 LOG_AUTH = 4 << 3,
1514 #[serde(rename(deserialize = "syslog"))]
1515 LOG_SYSLOG = 5 << 3,
1516 }
1517
1518 impl Facility {
1519 fn from_str(s: &str) -> Option<Self> {
1520 match s {
1521 "kernel" => Some(Self::LOG_KERN),
1522 "user" => Some(Self::LOG_USER),
1523 "mail" => Some(Self::LOG_MAIL),
1524 "daemon" => Some(Self::LOG_DAEMON),
1525 "auth" => Some(Self::LOG_AUTH),
1526 "syslog" => Some(Self::LOG_SYSLOG),
1527 _ => None,
1528 }
1529 }
1530 }
1531
1532 type StructuredData = HashMap<String, HashMap<String, String>>;
1533
1534 fn random_structured_data(
1535 max_map_size: usize,
1536 max_children: usize,
1537 field_len: usize,
1538 ) -> StructuredData {
1539 let amount = rng().random_range(0..max_children);
1540
1541 random_maps(max_map_size, field_len)
1542 .filter(|m| !m.is_empty()) .take(amount)
1544 .enumerate()
1545 .map(|(i, map)| (format!("id{i}"), map))
1546 .collect()
1547 }
1548
1549 fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1550 if data.is_empty() {
1551 "-".to_string()
1552 } else {
1553 let mut res = String::new();
1554 for (id, params) in data {
1555 res = res + "[" + id;
1556 for (name, value) in params {
1557 res = res + " " + name + "=\"" + value + "\"";
1558 }
1559 res += "]";
1560 }
1561
1562 res
1563 }
1564 }
1565
1566 const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1567 facility as u8 | severity as u8
1568 }
1569
1570 fn value_to_string(v: Value) -> String {
1571 if v.is_bytes() {
1572 let buf = v.as_bytes().unwrap();
1573 String::from_utf8_lossy(buf).to_string()
1574 } else if v.is_timestamp() {
1575 let ts = v.as_timestamp().unwrap();
1576 ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1577 } else {
1578 v.to_string()
1579 }
1580 }
1581}