use chrono::Utc;
use fakedata::logs::*;
use futures::StreamExt;
use rand::seq::SliceRandom;
use serde_with::serde_as;
use snafu::Snafu;
use std::task::Poll;
use tokio::time::{self, Duration};
use tokio_util::codec::FramedRead;
use vector_lib::codecs::{
decoding::{DeserializerConfig, FramingConfig},
StreamDecodingError,
};
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
};
use vector_lib::lookup::{owned_value_path, path};
use vector_lib::{
config::{LegacyKey, LogNamespace},
EstimatedJsonEncodedSizeOf,
};
use vrl::value::Kind;
use crate::{
codecs::{Decoder, DecodingConfig},
config::{SourceConfig, SourceContext, SourceOutput},
internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
serde::{default_decoding, default_framing_message_based},
shutdown::ShutdownSignal,
SourceSender,
};
/// Configuration for the `demo_logs` source.
#[serde_as]
#[configurable_component(source(
"demo_logs",
"Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct DemoLogsConfig {
/// The amount of time, in seconds, to pause between each generated line.
///
/// Fractional values are accepted. The default is one second. Setting this to `0` removes the
/// pause entirely. The option is also accepted under the alias `batch_interval`.
#[serde(alias = "batch_interval")]
#[derivative(Default(value = "default_interval()"))]
#[serde(default = "default_interval")]
#[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
pub interval: Duration,
/// The total number of lines to output.
///
/// The default is `isize::MAX`, which in practice means the source keeps generating lines.
#[derivative(Default(value = "default_count()"))]
#[serde(default = "default_count")]
pub count: usize,
// Selects which kind of line is generated. Flattened, so the `format` tag and the options of
// the chosen format are read from the same level as the other fields of this struct.
#[serde(flatten)]
#[configurable(metadata(
docs::enum_tag_description = "The format of the randomly generated output."
))]
pub format: OutputFormat,
// Framing handed to `DecodingConfig::new` when the source is built; falls back to
// `default_framing_message_based()` when not configured.
#[configurable(derived)]
#[derivative(Default(value = "default_framing_message_based()"))]
#[serde(default = "default_framing_message_based")]
pub framing: FramingConfig,
// Deserializer handed to `DecodingConfig::new` when the source is built; falls back to
// `default_decoding()` when not configured. It also drives the schema definition and output
// type reported by `outputs`.
#[configurable(derived)]
#[derivative(Default(value = "default_decoding()"))]
#[serde(default = "default_decoding")]
pub decoding: DeserializerConfig,
/// The namespace to use for logs. This overrides the global setting.
#[serde(default)]
#[configurable(metadata(docs::hidden))]
pub log_namespace: Option<bool>,
}
/// Default pause between generated lines: one second.
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
/// Default number of lines to generate.
///
/// Equal to `isize::MAX` expressed as a `usize` (the upper half of the `usize` range is dropped
/// by shifting out the top bit).
const fn default_count() -> usize {
    usize::MAX >> 1
}
/// Errors reported when the `demo_logs` output format configuration fails validation.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum DemoLogsConfigError {
/// The `shuffle` format was configured with an empty `lines` list, so there is nothing to
/// pick from.
#[snafu(display("A non-empty list of lines is required for the shuffle format"))]
ShuffleDemoLogsItemsEmpty,
}
/// The format of the randomly generated output.
///
/// Serialized with an internal `format` tag whose values are the snake_case variant names.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
docs::enum_tag_description = "The format of the randomly generated output."
))]
pub enum OutputFormat {
/// Lines are chosen at random from the list specified using `lines`.
Shuffle {
/// If `true`, each output line is prefixed with its sequence number, starting at 0.
#[serde(default)]
sequence: bool,
/// The list of lines to output. Must not be empty.
#[configurable(metadata(docs::examples = "lines_example()"))]
lines: Vec<String>,
},
/// Randomly generated lines in the Apache common log format.
ApacheCommon,
/// Randomly generated lines in the Apache error log format.
ApacheError,
/// Randomly generated lines in the Syslog (RFC 5424) format. Also accepted as `rfc5424`.
#[serde(alias = "rfc5424")]
Syslog,
/// Randomly generated lines in the BSD Syslog (RFC 3164) format. Also accepted as `rfc3164`.
#[serde(alias = "rfc3164")]
BsdSyslog,
/// Randomly generated lines in JSON format. This is the default.
#[derivative(Default)]
Json,
}
/// Example value referenced by the documentation metadata of the shuffle format's `lines` option.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE: [&str; 2] = ["line1", "line2"];
    EXAMPLE
}
impl OutputFormat {
    /// Produces the raw text of the `n`-th generated line for this format.
    ///
    /// A `DemoLogsEventProcessed` internal event is emitted on every call. The index `n` only
    /// influences the output of the shuffle format, and only when `sequence` is enabled.
    fn generate_line(&self, n: usize) -> String {
        emit!(DemoLogsEventProcessed);

        match self {
            Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
            Self::ApacheCommon => apache_common_log_line(),
            Self::ApacheError => apache_error_log_line(),
            Self::Syslog => syslog_5424_log_line(),
            Self::BsdSyslog => syslog_3164_log_line(),
            Self::Json => json_log_line(),
        }
    }

