1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use chrono::Utc;
7use futures::StreamExt;
8use listenfd::ListenFd;
9use smallvec::SmallVec;
10use tokio_util::udp::UdpFramed;
11use vector_lib::{
12 codecs::{
13 BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
14 decoding::{Deserializer, Framer},
15 },
16 config::{LegacyKey, LogNamespace},
17 configurable::configurable_component,
18 ipallowlist::IpAllowlistConfig,
19 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
20};
21use vrl::event_path;
22
23#[cfg(unix)]
24use crate::sources::util::build_unix_stream_source;
25use crate::{
26 SourceSender,
27 codecs::Decoder,
28 config::{
29 DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
30 },
31 event::Event,
32 internal_events::{SocketBindError, SocketMode, SocketReceiveError, StreamClosedError},
33 net,
34 shutdown::ShutdownSignal,
35 sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
36 tcp::TcpKeepaliveConfig,
37 tls::{MaybeTlsSettings, TlsSourceConfig},
38};
39
40#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
42#[derive(Clone, Debug)]
43pub struct SyslogConfig {
44 #[serde(flatten)]
45 mode: Mode,
46
47 #[serde(default = "crate::serde::default_max_length")]
51 #[configurable(metadata(docs::type_unit = "bytes"))]
52 max_length: usize,
53
54 host_key: Option<OptionalValuePath>,
63
64 #[configurable(metadata(docs::hidden))]
66 #[serde(default)]
67 pub log_namespace: Option<bool>,
68}
69
70#[configurable_component]
72#[derive(Clone, Debug)]
73#[serde(tag = "mode", rename_all = "snake_case")]
74#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
75#[allow(clippy::large_enum_variant)]
76pub enum Mode {
77 Tcp {
79 #[configurable(derived)]
80 address: SocketListenAddr,
81
82 #[configurable(derived)]
83 keepalive: Option<TcpKeepaliveConfig>,
84
85 #[configurable(derived)]
86 permit_origin: Option<IpAllowlistConfig>,
87
88 #[configurable(derived)]
89 tls: Option<TlsSourceConfig>,
90
91 #[configurable(metadata(docs::type_unit = "bytes"))]
95 receive_buffer_bytes: Option<usize>,
96
97 connection_limit: Option<u32>,
99 },
100
101 Udp {
103 #[configurable(derived)]
104 address: SocketListenAddr,
105
106 #[configurable(metadata(docs::type_unit = "bytes"))]
110 receive_buffer_bytes: Option<usize>,
111 },
112
113 #[cfg(unix)]
117 Unix {
118 #[configurable(metadata(docs::examples = "/path/to/socket"))]
122 path: PathBuf,
123
124 socket_file_mode: Option<u32>,
129 },
130}
131
132impl SyslogConfig {
133 #[cfg(test)]
134 pub fn from_mode(mode: Mode) -> Self {
135 Self {
136 mode,
137 host_key: None,
138 max_length: crate::serde::default_max_length(),
139 log_namespace: None,
140 }
141 }
142}
143
144impl Default for SyslogConfig {
145 fn default() -> Self {
146 Self {
147 mode: Mode::Tcp {
148 address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
149 keepalive: None,
150 permit_origin: None,
151 tls: None,
152 receive_buffer_bytes: None,
153 connection_limit: None,
154 },
155 host_key: None,
156 max_length: crate::serde::default_max_length(),
157 log_namespace: None,
158 }
159 }
160}
161
162impl GenerateConfig for SyslogConfig {
163 fn generate_config() -> toml::Value {
164 toml::Value::try_from(SyslogConfig::default()).unwrap()
165 }
166}
167
168#[async_trait::async_trait]
169#[typetag::serde(name = "syslog")]
170impl SourceConfig for SyslogConfig {
171 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
172 let log_namespace = cx.log_namespace(self.log_namespace);
173 let host_key = self
174 .host_key
175 .clone()
176 .and_then(|k| k.path)
177 .or(log_schema().host_key().cloned());
178
179 match self.mode.clone() {
180 Mode::Tcp {
181 address,
182 keepalive,
183 permit_origin,
184 tls,
185 receive_buffer_bytes,
186 connection_limit,
187 } => {
188 let source = SyslogTcpSource {
189 max_length: self.max_length,
190 host_key,
191 log_namespace,
192 };
193 let shutdown_secs = Duration::from_secs(30);
194 let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
195 let tls_client_metadata_key = tls
196 .as_ref()
197 .and_then(|tls| tls.client_metadata_key.clone())
198 .and_then(|k| k.path);
199 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
200 source.run(
201 address,
202 keepalive,
203 shutdown_secs,
204 tls,
205 tls_client_metadata_key,
206 receive_buffer_bytes,
207 None,
208 cx,
209 false.into(),
210 connection_limit,
211 permit_origin.map(Into::into),
212 SyslogConfig::NAME,
213 log_namespace,
214 )
215 }
216 Mode::Udp {
217 address,
218 receive_buffer_bytes,
219 } => Ok(udp(
220 address,
221 self.max_length,
222 host_key,
223 receive_buffer_bytes,
224 cx.shutdown,
225 log_namespace,
226 cx.out,
227 )),
228 #[cfg(unix)]
229 Mode::Unix {
230 path,
231 socket_file_mode,
232 } => {
233 let decoder = Decoder::new(
234 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
235 self.max_length,
236 )),
237 Deserializer::Syslog(
238 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
239 ),
240 );
241
242 build_unix_stream_source(
243 path,
244 socket_file_mode,
245 decoder,
246 move |events, host| handle_events(events, &host_key, host, log_namespace),
247 cx.shutdown,
248 cx.out,
249 )
250 }
251 }
252 }
253
254 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
255 let log_namespace = global_log_namespace.merge(self.log_namespace);
256 let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
257 .schema_definition(log_namespace)
258 .with_standard_vector_source_metadata();
259
260 vec![SourceOutput::new_maybe_logs(
261 DataType::Log,
262 schema_definition,
263 )]
264 }
265
266 fn resources(&self) -> Vec<Resource> {
267 match self.mode.clone() {
268 Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
269 Mode::Udp { address, .. } => vec![address.as_udp_resource()],
270 #[cfg(unix)]
271 Mode::Unix { .. } => vec![],
272 }
273 }
274
275 fn can_acknowledge(&self) -> bool {
276 false
277 }
278}
279
280#[derive(Debug, Clone)]
281struct SyslogTcpSource {
282 max_length: usize,
283 host_key: Option<OwnedValuePath>,
284 log_namespace: LogNamespace,
285}
286
287impl TcpSource for SyslogTcpSource {
288 type Error = vector_lib::codecs::decoding::Error;
289 type Item = SmallVec<[Event; 1]>;
290 type Decoder = Decoder;
291 type Acker = TcpNullAcker;
292
293 fn decoder(&self) -> Self::Decoder {
294 Decoder::new(
295 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
296 Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
297 )
298 }
299
300 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
301 handle_events(
302 events,
303 &self.host_key,
304 Some(host.ip().to_string().into()),
305 self.log_namespace,
306 );
307 }
308
309 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
310 TcpNullAcker
311 }
312}
313
314pub fn udp(
315 addr: SocketListenAddr,
316 _max_length: usize,
317 host_key: Option<OwnedValuePath>,
318 receive_buffer_bytes: Option<usize>,
319 shutdown: ShutdownSignal,
320 log_namespace: LogNamespace,
321 mut out: SourceSender,
322) -> super::Source {
323 Box::pin(async move {
324 let listenfd = ListenFd::from_env();
325 let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
326 emit!(SocketBindError {
327 mode: SocketMode::Udp,
328 error: &error,
329 })
330 })?;
331
332 if let Some(receive_buffer_bytes) = receive_buffer_bytes
333 && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
334 {
335 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
336 }
337
338 info!(
339 message = "Listening.",
340 addr = %addr,
341 r#type = "udp"
342 );
343
344 let mut stream = UdpFramed::new(
345 socket,
346 Decoder::new(
347 Framer::Bytes(BytesDecoder::new()),
348 Deserializer::Syslog(
349 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
350 ),
351 ),
352 )
353 .take_until(shutdown)
354 .filter_map(|frame| {
355 let host_key = host_key.clone();
356 async move {
357 match frame {
358 Ok(((mut events, _byte_size), received_from)) => {
359 let received_from = received_from.ip().to_string().into();
360 handle_events(&mut events, &host_key, Some(received_from), log_namespace);
361 Some(events.remove(0))
362 }
363 Err(error) => {
364 emit!(SocketReceiveError {
365 mode: SocketMode::Udp,
366 error: &error,
367 });
368 None
369 }
370 }
371 }
372 })
373 .boxed();
374
375 match out.send_event_stream(&mut stream).await {
376 Ok(()) => {
377 debug!("Finished sending.");
378 Ok(())
379 }
380 Err(_) => {
381 let (count, _) = stream.size_hint();
382 emit!(StreamClosedError { count });
383 Err(())
384 }
385 }
386 })
387}
388
389fn handle_events(
390 events: &mut [Event],
391 host_key: &Option<OwnedValuePath>,
392 default_host: Option<Bytes>,
393 log_namespace: LogNamespace,
394) {
395 for event in events {
396 enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
397 }
398}
399
400fn enrich_syslog_event(
401 event: &mut Event,
402 host_key: &Option<OwnedValuePath>,
403 default_host: Option<Bytes>,
404 log_namespace: LogNamespace,
405) {
406 let log = event.as_mut_log();
407
408 if let Some(default_host) = &default_host {
409 log_namespace.insert_source_metadata(
410 SyslogConfig::NAME,
411 log,
412 Some(LegacyKey::Overwrite(path!("source_ip"))),
413 path!("source_ip"),
414 default_host.clone(),
415 );
416 }
417
418 let parsed_hostname = log
419 .get(event_path!("hostname"))
420 .map(|hostname| hostname.coerce_to_bytes());
421
422 if let Some(parsed_host) = parsed_hostname.or(default_host) {
423 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
424
425 log_namespace.insert_source_metadata(
426 SyslogConfig::NAME,
427 log,
428 legacy_host_key,
429 path!("host"),
430 parsed_host,
431 );
432 }
433
434 log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
435
436 if log_namespace == LogNamespace::Legacy {
437 let timestamp = log
438 .get(event_path!("timestamp"))
439 .and_then(|timestamp| timestamp.as_timestamp().cloned())
440 .unwrap_or_else(Utc::now);
441 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
442 }
443
444 trace!(
445 message = "Processing one event.",
446 event = ?event
447 );
448}
449
450#[cfg(test)]
451mod test {
452 use std::{collections::HashMap, fmt, str::FromStr};
453
454 use chrono::prelude::*;
455 use rand::{Rng, rng};
456 use serde::Deserialize;
457 use tokio::time::{Duration, Instant, sleep};
458 use tokio_util::codec::BytesCodec;
459 use vector_lib::{
460 assert_event_data_eq,
461 codecs::decoding::format::Deserializer,
462 config::ComponentKey,
463 lookup::{OwnedTargetPath, PathPrefix, event_path, owned_value_path},
464 schema::Definition,
465 };
466 use vrl::value::{Kind, ObjectMap, Value, kind::Collection};
467
468 use super::*;
469 use crate::{
470 config::log_schema,
471 event::{Event, LogEvent},
472 test_util::{
473 CountReceiver,
474 components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
475 next_addr, random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
476 },
477 };
478
479 fn event_from_bytes(
480 host_key: &str,
481 default_host: Option<Bytes>,
482 bytes: Bytes,
483 log_namespace: LogNamespace,
484 ) -> Option<Event> {
485 let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
486 let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
487 handle_events(
488 &mut events,
489 &Some(owned_value_path!(host_key)),
490 default_host,
491 log_namespace,
492 );
493 Some(events.remove(0))
494 }
495
496 #[test]
497 fn generate_config() {
498 crate::test_util::test_generate_config::<SyslogConfig>();
499 }
500
501 #[test]
502 fn output_schema_definition_vector_namespace() {
503 let config = SyslogConfig {
504 log_namespace: Some(true),
505 ..Default::default()
506 };
507
508 let definitions = config
509 .outputs(LogNamespace::Vector)
510 .remove(0)
511 .schema_definition(true);
512
513 let expected_definition =
514 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
515 .with_meaning(OwnedTargetPath::event_root(), "message")
516 .with_metadata_field(
517 &owned_value_path!("vector", "source_type"),
518 Kind::bytes(),
519 None,
520 )
521 .with_metadata_field(
522 &owned_value_path!("vector", "ingest_timestamp"),
523 Kind::timestamp(),
524 None,
525 )
526 .with_metadata_field(
527 &owned_value_path!("syslog", "timestamp"),
528 Kind::timestamp(),
529 Some("timestamp"),
530 )
531 .with_metadata_field(
532 &owned_value_path!("syslog", "hostname"),
533 Kind::bytes().or_undefined(),
534 Some("host"),
535 )
536 .with_metadata_field(
537 &owned_value_path!("syslog", "source_ip"),
538 Kind::bytes().or_undefined(),
539 None,
540 )
541 .with_metadata_field(
542 &owned_value_path!("syslog", "severity"),
543 Kind::bytes().or_undefined(),
544 Some("severity"),
545 )
546 .with_metadata_field(
547 &owned_value_path!("syslog", "facility"),
548 Kind::bytes().or_undefined(),
549 None,
550 )
551 .with_metadata_field(
552 &owned_value_path!("syslog", "version"),
553 Kind::integer().or_undefined(),
554 None,
555 )
556 .with_metadata_field(
557 &owned_value_path!("syslog", "appname"),
558 Kind::bytes().or_undefined(),
559 Some("service"),
560 )
561 .with_metadata_field(
562 &owned_value_path!("syslog", "msgid"),
563 Kind::bytes().or_undefined(),
564 None,
565 )
566 .with_metadata_field(
567 &owned_value_path!("syslog", "procid"),
568 Kind::integer().or_bytes().or_undefined(),
569 None,
570 )
571 .with_metadata_field(
572 &owned_value_path!("syslog", "structured_data"),
573 Kind::object(Collection::from_unknown(Kind::object(
574 Collection::from_unknown(Kind::bytes()),
575 ))),
576 None,
577 )
578 .with_metadata_field(
579 &owned_value_path!("syslog", "tls_client_metadata"),
580 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
581 None,
582 );
583
584 assert_eq!(definitions, Some(expected_definition));
585 }
586
587 #[test]
588 fn output_schema_definition_legacy_namespace() {
589 let config = SyslogConfig::default();
590
591 let definitions = config
592 .outputs(LogNamespace::Legacy)
593 .remove(0)
594 .schema_definition(true);
595
596 let expected_definition = Definition::new_with_default_metadata(
597 Kind::object(Collection::empty()),
598 [LogNamespace::Legacy],
599 )
600 .with_event_field(
601 &owned_value_path!("message"),
602 Kind::bytes(),
603 Some("message"),
604 )
605 .with_event_field(
606 &owned_value_path!("timestamp"),
607 Kind::timestamp(),
608 Some("timestamp"),
609 )
610 .with_event_field(
611 &owned_value_path!("hostname"),
612 Kind::bytes().or_undefined(),
613 Some("host"),
614 )
615 .with_event_field(
616 &owned_value_path!("source_ip"),
617 Kind::bytes().or_undefined(),
618 None,
619 )
620 .with_event_field(
621 &owned_value_path!("severity"),
622 Kind::bytes().or_undefined(),
623 Some("severity"),
624 )
625 .with_event_field(
626 &owned_value_path!("facility"),
627 Kind::bytes().or_undefined(),
628 None,
629 )
630 .with_event_field(
631 &owned_value_path!("version"),
632 Kind::integer().or_undefined(),
633 None,
634 )
635 .with_event_field(
636 &owned_value_path!("appname"),
637 Kind::bytes().or_undefined(),
638 Some("service"),
639 )
640 .with_event_field(
641 &owned_value_path!("msgid"),
642 Kind::bytes().or_undefined(),
643 None,
644 )
645 .with_event_field(
646 &owned_value_path!("procid"),
647 Kind::integer().or_bytes().or_undefined(),
648 None,
649 )
650 .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
651 .with_standard_vector_source_metadata();
652
653 assert_eq!(definitions, Some(expected_definition));
654 }
655
656 #[test]
657 fn config_tcp() {
658 let config: SyslogConfig = toml::from_str(
659 r#"
660 mode = "tcp"
661 address = "127.0.0.1:1235"
662 "#,
663 )
664 .unwrap();
665 assert!(matches!(config.mode, Mode::Tcp { .. }));
666 }
667
668 #[test]
669 fn config_tcp_with_receive_buffer_size() {
670 let config: SyslogConfig = toml::from_str(
671 r#"
672 mode = "tcp"
673 address = "127.0.0.1:1235"
674 receive_buffer_bytes = 256
675 "#,
676 )
677 .unwrap();
678
679 let receive_buffer_bytes = match config.mode {
680 Mode::Tcp {
681 receive_buffer_bytes,
682 ..
683 } => receive_buffer_bytes,
684 _ => panic!("expected Mode::Tcp"),
685 };
686
687 assert_eq!(receive_buffer_bytes, Some(256));
688 }
689
690 #[test]
691 fn config_tcp_keepalive_empty() {
692 let config: SyslogConfig = toml::from_str(
693 r#"
694 mode = "tcp"
695 address = "127.0.0.1:1235"
696 "#,
697 )
698 .unwrap();
699
700 let keepalive = match config.mode {
701 Mode::Tcp { keepalive, .. } => keepalive,
702 _ => panic!("expected Mode::Tcp"),
703 };
704
705 assert_eq!(keepalive, None);
706 }
707
708 #[test]
709 fn config_tcp_keepalive_full() {
710 let config: SyslogConfig = toml::from_str(
711 r#"
712 mode = "tcp"
713 address = "127.0.0.1:1235"
714 keepalive.time_secs = 7200
715 "#,
716 )
717 .unwrap();
718
719 let keepalive = match config.mode {
720 Mode::Tcp { keepalive, .. } => keepalive,
721 _ => panic!("expected Mode::Tcp"),
722 };
723
724 let keepalive = keepalive.expect("keepalive config not set");
725
726 assert_eq!(keepalive.time_secs, Some(7200));
727 }
728
729 #[test]
730 fn config_udp() {
731 let config: SyslogConfig = toml::from_str(
732 r#"
733 mode = "udp"
734 address = "127.0.0.1:1235"
735 max_length = 32187
736 "#,
737 )
738 .unwrap();
739 assert!(matches!(config.mode, Mode::Udp { .. }));
740 }
741
742 #[test]
743 fn config_udp_with_receive_buffer_size() {
744 let config: SyslogConfig = toml::from_str(
745 r#"
746 mode = "udp"
747 address = "127.0.0.1:1235"
748 max_length = 32187
749 receive_buffer_bytes = 256
750 "#,
751 )
752 .unwrap();
753
754 let receive_buffer_bytes = match config.mode {
755 Mode::Udp {
756 receive_buffer_bytes,
757 ..
758 } => receive_buffer_bytes,
759 _ => panic!("expected Mode::Udp"),
760 };
761
762 assert_eq!(receive_buffer_bytes, Some(256));
763 }
764
765 #[cfg(unix)]
766 #[test]
767 fn config_unix() {
768 let config: SyslogConfig = toml::from_str(
769 r#"
770 mode = "unix"
771 path = "127.0.0.1:1235"
772 "#,
773 )
774 .unwrap();
775 assert!(matches!(config.mode, Mode::Unix { .. }));
776 }
777
778 #[cfg(unix)]
779 #[test]
780 fn config_unix_permissions() {
781 let config: SyslogConfig = toml::from_str(
782 r#"
783 mode = "unix"
784 path = "127.0.0.1:1235"
785 socket_file_mode = 0o777
786 "#,
787 )
788 .unwrap();
789 let socket_file_mode = match config.mode {
790 Mode::Unix {
791 path: _,
792 socket_file_mode,
793 } => socket_file_mode,
794 _ => panic!("expected Mode::Unix"),
795 };
796
797 assert_eq!(socket_file_mode, Some(0o777));
798 }
799
800 #[test]
801 fn syslog_ng_network_syslog_protocol() {
802 let msg = "i am foobar";
804 let raw = format!(
805 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
806 r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
807 r#"[origin ip="192.168.0.1" software="test"]"#,
808 msg
809 );
810
811 let mut expected = Event::Log(LogEvent::from(msg));
812
813 {
814 let expected = expected.as_mut_log();
815 expected.insert(
816 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
817 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
818 .single()
819 .expect("invalid timestamp"),
820 );
821 expected.insert(
822 log_schema().source_type_key_target_path().unwrap(),
823 "syslog",
824 );
825 expected.insert("host", "74794bfb6795");
826 expected.insert("hostname", "74794bfb6795");
827
828 expected.insert("meta.sequenceId", "1");
829 expected.insert("meta.sysUpTime", "37");
830 expected.insert("meta.language", "EN");
831 expected.insert("origin.software", "test");
832 expected.insert("origin.ip", "192.168.0.1");
833
834 expected.insert("severity", "notice");
835 expected.insert("facility", "user");
836 expected.insert("version", 1);
837 expected.insert("appname", "root");
838 expected.insert("procid", 8449);
839 expected.insert("source_ip", "192.168.0.254");
840 }
841
842 assert_event_data_eq!(
843 event_from_bytes(
844 "host",
845 Some(Bytes::from("192.168.0.254")),
846 raw.into(),
847 LogNamespace::Legacy
848 )
849 .unwrap(),
850 expected
851 );
852 }
853
854 #[test]
855 fn handles_incorrect_sd_element() {
856 let msg = "qwerty";
857 let raw = format!(
858 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
859 r"[incorrect x]", msg
860 );
861
862 let mut expected = Event::Log(LogEvent::from(msg));
863 {
864 let expected = expected.as_mut_log();
865 expected.insert(
866 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
867 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
868 .single()
869 .expect("invalid timestamp"),
870 );
871 expected.insert(
872 log_schema().host_key().unwrap().to_string().as_str(),
873 "74794bfb6795",
874 );
875 expected.insert("hostname", "74794bfb6795");
876 expected.insert(
877 log_schema().source_type_key_target_path().unwrap(),
878 "syslog",
879 );
880 expected.insert("severity", "notice");
881 expected.insert("facility", "user");
882 expected.insert("version", 1);
883 expected.insert("appname", "root");
884 expected.insert("procid", 8449);
885 expected.insert("source_ip", "192.168.0.254");
886 }
887
888 let event = event_from_bytes(
889 "host",
890 Some(Bytes::from("192.168.0.254")),
891 raw.into(),
892 LogNamespace::Legacy,
893 )
894 .unwrap();
895 assert_event_data_eq!(event, expected);
896
897 let raw = format!(
898 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
899 r"[incorrect x=]", msg
900 );
901
902 let event = event_from_bytes(
903 "host",
904 Some(Bytes::from("192.168.0.254")),
905 raw.into(),
906 LogNamespace::Legacy,
907 )
908 .unwrap();
909 assert_event_data_eq!(event, expected);
910 }
911
912 #[test]
913 fn handles_empty_sd_element() {
914 fn there_is_map_called_empty(event: Event) -> bool {
915 event
916 .as_log()
917 .get("empty")
918 .expect("empty exists")
919 .is_object()
920 }
921
922 let msg = format!(
923 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
924 r"[empty]"
925 );
926
927 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
928 assert!(there_is_map_called_empty(event));
929
930 let msg = format!(
931 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
932 r#"[non_empty x="1"][empty]"#
933 );
934
935 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
936 assert!(there_is_map_called_empty(event));
937
938 let msg = format!(
939 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
940 r#"[empty][non_empty x="1"]"#
941 );
942
943 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
944 assert!(there_is_map_called_empty(event));
945
946 let msg = format!(
947 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
948 r#"[empty not_really="testing the test"]"#
949 );
950
951 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
952 assert!(there_is_map_called_empty(event));
953 }
954
955 #[test]
956 fn handles_weird_whitespace() {
957 let raw = r#"
959 <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
960 "#;
961 let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
962
963 assert_event_data_eq!(
964 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
965 event_from_bytes(
966 "host",
967 None,
968 cleaned.to_owned().into(),
969 LogNamespace::Legacy
970 )
971 .unwrap()
972 );
973 }
974
975 #[test]
976 fn handles_dots_in_sdata() {
977 let raw =
978 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin foo.bar="baz"] hello"#;
979 let event =
980 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
981 assert_eq!(
982 event.as_log().get(r#"origin."foo.bar""#),
983 Some(&Value::from("baz"))
984 );
985 }
986
987 #[test]
988 fn syslog_ng_default_network() {
989 let msg = "i am foobar";
990 let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
991 let event = event_from_bytes(
992 "host",
993 Some(Bytes::from("192.168.0.254")),
994 raw.into(),
995 LogNamespace::Legacy,
996 )
997 .unwrap();
998
999 let mut expected = Event::Log(LogEvent::from(msg));
1000 {
1001 let value = event.as_log().get("timestamp").unwrap();
1002 let year = value.as_timestamp().unwrap().naive_local().year();
1003
1004 let expected = expected.as_mut_log();
1005 let expected_date: DateTime<Utc> = Local
1006 .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1007 .single()
1008 .expect("invalid timestamp")
1009 .into();
1010
1011 expected.insert(
1012 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1013 expected_date,
1014 );
1015 expected.insert(
1016 log_schema().host_key().unwrap().to_string().as_str(),
1017 "74794bfb6795",
1018 );
1019 expected.insert(
1020 log_schema().source_type_key_target_path().unwrap(),
1021 "syslog",
1022 );
1023 expected.insert("hostname", "74794bfb6795");
1024 expected.insert("severity", "notice");
1025 expected.insert("facility", "user");
1026 expected.insert("appname", "root");
1027 expected.insert("procid", 8539);
1028 expected.insert("source_ip", "192.168.0.254");
1029 }
1030
1031 assert_event_data_eq!(event, expected);
1032 }
1033
1034 #[test]
1035 fn rsyslog_omfwd_tcp_default() {
1036 let msg = "start";
1037 let raw = format!(
1038 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1039 );
1040 let event = event_from_bytes(
1041 "host",
1042 Some(Bytes::from("192.168.0.254")),
1043 raw.into(),
1044 LogNamespace::Legacy,
1045 )
1046 .unwrap();
1047
1048 let mut expected = Event::Log(LogEvent::from(msg));
1049 {
1050 let value = event.as_log().get("timestamp").unwrap();
1051 let year = value.as_timestamp().unwrap().naive_local().year();
1052
1053 let expected = expected.as_mut_log();
1054 let expected_date: DateTime<Utc> = Local
1055 .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1056 .single()
1057 .expect("invalid timestamp")
1058 .into();
1059 expected.insert(
1060 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1061 expected_date,
1062 );
1063 expected.insert(
1064 log_schema().source_type_key_target_path().unwrap(),
1065 "syslog",
1066 );
1067 expected.insert("host", "74794bfb6795");
1068 expected.insert("hostname", "74794bfb6795");
1069 expected.insert("severity", "info");
1070 expected.insert("facility", "local7");
1071 expected.insert("appname", "liblogging-stdlog");
1072 expected.insert("origin.software", "rsyslogd");
1073 expected.insert("origin.swVersion", "8.24.0");
1074 expected.insert("source_ip", "192.168.0.254");
1075 expected.insert(event_path!("origin", "x-pid"), "8979");
1076 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1077 }
1078
1079 assert_event_data_eq!(event, expected);
1080 }
1081
1082 #[test]
1083 fn rsyslog_omfwd_tcp_forward_format() {
1084 let msg = "start";
1085 let raw = format!(
1086 r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1087 );
1088
1089 let mut expected = Event::Log(LogEvent::from(msg));
1090 {
1091 let expected = expected.as_mut_log();
1092 expected.insert(
1093 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1094 Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1095 .single()
1096 .and_then(|t| t.with_nanosecond(605_850 * 1000))
1097 .expect("invalid timestamp"),
1098 );
1099 expected.insert(
1100 log_schema().source_type_key_target_path().unwrap(),
1101 "syslog",
1102 );
1103 expected.insert("host", "74794bfb6795");
1104 expected.insert("hostname", "74794bfb6795");
1105 expected.insert("severity", "info");
1106 expected.insert("facility", "local7");
1107 expected.insert("appname", "liblogging-stdlog");
1108 expected.insert("origin.software", "rsyslogd");
1109 expected.insert("origin.swVersion", "8.24.0");
1110 expected.insert(event_path!("origin", "x-pid"), "9043");
1111 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1112 }
1113
1114 assert_event_data_eq!(
1115 event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1116 expected
1117 );
1118 }
1119
1120 #[tokio::test]
1121 async fn test_tcp_syslog() {
1122 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1123 let num_messages: usize = 10000;
1124 let in_addr = next_addr();
1125
1126 let config = SyslogConfig::from_mode(Mode::Tcp {
1128 address: in_addr.into(),
1129 permit_origin: None,
1130 keepalive: None,
1131 tls: None,
1132 receive_buffer_bytes: None,
1133 connection_limit: None,
1134 });
1135
1136 let key = ComponentKey::from("in");
1137 let (tx, rx) = SourceSender::new_test();
1138 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1139 let shutdown_complete = shutdown.shutdown_tripwire();
1140
1141 let source = config
1142 .build(context)
1143 .await
1144 .expect("source should not fail to build");
1145 tokio::spawn(source);
1146
1147 wait_for_tcp(in_addr).await;
1149
1150 let output_events = CountReceiver::receive_events(rx);
1151
1152 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1154 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1155 .collect();
1156
1157 let input_lines: Vec<String> =
1158 input_messages.iter().map(|msg| msg.to_string()).collect();
1159
1160 send_lines(in_addr, input_lines).await.unwrap();
1161
1162 sleep(Duration::from_secs(2)).await;
1164
1165 shutdown
1167 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1168 .await;
1169 shutdown_complete.await;
1170
1171 let output_events = output_events.await;
1172 assert_eq!(output_events.len(), num_messages);
1173
1174 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1175 .into_iter()
1176 .map(|mut e| {
1177 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1180 })
1181 .collect();
1182 assert_eq!(output_messages, input_messages);
1183 })
1184 .await;
1185 }
1186
1187 #[cfg(unix)]
1188 #[tokio::test]
1189 async fn test_unix_stream_syslog() {
1190 use std::os::unix::net::UnixStream as StdUnixStream;
1191
1192 use futures_util::{SinkExt, stream};
1193 use tokio::{io::AsyncWriteExt, net::UnixStream};
1194 use tokio_util::codec::{FramedWrite, LinesCodec};
1195
1196 use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1197
1198 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1199 let num_messages: usize = 1;
1200 let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1201
1202 let config = SyslogConfig::from_mode(Mode::Unix {
1204 path: in_path.clone(),
1205 socket_file_mode: None,
1206 });
1207
1208 let key = ComponentKey::from("in");
1209 let (tx, rx) = SourceSender::new_test();
1210 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1211 let shutdown_complete = shutdown.shutdown_tripwire();
1212
1213 let source = config
1214 .build(context)
1215 .await
1216 .expect("source should not fail to build");
1217 tokio::spawn(source);
1218
1219 while StdUnixStream::connect(&in_path).is_err() {
1221 tokio::task::yield_now().await;
1222 }
1223
1224 let output_events = CountReceiver::receive_events(rx);
1225
1226 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1228 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1229 .collect();
1230
1231 let stream = UnixStream::connect(&in_path).await.unwrap();
1232 let mut sink = FramedWrite::new(stream, LinesCodec::new());
1233
1234 let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1235 let mut lines = stream::iter(lines).map(Ok);
1236 sink.send_all(&mut lines).await.unwrap();
1237
1238 let stream = sink.get_mut();
1239 stream.shutdown().await.unwrap();
1240
1241 sleep(Duration::from_secs(1)).await;
1243
1244 shutdown
1245 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1246 .await;
1247 shutdown_complete.await;
1248
1249 let output_events = output_events.await;
1250 assert_eq!(output_events.len(), num_messages);
1251
1252 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1253 .into_iter()
1254 .map(|mut e| {
1255 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1258 })
1259 .collect();
1260 assert_eq!(output_messages, input_messages);
1261 })
1262 .await;
1263 }
1264
1265 #[tokio::test]
1266 async fn test_octet_counting_syslog() {
1267 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1268 let num_messages: usize = 10000;
1269 let in_addr = next_addr();
1270
1271 let config = SyslogConfig::from_mode(Mode::Tcp {
1273 address: in_addr.into(),
1274 permit_origin: None,
1275 keepalive: None,
1276 tls: None,
1277 receive_buffer_bytes: None,
1278 connection_limit: None,
1279 });
1280
1281 let key = ComponentKey::from("in");
1282 let (tx, rx) = SourceSender::new_test();
1283 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1284 let shutdown_complete = shutdown.shutdown_tripwire();
1285
1286 let source = config
1287 .build(context)
1288 .await
1289 .expect("source should not fail to build");
1290 tokio::spawn(source);
1291
1292 wait_for_tcp(in_addr).await;
1294
1295 let output_events = CountReceiver::receive_events(rx);
1296
1297 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1299 .map(|i| {
1300 let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1301 msg.message.push('\n');
1302 msg.message.push_str(&random_string(30));
1303 msg
1304 })
1305 .collect();
1306
1307 let codec = BytesCodec::new();
1308 let input_lines: Vec<Bytes> = input_messages
1309 .iter()
1310 .map(|msg| {
1311 let s = msg.to_string();
1312 format!("{} {}", s.len(), s).into()
1313 })
1314 .collect();
1315
1316 send_encodable(in_addr, codec, input_lines).await.unwrap();
1317
1318 sleep(Duration::from_secs(2)).await;
1320
1321 shutdown
1323 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1324 .await;
1325 shutdown_complete.await;
1326
1327 let output_events = output_events.await;
1328 assert_eq!(output_events.len(), num_messages);
1329
1330 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1331 .into_iter()
1332 .map(|mut e| {
1333 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1336 })
1337 .collect();
1338 assert_eq!(output_messages, input_messages);
1339 })
1340 .await;
1341 }
1342
1343 #[derive(Deserialize, PartialEq, Clone, Debug)]
1344 struct SyslogMessageRfc5424 {
1345 msgid: String,
1346 severity: Severity,
1347 facility: Facility,
1348 version: u8,
1349 timestamp: String,
1350 host: String,
1351 source_type: String,
1352 appname: String,
1353 procid: usize,
1354 message: String,
1355 #[serde(flatten)]
1356 structured_data: StructuredData,
1357 }
1358
1359 impl SyslogMessageRfc5424 {
1360 fn random(
1361 id: usize,
1362 msg_len: usize,
1363 field_len: usize,
1364 max_map_size: usize,
1365 max_children: usize,
1366 ) -> Self {
1367 let msg = random_string(msg_len);
1368 let structured_data = random_structured_data(max_map_size, max_children, field_len);
1369
1370 let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1371 Self {
1374 msgid: format!("test{id}"),
1375 severity: Severity::LOG_INFO,
1376 facility: Facility::LOG_USER,
1377 version: 1,
1378 timestamp,
1379 host: "hogwarts".to_owned(),
1380 source_type: "syslog".to_owned(),
1381 appname: "harry".to_owned(),
1382 procid: rng().random_range(0..32768),
1383 structured_data,
1384 message: msg,
1385 }
1386 }
1387 }
1388
1389 impl fmt::Display for SyslogMessageRfc5424 {
1390 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1391 write!(
1392 f,
1393 "<{}>{} {} {} {} {} {} {} {}",
1394 encode_priority(self.severity, self.facility),
1395 self.version,
1396 self.timestamp,
1397 self.host,
1398 self.appname,
1399 self.procid,
1400 self.msgid,
1401 format_structured_data_rfc5424(&self.structured_data),
1402 self.message
1403 )
1404 }
1405 }
1406
1407 impl From<Event> for SyslogMessageRfc5424 {
1408 fn from(e: Event) -> Self {
1409 let (value, _) = e.into_log().into_parts();
1410 let mut fields = value.into_object().unwrap();
1411
1412 Self {
1413 msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1414 severity: fields
1415 .remove("severity")
1416 .map(value_to_string)
1417 .and_then(|s| Severity::from_str(s.as_str()))
1418 .unwrap(),
1419 facility: fields
1420 .remove("facility")
1421 .map(value_to_string)
1422 .and_then(|s| Facility::from_str(s.as_str()))
1423 .unwrap(),
1424 version: fields
1425 .remove("version")
1426 .map(value_to_string)
1427 .map(|s| u8::from_str(s.as_str()).unwrap())
1428 .unwrap(),
1429 timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1430 host: fields.remove("host").map(value_to_string).unwrap(),
1431 source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1432 appname: fields.remove("appname").map(value_to_string).unwrap(),
1433 procid: fields
1434 .remove("procid")
1435 .map(value_to_string)
1436 .map(|s| usize::from_str(s.as_str()).unwrap())
1437 .unwrap(),
1438 message: fields.remove("message").map(value_to_string).unwrap(),
1439 structured_data: structured_data_from_fields(fields),
1440 }
1441 }
1442 }
1443
1444 fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1445 let mut structured_data = StructuredData::default();
1446
1447 for (key, value) in fields.into_iter() {
1448 let subfields = value
1449 .into_object()
1450 .unwrap()
1451 .into_iter()
1452 .map(|(k, v)| (k.into(), value_to_string(v)))
1453 .collect();
1454
1455 structured_data.insert(key.into(), subfields);
1456 }
1457
1458 structured_data
1459 }
1460
1461 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1462 #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
1463 pub enum Severity {
1464 #[serde(rename(deserialize = "emergency"))]
1465 LOG_EMERG,
1466 #[serde(rename(deserialize = "alert"))]
1467 LOG_ALERT,
1468 #[serde(rename(deserialize = "critical"))]
1469 LOG_CRIT,
1470 #[serde(rename(deserialize = "error"))]
1471 LOG_ERR,
1472 #[serde(rename(deserialize = "warn"))]
1473 LOG_WARNING,
1474 #[serde(rename(deserialize = "notice"))]
1475 LOG_NOTICE,
1476 #[serde(rename(deserialize = "info"))]
1477 LOG_INFO,
1478 #[serde(rename(deserialize = "debug"))]
1479 LOG_DEBUG,
1480 }
1481
1482 impl Severity {
1483 fn from_str(s: &str) -> Option<Self> {
1484 match s {
1485 "emergency" => Some(Self::LOG_EMERG),
1486 "alert" => Some(Self::LOG_ALERT),
1487 "critical" => Some(Self::LOG_CRIT),
1488 "error" => Some(Self::LOG_ERR),
1489 "warn" => Some(Self::LOG_WARNING),
1490 "notice" => Some(Self::LOG_NOTICE),
1491 "info" => Some(Self::LOG_INFO),
1492 "debug" => Some(Self::LOG_DEBUG),
1493
1494 x => {
1495 #[allow(clippy::print_stdout)]
1496 {
1497 println!("converting severity str, got {x}");
1498 }
1499 None
1500 }
1501 }
1502 }
1503 }
1504
1505 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1506 #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
1507 pub enum Facility {
1508 #[serde(rename(deserialize = "kernel"))]
1509 LOG_KERN = 0 << 3,
1510 #[serde(rename(deserialize = "user"))]
1511 LOG_USER = 1 << 3,
1512 #[serde(rename(deserialize = "mail"))]
1513 LOG_MAIL = 2 << 3,
1514 #[serde(rename(deserialize = "daemon"))]
1515 LOG_DAEMON = 3 << 3,
1516 #[serde(rename(deserialize = "auth"))]
1517 LOG_AUTH = 4 << 3,
1518 #[serde(rename(deserialize = "syslog"))]
1519 LOG_SYSLOG = 5 << 3,
1520 }
1521
1522 impl Facility {
1523 fn from_str(s: &str) -> Option<Self> {
1524 match s {
1525 "kernel" => Some(Self::LOG_KERN),
1526 "user" => Some(Self::LOG_USER),
1527 "mail" => Some(Self::LOG_MAIL),
1528 "daemon" => Some(Self::LOG_DAEMON),
1529 "auth" => Some(Self::LOG_AUTH),
1530 "syslog" => Some(Self::LOG_SYSLOG),
1531 _ => None,
1532 }
1533 }
1534 }
1535
1536 type StructuredData = HashMap<String, HashMap<String, String>>;
1537
1538 fn random_structured_data(
1539 max_map_size: usize,
1540 max_children: usize,
1541 field_len: usize,
1542 ) -> StructuredData {
1543 let amount = rng().random_range(0..max_children);
1544
1545 random_maps(max_map_size, field_len)
1546 .filter(|m| !m.is_empty()) .take(amount)
1548 .enumerate()
1549 .map(|(i, map)| (format!("id{i}"), map))
1550 .collect()
1551 }
1552
1553 fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1554 if data.is_empty() {
1555 "-".to_string()
1556 } else {
1557 let mut res = String::new();
1558 for (id, params) in data {
1559 res = res + "[" + id;
1560 for (name, value) in params {
1561 res = res + " " + name + "=\"" + value + "\"";
1562 }
1563 res += "]";
1564 }
1565
1566 res
1567 }
1568 }
1569
1570 const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1571 facility as u8 | severity as u8
1572 }
1573
1574 fn value_to_string(v: Value) -> String {
1575 if v.is_bytes() {
1576 let buf = v.as_bytes().unwrap();
1577 String::from_utf8_lossy(buf).to_string()
1578 } else if v.is_timestamp() {
1579 let ts = v.as_timestamp().unwrap();
1580 ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1581 } else {
1582 v.to_string()
1583 }
1584 }
1585}