#[cfg(unix)]
use std::path::PathBuf;
use std::{net::SocketAddr, time::Duration};
use vector_lib::ipallowlist::IpAllowlistConfig;
use bytes::Bytes;
use chrono::Utc;
use futures::StreamExt;
use listenfd::ListenFd;
use smallvec::SmallVec;
use tokio_util::udp::UdpFramed;
use vector_lib::codecs::{
decoding::{Deserializer, Framer},
BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, path, OwnedValuePath};
use vrl::event_path;
#[cfg(unix)]
use crate::sources::util::build_unix_stream_source;
use crate::{
codecs::Decoder,
config::{
log_schema, DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput,
},
event::Event,
internal_events::StreamClosedError,
internal_events::{SocketBindError, SocketMode, SocketReceiveError},
net,
shutdown::ShutdownSignal,
sources::util::net::{try_bind_udp_socket, SocketListenAddr, TcpNullAcker, TcpSource},
tcp::TcpKeepaliveConfig,
tls::{MaybeTlsSettings, TlsSourceConfig},
SourceSender,
};
#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
#[derive(Clone, Debug)]
pub struct SyslogConfig {
#[serde(flatten)]
mode: Mode,
#[serde(default = "crate::serde::default_max_length")]
#[configurable(metadata(docs::type_unit = "bytes"))]
max_length: usize,
host_key: Option<OptionalValuePath>,
#[configurable(metadata(docs::hidden))]
#[serde(default)]
pub log_namespace: Option<bool>,
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)]
pub enum Mode {
Tcp {
#[configurable(derived)]
address: SocketListenAddr,
#[configurable(derived)]
keepalive: Option<TcpKeepaliveConfig>,
#[configurable(derived)]
permit_origin: Option<IpAllowlistConfig>,
#[configurable(derived)]
tls: Option<TlsSourceConfig>,
#[configurable(metadata(docs::type_unit = "bytes"))]
receive_buffer_bytes: Option<usize>,
connection_limit: Option<u32>,
},
Udp {
#[configurable(derived)]
address: SocketListenAddr,
#[configurable(metadata(docs::type_unit = "bytes"))]
receive_buffer_bytes: Option<usize>,
},
#[cfg(unix)]
Unix {
#[configurable(metadata(docs::examples = "/path/to/socket"))]
path: PathBuf,
socket_file_mode: Option<u32>,
},
}
impl SyslogConfig {
#[cfg(test)]
pub fn from_mode(mode: Mode) -> Self {
Self {
mode,
host_key: None,
max_length: crate::serde::default_max_length(),
log_namespace: None,
}
}
}
impl Default for SyslogConfig {
fn default() -> Self {
Self {
mode: Mode::Tcp {
address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
keepalive: None,
permit_origin: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
},
host_key: None,
max_length: crate::serde::default_max_length(),
log_namespace: None,
}
}
}
impl GenerateConfig for SyslogConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(SyslogConfig::default()).unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "syslog")]
impl SourceConfig for SyslogConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let host_key = self
.host_key
.clone()
.and_then(|k| k.path)
.or(log_schema().host_key().cloned());
match self.mode.clone() {
Mode::Tcp {
address,
keepalive,
permit_origin,
tls,
receive_buffer_bytes,
connection_limit,
} => {
let source = SyslogTcpSource {
max_length: self.max_length,
host_key,
log_namespace,
};
let shutdown_secs = Duration::from_secs(30);
let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
let tls_client_metadata_key = tls
.as_ref()
.and_then(|tls| tls.client_metadata_key.clone())
.and_then(|k| k.path);
let tls = MaybeTlsSettings::from_config(&tls_config, true)?;
source.run(
address,
keepalive,
shutdown_secs,
tls,
tls_client_metadata_key,
receive_buffer_bytes,
None,
cx,
false.into(),
connection_limit,
permit_origin.map(Into::into),
SyslogConfig::NAME,
log_namespace,
)
}
Mode::Udp {
address,
receive_buffer_bytes,
} => Ok(udp(
address,
self.max_length,
host_key,
receive_buffer_bytes,
cx.shutdown,
log_namespace,
cx.out,
)),
#[cfg(unix)]
Mode::Unix {
path,
socket_file_mode,
} => {
let decoder = Decoder::new(
Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
self.max_length,
)),
Deserializer::Syslog(
SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
),
);
build_unix_stream_source(
path,
socket_file_mode,
decoder,
move |events, host| handle_events(events, &host_key, host, log_namespace),
cx.shutdown,
cx.out,
)
}
}
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
.schema_definition(log_namespace)
.with_standard_vector_source_metadata();
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}
fn resources(&self) -> Vec<Resource> {
match self.mode.clone() {
Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
Mode::Udp { address, .. } => vec![address.as_udp_resource()],
#[cfg(unix)]
Mode::Unix { .. } => vec![],
}
}
fn can_acknowledge(&self) -> bool {
false
}
}
#[derive(Debug, Clone)]
struct SyslogTcpSource {
max_length: usize,
host_key: Option<OwnedValuePath>,
log_namespace: LogNamespace,
}
impl TcpSource for SyslogTcpSource {
type Error = vector_lib::codecs::decoding::Error;
type Item = SmallVec<[Event; 1]>;
type Decoder = Decoder;
type Acker = TcpNullAcker;
fn decoder(&self) -> Self::Decoder {
Decoder::new(
Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
)
}
fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
handle_events(
events,
&self.host_key,
Some(host.ip().to_string().into()),
self.log_namespace,
);
}
fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
TcpNullAcker
}
}
pub fn udp(
addr: SocketListenAddr,
_max_length: usize,
host_key: Option<OwnedValuePath>,
receive_buffer_bytes: Option<usize>,
shutdown: ShutdownSignal,
log_namespace: LogNamespace,
mut out: SourceSender,
) -> super::Source {
Box::pin(async move {
let listenfd = ListenFd::from_env();
let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
emit!(SocketBindError {
mode: SocketMode::Udp,
error: &error,
})
})?;
if let Some(receive_buffer_bytes) = receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
}
}
info!(
message = "Listening.",
addr = %addr,
r#type = "udp"
);
let mut stream = UdpFramed::new(
socket,
Decoder::new(
Framer::Bytes(BytesDecoder::new()),
Deserializer::Syslog(
SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
),
),
)
.take_until(shutdown)
.filter_map(|frame| {
let host_key = host_key.clone();
async move {
match frame {
Ok(((mut events, _byte_size), received_from)) => {
let received_from = received_from.ip().to_string().into();
handle_events(&mut events, &host_key, Some(received_from), log_namespace);
Some(events.remove(0))
}
Err(error) => {
emit!(SocketReceiveError {
mode: SocketMode::Udp,
error: &error,
});
None
}
}
}
})
.boxed();
match out.send_event_stream(&mut stream).await {
Ok(()) => {
debug!("Finished sending.");
Ok(())
}
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { count });
Err(())
}
}
})
}
fn handle_events(
events: &mut [Event],
host_key: &Option<OwnedValuePath>,
default_host: Option<Bytes>,
log_namespace: LogNamespace,
) {
for event in events {
enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
}
}
fn enrich_syslog_event(
event: &mut Event,
host_key: &Option<OwnedValuePath>,
default_host: Option<Bytes>,
log_namespace: LogNamespace,
) {
let log = event.as_mut_log();
if let Some(default_host) = &default_host {
log_namespace.insert_source_metadata(
SyslogConfig::NAME,
log,
Some(LegacyKey::Overwrite(path!("source_ip"))),
path!("source_ip"),
default_host.clone(),
);
}
let parsed_hostname = log
.get(event_path!("hostname"))
.map(|hostname| hostname.coerce_to_bytes());
if let Some(parsed_host) = parsed_hostname.or(default_host) {
let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
log_namespace.insert_source_metadata(
SyslogConfig::NAME,
log,
legacy_host_key,
path!("host"),
parsed_host,
);
}
log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
if log_namespace == LogNamespace::Legacy {
let timestamp = log
.get(event_path!("timestamp"))
.and_then(|timestamp| timestamp.as_timestamp().cloned())
.unwrap_or_else(Utc::now);
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
}
trace!(
message = "Processing one event.",
event = ?event
);
}
#[cfg(test)]
mod test {
use std::{collections::HashMap, fmt, str::FromStr};
use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath};
use chrono::prelude::*;
use rand::{thread_rng, Rng};
use serde::Deserialize;
use tokio::time::{sleep, Duration, Instant};
use tokio_util::codec::BytesCodec;
use vector_lib::assert_event_data_eq;
use vector_lib::codecs::decoding::format::Deserializer;
use vector_lib::lookup::PathPrefix;
use vector_lib::{config::ComponentKey, schema::Definition};
use vrl::value::{kind::Collection, Kind, ObjectMap, Value};
use super::*;
use crate::{
config::log_schema,
event::{Event, LogEvent},
test_util::{
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
CountReceiver,
},
};
fn event_from_bytes(
host_key: &str,
default_host: Option<Bytes>,
bytes: Bytes,
log_namespace: LogNamespace,
) -> Option<Event> {
let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
handle_events(
&mut events,
&Some(owned_value_path!(host_key)),
default_host,
log_namespace,
);
Some(events.remove(0))
}
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<SyslogConfig>();
}
#[test]
fn output_schema_definition_vector_namespace() {
let config = SyslogConfig {
log_namespace: Some(true),
..Default::default()
};
let definitions = config
.outputs(LogNamespace::Vector)
.remove(0)
.schema_definition(true);
let expected_definition =
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), "message")
.with_metadata_field(
&owned_value_path!("vector", "source_type"),
Kind::bytes(),
None,
)
.with_metadata_field(
&owned_value_path!("vector", "ingest_timestamp"),
Kind::timestamp(),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "timestamp"),
Kind::timestamp(),
Some("timestamp"),
)
.with_metadata_field(
&owned_value_path!("syslog", "hostname"),
Kind::bytes().or_undefined(),
Some("host"),
)
.with_metadata_field(
&owned_value_path!("syslog", "source_ip"),
Kind::bytes().or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "severity"),
Kind::bytes().or_undefined(),
Some("severity"),
)
.with_metadata_field(
&owned_value_path!("syslog", "facility"),
Kind::bytes().or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "version"),
Kind::integer().or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "appname"),
Kind::bytes().or_undefined(),
Some("service"),
)
.with_metadata_field(
&owned_value_path!("syslog", "msgid"),
Kind::bytes().or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "procid"),
Kind::integer().or_bytes().or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "structured_data"),
Kind::object(Collection::from_unknown(Kind::object(
Collection::from_unknown(Kind::bytes()),
))),
None,
)
.with_metadata_field(
&owned_value_path!("syslog", "tls_client_metadata"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
);
assert_eq!(definitions, Some(expected_definition));
}
#[test]
fn output_schema_definition_legacy_namespace() {
let config = SyslogConfig::default();
let definitions = config
.outputs(LogNamespace::Legacy)
.remove(0)
.schema_definition(true);
let expected_definition = Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[LogNamespace::Legacy],
)
.with_event_field(
&owned_value_path!("message"),
Kind::bytes(),
Some("message"),
)
.with_event_field(
&owned_value_path!("timestamp"),
Kind::timestamp(),
Some("timestamp"),
)
.with_event_field(
&owned_value_path!("hostname"),
Kind::bytes().or_undefined(),
Some("host"),
)
.with_event_field(
&owned_value_path!("source_ip"),
Kind::bytes().or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("severity"),
Kind::bytes().or_undefined(),
Some("severity"),
)
.with_event_field(
&owned_value_path!("facility"),
Kind::bytes().or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("version"),
Kind::integer().or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("appname"),
Kind::bytes().or_undefined(),
Some("service"),
)
.with_event_field(
&owned_value_path!("msgid"),
Kind::bytes().or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("procid"),
Kind::integer().or_bytes().or_undefined(),
None,
)
.unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
.with_standard_vector_source_metadata();
assert_eq!(definitions, Some(expected_definition));
}
#[test]
fn config_tcp() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "tcp"
address = "127.0.0.1:1235"
"#,
)
.unwrap();
assert!(matches!(config.mode, Mode::Tcp { .. }));
}
#[test]
fn config_tcp_with_receive_buffer_size() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "tcp"
address = "127.0.0.1:1235"
receive_buffer_bytes = 256
"#,
)
.unwrap();
let receive_buffer_bytes = match config.mode {
Mode::Tcp {
receive_buffer_bytes,
..
} => receive_buffer_bytes,
_ => panic!("expected Mode::Tcp"),
};
assert_eq!(receive_buffer_bytes, Some(256));
}
#[test]
fn config_tcp_keepalive_empty() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "tcp"
address = "127.0.0.1:1235"
"#,
)
.unwrap();
let keepalive = match config.mode {
Mode::Tcp { keepalive, .. } => keepalive,
_ => panic!("expected Mode::Tcp"),
};
assert_eq!(keepalive, None);
}
#[test]
fn config_tcp_keepalive_full() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "tcp"
address = "127.0.0.1:1235"
keepalive.time_secs = 7200
"#,
)
.unwrap();
let keepalive = match config.mode {
Mode::Tcp { keepalive, .. } => keepalive,
_ => panic!("expected Mode::Tcp"),
};
let keepalive = keepalive.expect("keepalive config not set");
assert_eq!(keepalive.time_secs, Some(7200));
}
#[test]
fn config_udp() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "udp"
address = "127.0.0.1:1235"
max_length = 32187
"#,
)
.unwrap();
assert!(matches!(config.mode, Mode::Udp { .. }));
}
#[test]
fn config_udp_with_receive_buffer_size() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "udp"
address = "127.0.0.1:1235"
max_length = 32187
receive_buffer_bytes = 256
"#,
)
.unwrap();
let receive_buffer_bytes = match config.mode {
Mode::Udp {
receive_buffer_bytes,
..
} => receive_buffer_bytes,
_ => panic!("expected Mode::Udp"),
};
assert_eq!(receive_buffer_bytes, Some(256));
}
#[cfg(unix)]
#[test]
fn config_unix() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "unix"
path = "127.0.0.1:1235"
"#,
)
.unwrap();
assert!(matches!(config.mode, Mode::Unix { .. }));
}
#[cfg(unix)]
#[test]
fn config_unix_permissions() {
let config: SyslogConfig = toml::from_str(
r#"
mode = "unix"
path = "127.0.0.1:1235"
socket_file_mode = 0o777
"#,
)
.unwrap();
let socket_file_mode = match config.mode {
Mode::Unix {
path: _,
socket_file_mode,
} => socket_file_mode,
_ => panic!("expected Mode::Unix"),
};
assert_eq!(socket_file_mode, Some(0o777));
}
#[test]
fn syslog_ng_network_syslog_protocol() {
let msg = "i am foobar";
let raw = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
r#"[origin ip="192.168.0.1" software="test"]"#,
msg
);
let mut expected = Event::Log(LogEvent::from(msg));
{
let expected = expected.as_mut_log();
expected.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
.single()
.expect("invalid timestamp"),
);
expected.insert(
log_schema().source_type_key_target_path().unwrap(),
"syslog",
);
expected.insert("host", "74794bfb6795");
expected.insert("hostname", "74794bfb6795");
expected.insert("meta.sequenceId", "1");
expected.insert("meta.sysUpTime", "37");
expected.insert("meta.language", "EN");
expected.insert("origin.software", "test");
expected.insert("origin.ip", "192.168.0.1");
expected.insert("severity", "notice");
expected.insert("facility", "user");
expected.insert("version", 1);
expected.insert("appname", "root");
expected.insert("procid", 8449);
expected.insert("source_ip", "192.168.0.254");
}
assert_event_data_eq!(
event_from_bytes(
"host",
Some(Bytes::from("192.168.0.254")),
raw.into(),
LogNamespace::Legacy
)
.unwrap(),
expected
);
}
#[test]
fn handles_incorrect_sd_element() {
let msg = "qwerty";
let raw = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
r"[incorrect x]", msg
);
let mut expected = Event::Log(LogEvent::from(msg));
{
let expected = expected.as_mut_log();
expected.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
.single()
.expect("invalid timestamp"),
);
expected.insert(
log_schema().host_key().unwrap().to_string().as_str(),
"74794bfb6795",
);
expected.insert("hostname", "74794bfb6795");
expected.insert(
log_schema().source_type_key_target_path().unwrap(),
"syslog",
);
expected.insert("severity", "notice");
expected.insert("facility", "user");
expected.insert("version", 1);
expected.insert("appname", "root");
expected.insert("procid", 8449);
expected.insert("source_ip", "192.168.0.254");
}
let event = event_from_bytes(
"host",
Some(Bytes::from("192.168.0.254")),
raw.into(),
LogNamespace::Legacy,
)
.unwrap();
assert_event_data_eq!(event, expected);
let raw = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
r"[incorrect x=]", msg
);
let event = event_from_bytes(
"host",
Some(Bytes::from("192.168.0.254")),
raw.into(),
LogNamespace::Legacy,
)
.unwrap();
assert_event_data_eq!(event, expected);
}
#[test]
fn handles_empty_sd_element() {
fn there_is_map_called_empty(event: Event) -> bool {
event
.as_log()
.get("empty")
.expect("empty exists")
.is_object()
}
let msg = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
r"[empty]"
);
let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
assert!(there_is_map_called_empty(event));
let msg = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
r#"[non_empty x="1"][empty]"#
);
let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
assert!(there_is_map_called_empty(event));
let msg = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
r#"[empty][non_empty x="1"]"#
);
let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
assert!(there_is_map_called_empty(event));
let msg = format!(
r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
r#"[empty not_really="testing the test"]"#
);
let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
assert!(there_is_map_called_empty(event));
}
#[test]
fn handles_weird_whitespace() {
let raw = r#"
<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
"#;
let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
assert_event_data_eq!(
event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
event_from_bytes(
"host",
None,
cleaned.to_owned().into(),
LogNamespace::Legacy
)
.unwrap()
);
}
#[test]
fn handles_dots_in_sdata() {
let raw =
r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin foo.bar="baz"] hello"#;
let event =
event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
assert_eq!(
event.as_log().get(r#"origin."foo.bar""#),
Some(&Value::from("baz"))
);
}
#[test]
fn syslog_ng_default_network() {
let msg = "i am foobar";
let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {}"#, msg);
let event = event_from_bytes(
"host",
Some(Bytes::from("192.168.0.254")),
raw.into(),
LogNamespace::Legacy,
)
.unwrap();
let mut expected = Event::Log(LogEvent::from(msg));
{
let value = event.as_log().get("timestamp").unwrap();
let year = value.as_timestamp().unwrap().naive_local().year();
let expected = expected.as_mut_log();
let expected_date: DateTime<Utc> = Local
.with_ymd_and_hms(year, 2, 13, 20, 7, 26)
.single()
.expect("invalid timestamp")
.into();
expected.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
expected_date,
);
expected.insert(
log_schema().host_key().unwrap().to_string().as_str(),
"74794bfb6795",
);
expected.insert(
log_schema().source_type_key_target_path().unwrap(),
"syslog",
);
expected.insert("hostname", "74794bfb6795");
expected.insert("severity", "notice");
expected.insert("facility", "user");
expected.insert("appname", "root");
expected.insert("procid", 8539);
expected.insert("source_ip", "192.168.0.254");
}
assert_event_data_eq!(event, expected);
}
#[test]
fn rsyslog_omfwd_tcp_default() {
let msg = "start";
let raw = format!(
r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {}"#,
msg
);
let event = event_from_bytes(
"host",
Some(Bytes::from("192.168.0.254")),
raw.into(),
LogNamespace::Legacy,
)
.unwrap();
let mut expected = Event::Log(LogEvent::from(msg));
{
let value = event.as_log().get("timestamp").unwrap();
let year = value.as_timestamp().unwrap().naive_local().year();
let expected = expected.as_mut_log();
let expected_date: DateTime<Utc> = Local
.with_ymd_and_hms(year, 2, 13, 21, 31, 56)
.single()
.expect("invalid timestamp")
.into();
expected.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
expected_date,
);
expected.insert(
log_schema().source_type_key_target_path().unwrap(),
"syslog",
);
expected.insert("host", "74794bfb6795");
expected.insert("hostname", "74794bfb6795");
expected.insert("severity", "info");
expected.insert("facility", "local7");
expected.insert("appname", "liblogging-stdlog");
expected.insert("origin.software", "rsyslogd");
expected.insert("origin.swVersion", "8.24.0");
expected.insert("source_ip", "192.168.0.254");
expected.insert(event_path!("origin", "x-pid"), "8979");
expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
}
assert_event_data_eq!(event, expected);
}
#[test]
fn rsyslog_omfwd_tcp_forward_format() {
let msg = "start";
let raw = format!(
r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {}"#,
msg
);
let mut expected = Event::Log(LogEvent::from(msg));
{
let expected = expected.as_mut_log();
expected.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
.single()
.and_then(|t| t.with_nanosecond(605_850 * 1000))
.expect("invalid timestamp"),
);
expected.insert(
log_schema().source_type_key_target_path().unwrap(),
"syslog",
);
expected.insert("host", "74794bfb6795");
expected.insert("hostname", "74794bfb6795");
expected.insert("severity", "info");
expected.insert("facility", "local7");
expected.insert("appname", "liblogging-stdlog");
expected.insert("origin.software", "rsyslogd");
expected.insert("origin.swVersion", "8.24.0");
expected.insert(event_path!("origin", "x-pid"), "9043");
expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
}
assert_event_data_eq!(
event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
expected
);
}
#[tokio::test]
async fn test_tcp_syslog() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 10000;
let in_addr = next_addr();
let config = SyslogConfig::from_mode(Mode::Tcp {
address: in_addr.into(),
permit_origin: None,
keepalive: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
});
let key = ComponentKey::from("in");
let (tx, rx) = SourceSender::new_test();
let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
let shutdown_complete = shutdown.shutdown_tripwire();
let source = config
.build(context)
.await
.expect("source should not fail to build");
tokio::spawn(source);
wait_for_tcp(in_addr).await;
let output_events = CountReceiver::receive_events(rx);
let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
.map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
.collect();
let input_lines: Vec<String> =
input_messages.iter().map(|msg| msg.to_string()).collect();
send_lines(in_addr, input_lines).await.unwrap();
sleep(Duration::from_secs(2)).await;
shutdown
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;
let output_events = output_events.await;
assert_eq!(output_events.len(), num_messages);
let output_messages: Vec<SyslogMessageRfc5424> = output_events
.into_iter()
.map(|mut e| {
e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
})
.collect();
assert_eq!(output_messages, input_messages);
})
.await;
}
#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
use futures_util::{stream, SinkExt};
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio_util::codec::{FramedWrite, LinesCodec};
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1;
let in_path = tempfile::tempdir().unwrap().into_path().join("stream_test");
let config = SyslogConfig::from_mode(Mode::Unix {
path: in_path.clone(),
socket_file_mode: None,
});
let key = ComponentKey::from("in");
let (tx, rx) = SourceSender::new_test();
let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
let shutdown_complete = shutdown.shutdown_tripwire();
let source = config
.build(context)
.await
.expect("source should not fail to build");
tokio::spawn(source);
while StdUnixStream::connect(&in_path).is_err() {
tokio::task::yield_now().await;
}
let output_events = CountReceiver::receive_events(rx);
let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
.map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
.collect();
let stream = UnixStream::connect(&in_path).await.unwrap();
let mut sink = FramedWrite::new(stream, LinesCodec::new());
let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
let mut lines = stream::iter(lines).map(Ok);
sink.send_all(&mut lines).await.unwrap();
let stream = sink.get_mut();
stream.shutdown().await.unwrap();
sleep(Duration::from_secs(1)).await;
shutdown
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;
let output_events = output_events.await;
assert_eq!(output_events.len(), num_messages);
let output_messages: Vec<SyslogMessageRfc5424> = output_events
.into_iter()
.map(|mut e| {
e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
})
.collect();
assert_eq!(output_messages, input_messages);
})
.await;
}
#[tokio::test]
async fn test_octet_counting_syslog() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 10000;
let in_addr = next_addr();
let config = SyslogConfig::from_mode(Mode::Tcp {
address: in_addr.into(),
permit_origin: None,
keepalive: None,
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
});
let key = ComponentKey::from("in");
let (tx, rx) = SourceSender::new_test();
let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
let shutdown_complete = shutdown.shutdown_tripwire();
let source = config
.build(context)
.await
.expect("source should not fail to build");
tokio::spawn(source);
wait_for_tcp(in_addr).await;
let output_events = CountReceiver::receive_events(rx);
let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
.map(|i| {
let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
msg.message.push('\n');
msg.message.push_str(&random_string(30));
msg
})
.collect();
let codec = BytesCodec::new();
let input_lines: Vec<Bytes> = input_messages
.iter()
.map(|msg| {
let s = msg.to_string();
format!("{} {}", s.len(), s).into()
})
.collect();
send_encodable(in_addr, codec, input_lines).await.unwrap();
sleep(Duration::from_secs(2)).await;
shutdown
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;
let output_events = output_events.await;
assert_eq!(output_events.len(), num_messages);
let output_messages: Vec<SyslogMessageRfc5424> = output_events
.into_iter()
.map(|mut e| {
e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
})
.collect();
assert_eq!(output_messages, input_messages);
})
.await;
}
#[derive(Deserialize, PartialEq, Clone, Debug)]
struct SyslogMessageRfc5424 {
msgid: String,
severity: Severity,
facility: Facility,
version: u8,
timestamp: String,
host: String,
source_type: String,
appname: String,
procid: usize,
message: String,
#[serde(flatten)]
structured_data: StructuredData,
}
impl SyslogMessageRfc5424 {
fn random(
id: usize,
msg_len: usize,
field_len: usize,
max_map_size: usize,
max_children: usize,
) -> Self {
let msg = random_string(msg_len);
let structured_data = random_structured_data(max_map_size, max_children, field_len);
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
Self {
msgid: format!("test{}", id),
severity: Severity::LOG_INFO,
facility: Facility::LOG_USER,
version: 1,
timestamp,
host: "hogwarts".to_owned(),
source_type: "syslog".to_owned(),
appname: "harry".to_owned(),
procid: thread_rng().gen_range(0..32768),
structured_data,
message: msg,
}
}
}
impl fmt::Display for SyslogMessageRfc5424 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"<{}>{} {} {} {} {} {} {} {}",
encode_priority(self.severity, self.facility),
self.version,
self.timestamp,
self.host,
self.appname,
self.procid,
self.msgid,
format_structured_data_rfc5424(&self.structured_data),
self.message
)
}
}
impl From<Event> for SyslogMessageRfc5424 {
fn from(e: Event) -> Self {
let (value, _) = e.into_log().into_parts();
let mut fields = value.into_object().unwrap();
Self {
msgid: fields.remove("msgid").map(value_to_string).unwrap(),
severity: fields
.remove("severity")
.map(value_to_string)
.and_then(|s| Severity::from_str(s.as_str()))
.unwrap(),
facility: fields
.remove("facility")
.map(value_to_string)
.and_then(|s| Facility::from_str(s.as_str()))
.unwrap(),
version: fields
.remove("version")
.map(value_to_string)
.map(|s| u8::from_str(s.as_str()).unwrap())
.unwrap(),
timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
host: fields.remove("host").map(value_to_string).unwrap(),
source_type: fields.remove("source_type").map(value_to_string).unwrap(),
appname: fields.remove("appname").map(value_to_string).unwrap(),
procid: fields
.remove("procid")
.map(value_to_string)
.map(|s| usize::from_str(s.as_str()).unwrap())
.unwrap(),
message: fields.remove("message").map(value_to_string).unwrap(),
structured_data: structured_data_from_fields(fields),
}
}
}
fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
let mut structured_data = StructuredData::default();
for (key, value) in fields.into_iter() {
let subfields = value
.into_object()
.unwrap()
.into_iter()
.map(|(k, v)| (k.into(), value_to_string(v)))
.collect();
structured_data.insert(key.into(), subfields);
}
structured_data
}
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
#[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
pub enum Severity {
#[serde(rename(deserialize = "emergency"))]
LOG_EMERG,
#[serde(rename(deserialize = "alert"))]
LOG_ALERT,
#[serde(rename(deserialize = "critical"))]
LOG_CRIT,
#[serde(rename(deserialize = "error"))]
LOG_ERR,
#[serde(rename(deserialize = "warn"))]
LOG_WARNING,
#[serde(rename(deserialize = "notice"))]
LOG_NOTICE,
#[serde(rename(deserialize = "info"))]
LOG_INFO,
#[serde(rename(deserialize = "debug"))]
LOG_DEBUG,
}
impl Severity {
fn from_str(s: &str) -> Option<Self> {
match s {
"emergency" => Some(Self::LOG_EMERG),
"alert" => Some(Self::LOG_ALERT),
"critical" => Some(Self::LOG_CRIT),
"error" => Some(Self::LOG_ERR),
"warn" => Some(Self::LOG_WARNING),
"notice" => Some(Self::LOG_NOTICE),
"info" => Some(Self::LOG_INFO),
"debug" => Some(Self::LOG_DEBUG),
x => {
#[allow(clippy::print_stdout)]
{
println!("converting severity str, got {}", x);
}
None
}
}
}
}
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
#[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
pub enum Facility {
#[serde(rename(deserialize = "kernel"))]
LOG_KERN = 0 << 3,
#[serde(rename(deserialize = "user"))]
LOG_USER = 1 << 3,
#[serde(rename(deserialize = "mail"))]
LOG_MAIL = 2 << 3,
#[serde(rename(deserialize = "daemon"))]
LOG_DAEMON = 3 << 3,
#[serde(rename(deserialize = "auth"))]
LOG_AUTH = 4 << 3,
#[serde(rename(deserialize = "syslog"))]
LOG_SYSLOG = 5 << 3,
}
impl Facility {
fn from_str(s: &str) -> Option<Self> {
match s {
"kernel" => Some(Self::LOG_KERN),
"user" => Some(Self::LOG_USER),
"mail" => Some(Self::LOG_MAIL),
"daemon" => Some(Self::LOG_DAEMON),
"auth" => Some(Self::LOG_AUTH),
"syslog" => Some(Self::LOG_SYSLOG),
_ => None,
}
}
}
type StructuredData = HashMap<String, HashMap<String, String>>;
fn random_structured_data(
max_map_size: usize,
max_children: usize,
field_len: usize,
) -> StructuredData {
let amount = thread_rng().gen_range(0..max_children);
random_maps(max_map_size, field_len)
.filter(|m| !m.is_empty()) .take(amount)
.enumerate()
.map(|(i, map)| (format!("id{}", i), map))
.collect()
}
fn format_structured_data_rfc5424(data: &StructuredData) -> String {
if data.is_empty() {
"-".to_string()
} else {
let mut res = String::new();
for (id, params) in data {
res = res + "[" + id;
for (name, value) in params {
res = res + " " + name + "=\"" + value + "\"";
}
res += "]";
}
res
}
}
const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
facility as u8 | severity as u8
}
fn value_to_string(v: Value) -> String {
if v.is_bytes() {
let buf = v.as_bytes().unwrap();
String::from_utf8_lossy(buf).to_string()
} else if v.is_timestamp() {
let ts = v.as_timestamp().unwrap();
ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
} else {
v.to_string()
}
}
}