    /// Picks one entry of `lines` at random, optionally prefixed with `n` and a space.
    ///
    /// # Panics
    ///
    /// Panics if `lines` is empty. `validate` rejects that configuration, and `build` calls
    /// `validate` before the source starts generating lines.
    fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
        let line = lines
            .choose(&mut rand::thread_rng())
            .expect("the shuffle format requires a non-empty list of lines");

        if sequence {
            format!("{n} {line}")
        } else {
            line.clone()
        }
    }

    /// Checks that the format can actually generate lines.
    ///
    /// # Errors
    ///
    /// Returns `DemoLogsConfigError::ShuffleDemoLogsItemsEmpty` for a shuffle format without
    /// any lines. Every other format is always valid.
    fn validate(&self) -> Result<(), DemoLogsConfigError> {
        // Variants are listed explicitly so that adding a new format forces a decision here.
        match self {
            Self::Shuffle { lines, .. } if lines.is_empty() => {
                Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
            }
            Self::Shuffle { .. }
            | Self::ApacheCommon
            | Self::ApacheError
            | Self::Syslog
            | Self::BsdSyslog
            | Self::Json => Ok(()),
        }
    }
}
impl DemoLogsConfig {
    /// Test-only constructor.
    ///
    /// Builds a config that uses the shuffle format over `lines` (without sequence numbers)
    /// together with the default framing and decoding.
    #[cfg(test)]
    pub fn repeat(
        lines: Vec<String>,
        count: usize,
        interval: Duration,
        log_namespace: Option<bool>,
    ) -> Self {
        let format = OutputFormat::Shuffle {
            sequence: false,
            lines,
        };

        Self {
            interval,
            count,
            format,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            log_namespace,
        }
    }
}
/// Runs the `demo_logs` source until `count` lines have been generated or shutdown is signalled.
///
/// Each iteration waits for the next tick (skipped entirely when `interval` is zero), asks
/// `format` for a line, feeds that line through `decoder`, enriches every decoded event with the
/// standard source metadata plus `service = "vector"` and `host = "localhost"`, and sends the
/// resulting batch to `out`.
///
/// The shutdown signal is only polled at the start of an iteration, before waiting for the tick.
///
/// # Errors
///
/// Returns `Err(())` when a batch cannot be sent to `out`; a `StreamClosedError` is emitted
/// first.
async fn demo_logs_source(
    interval: Duration,
    count: usize,
    format: OutputFormat,
    decoder: Decoder,
    mut shutdown: ShutdownSignal,
    mut out: SourceSender,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    // A ticker is only created for a non-zero interval; `None` means "do not pace the output".
    let mut ticker = (!interval.is_zero()).then(|| time::interval(interval));

    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
    let events_received = register!(EventsReceived);

    for n in 0..count {
        if let Poll::Ready(_) = futures::poll!(&mut shutdown) {
            break;
        }

        if let Some(ticker) = ticker.as_mut() {
            ticker.tick().await;
        }
        bytes_received.emit(ByteSize(0));

        let line = format.generate_line(n);
        let mut frames = FramedRead::new(line.as_bytes(), decoder.clone());

        while let Some(decoded) = frames.next().await {
            let (events, _byte_size) = match decoded {
                Ok(decoded) => decoded,
                // Recoverable decoding errors skip the frame; anything else abandons this line.
                Err(error) if error.can_continue() => continue,
                Err(_) => break,
            };

            let event_count = events.len();
            let json_size = events.estimated_json_encoded_size_of();
            events_received.emit(CountByteSize(event_count, json_size));

            // One timestamp is shared by every event decoded from the same frame.
            let now = Utc::now();
            let enriched = events.into_iter().map(|mut event| {
                let log = event.as_mut_log();

                log_namespace.insert_standard_vector_source_metadata(
                    log,
                    DemoLogsConfig::NAME,
                    now,
                );
                log_namespace.insert_source_metadata(
                    DemoLogsConfig::NAME,
                    log,
                    Some(LegacyKey::InsertIfEmpty(path!("service"))),
                    path!("service"),
                    "vector",
                );
                log_namespace.insert_source_metadata(
                    DemoLogsConfig::NAME,
                    log,
                    Some(LegacyKey::InsertIfEmpty(path!("host"))),
                    path!("host"),
                    "localhost",
                );

                event
            });

            out.send_batch(enriched).await.map_err(|_| {
                emit!(StreamClosedError { count: event_count });
            })?;
        }
    }

    Ok(())
}
// Hooks `DemoLogsConfig` into config generation; the `generate_config` test exercises it.
// NOTE(review): judging by the macro name this is based on the type's `Default` value; the
// exact expansion lives in the macro definition, so confirm there.
impl_generate_config_from_default!(DemoLogsConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "demo_logs")]
impl SourceConfig for DemoLogsConfig {
    /// Validates the configured format, builds the decoder, and returns the boxed source future.
    ///
    /// Fails if the format is invalid (for example a shuffle format without lines) or if the
    /// decoder cannot be built from the framing/decoding settings.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        self.format.validate()?;

        let decoding_config =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace);
        let decoder = decoding_config.build()?;

        let source = demo_logs_source(
            self.interval,
            self.count,
            self.format.clone(),
            decoder,
            cx.shutdown,
            cx.out,
            log_namespace,
        );
        Ok(Box::pin(source))
    }

    /// Describes the single output of this source and its schema.
    ///
    /// The per-source `log_namespace` setting is merged with the global one before the schema
    /// is derived from the decoding configuration.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);

        let base_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata();

        // NOTE(review): only `service` is declared here, while the running source also inserts
        // `host` on every event; confirm whether the schema is meant to omit it.
        let schema_definition = base_definition.with_source_metadata(
            DemoLogsConfig::NAME,
            Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
            &owned_value_path!("service"),
            Kind::bytes(),
            Some("service"),
        );

        let output = SourceOutput::new_maybe_logs(self.decoding.output_type(), schema_definition);
        vec![output]
    }

    /// This source does not support acknowledgements.
    fn can_acknowledge(&self) -> bool {
        false
    }
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use futures::{poll, Stream, StreamExt};
use super::*;
use crate::{
config::log_schema,
event::Event,
shutdown::ShutdownSignal,
test_util::components::{assert_source_compliance, SOURCE_TAGS},
SourceSender,
};
/// Runs the shared config-generation check against `DemoLogsConfig`.
#[test]
fn generate_config() {
    use crate::test_util::test_generate_config;

    test_generate_config::<DemoLogsConfig>();
}
/// Parses `config` as TOML, runs the source to completion under the source compliance checks,
/// and returns the receiving end holding every event it produced.
///
/// The decoder always uses the default framing and decoding with the legacy log namespace,
/// regardless of what `config` specifies for those options.
async fn runit(config: &str) -> impl Stream<Item = Event> {
    let run = async {
        let (tx, rx) = SourceSender::new_test();
        let parsed: DemoLogsConfig = toml::from_str(config).unwrap();

        let decoding_config = DecodingConfig::new(
            default_framing_message_based(),
            default_decoding(),
            LogNamespace::Legacy,
        );
        let decoder = decoding_config.build().unwrap();

        let source = demo_logs_source(
            parsed.interval,
            parsed.count,
            parsed.format,
            decoder,
            ShutdownSignal::noop(),
            tx,
            LogNamespace::Legacy,
        );
        source.await.unwrap();

        rx
    };

    assert_source_compliance(&SOURCE_TAGS, run).await
}
/// A shuffle format without any lines must be rejected by `validate`.
#[test]
fn config_shuffle_lines_not_empty() {
    let format = OutputFormat::Shuffle {
        sequence: false,
        lines: Vec::<String>::new(),
    };
    let errant_config = DemoLogsConfig {
        format,
        ..DemoLogsConfig::default()
    };

    let outcome = errant_config.format.validate();

    assert_eq!(
        outcome,
        Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
    );
}
/// Every event produced by the shuffle format carries one of the configured lines as message.
#[tokio::test]
async fn shuffle_demo_logs_copies_lines() {
    let message_key = log_schema().message_key().unwrap().to_string();
    let mut rx = runit(
        r#"format = "shuffle"
lines = ["one", "two", "three", "four"]
count = 5"#,
    )
    .await;

    let allowed = ["one", "two", "three", "four"];

    for _ in 0..5 {
        // The source has already finished, so every event must be immediately available.
        let event = if let Poll::Ready(item) = poll!(rx.next()) {
            item.unwrap()
        } else {
            unreachable!()
        };

        let message = event.as_log()[&message_key].to_string_lossy();
        assert!(allowed.iter().any(|candidate| *candidate == &*message));
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// The shuffle format stops after exactly `count` events.
#[tokio::test]
async fn shuffle_demo_logs_limits_count() {
    const EXPECTED_EVENTS: usize = 5;

    let mut rx = runit(
        r#"format = "shuffle"
lines = ["one", "two"]
count = 5"#,
    )
    .await;

    for _ in 0..EXPECTED_EVENTS {
        let polled = poll!(rx.next());
        assert!(polled.is_ready());
    }

    // Nothing may follow the configured number of events.
    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// With `sequence = true`, the n-th message starts with the number n (counting from zero).
#[tokio::test]
async fn shuffle_demo_logs_adds_sequence() {
    let message_key = log_schema().message_key().unwrap().to_string();
    let mut rx = runit(
        r#"format = "shuffle"
lines = ["one", "two"]
sequence = true
count = 5"#,
    )
    .await;

    for n in 0..5 {
        let expected_prefix = n.to_string();

        let event = if let Poll::Ready(item) = poll!(rx.next()) {
            item.unwrap()
        } else {
            unreachable!()
        };

        let message = event.as_log()[&message_key].to_string_lossy();
        assert!(message.starts_with(&expected_prefix));
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// Three lines at a one-second interval must take at least two seconds of wall-clock time.
#[tokio::test]
async fn shuffle_demo_logs_obeys_interval() {
    let minimum_runtime = Duration::from_secs(2);
    let started = Instant::now();

    let mut rx = runit(
        r#"format = "shuffle"
lines = ["one", "two"]
count = 3
interval = 1.0"#,
    )
    .await;

    for _ in 0..3 {
        let polled = poll!(rx.next());
        assert!(polled.is_ready());
    }
    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));

    let elapsed = started.elapsed();
    assert!(elapsed >= minimum_runtime);
}
/// Generated events carry `localhost` under the schema's host key.
#[tokio::test]
async fn host_is_set() {
    let host_key = log_schema().host_key().unwrap().to_string();
    let mut rx = runit(
        r#"format = "syslog"
count = 5"#,
    )
    .await;

    // Only the first event is inspected.
    let first = if let Poll::Ready(item) = poll!(rx.next()) {
        item.unwrap()
    } else {
        unreachable!()
    };

    let host = first.as_log()[&host_key].to_string_lossy();
    assert_eq!("localhost", host);
}
/// The `apache_common` format yields exactly the configured number of events.
#[tokio::test]
async fn apache_common_format_generates_output() {
    const EXPECTED_EVENTS: usize = 5;

    let mut rx = runit(
        r#"format = "apache_common"
count = 5"#,
    )
    .await;

    for _ in 0..EXPECTED_EVENTS {
        let polled = poll!(rx.next());
        assert!(polled.is_ready());
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// The `apache_error` format yields exactly the configured number of events.
#[tokio::test]
async fn apache_error_format_generates_output() {
    const EXPECTED_EVENTS: usize = 5;

    let mut rx = runit(
        r#"format = "apache_error"
count = 5"#,
    )
    .await;

    for _ in 0..EXPECTED_EVENTS {
        let polled = poll!(rx.next());
        assert!(polled.is_ready());
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// The `syslog` format yields exactly the configured number of events.
#[tokio::test]
async fn syslog_5424_format_generates_output() {
    const EXPECTED_EVENTS: usize = 5;

    let mut rx = runit(
        r#"format = "syslog"
count = 5"#,
    )
    .await;

    for _ in 0..EXPECTED_EVENTS {
        let polled = poll!(rx.next());
        assert!(polled.is_ready());
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// The `bsd_syslog` format yields exactly the configured number of events.
#[tokio::test]
async fn syslog_3164_format_generates_output() {
    const EXPECTED_EVENTS: usize = 5;

    let mut rx = runit(
        r#"format = "bsd_syslog"
count = 5"#,
    )
    .await;

    for _ in 0..EXPECTED_EVENTS {
        let polled = poll!(rx.next());
        assert!(polled.is_ready());
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
/// Every message produced by the `json` format parses as JSON, and exactly `count` are produced.
#[tokio::test]
async fn json_format_generates_output() {
    let message_key = log_schema().message_key().unwrap().to_string();
    let mut rx = runit(
        r#"format = "json"
count = 5"#,
    )
    .await;

    for _ in 0..5 {
        let event = if let Poll::Ready(item) = poll!(rx.next()) {
            item.unwrap()
        } else {
            unreachable!()
        };

        let message = event.as_log()[&message_key].to_string_lossy();
        let parsed = serde_json::from_str::<serde_json::Value>(&message);
        assert!(parsed.is_ok());
    }

    let tail = poll!(rx.next());
    assert_eq!(tail, Poll::Ready(None));
}
}