1use chrono::Utc;
2use fakedata::logs::*;
3use futures::StreamExt;
4use rand::prelude::IndexedRandom;
5use serde_with::serde_as;
6use snafu::Snafu;
7use std::task::Poll;
8use tokio::time::{self, Duration};
9use tokio_util::codec::FramedRead;
10use vector_lib::configurable::configurable_component;
11use vector_lib::internal_event::{
12 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
13};
14use vector_lib::lookup::{owned_value_path, path};
15use vector_lib::{
16 codecs::{
17 decoding::{DeserializerConfig, FramingConfig},
18 StreamDecodingError,
19 },
20 config::DataType,
21};
22use vector_lib::{
23 config::{LegacyKey, LogNamespace},
24 EstimatedJsonEncodedSizeOf,
25};
26use vrl::value::Kind;
27
28use crate::{
29 codecs::{Decoder, DecodingConfig},
30 config::{SourceConfig, SourceContext, SourceOutput},
31 internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
32 serde::{default_decoding, default_framing_message_based},
33 shutdown::ShutdownSignal,
34 SourceSender,
35};
36
#[serde_as]
/// Configuration for the `demo_logs` source.
#[configurable_component(source(
    "demo_logs",
    "Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct DemoLogsConfig {
    #[serde(alias = "batch_interval")]
    /// The amount of time, in seconds, to pause between each generated line.
    ///
    /// Fractional seconds are accepted. The default is one second. An interval of zero disables
    /// the pause entirely, so lines are produced as quickly as possible.
    #[derivative(Default(value = "default_interval()"))]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    pub interval: Duration,

    #[derivative(Default(value = "default_count()"))]
    /// The total number of lines to output.
    ///
    /// Defaults to `isize::MAX`, which in practice means the source keeps generating lines.
    #[serde(default = "default_count")]
    pub count: usize,

    // Flattened so that the `format` tag and the options of the selected format sit at the top
    // level of the source configuration.
    #[serde(flatten)]
    #[configurable(metadata(
        docs::enum_tag_description = "The format of the randomly generated output."
    ))]
    pub format: OutputFormat,

    // Framing applied to each generated line before it is deserialized.
    #[configurable(derived)]
    #[derivative(Default(value = "default_framing_message_based()"))]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    // Deserializer that turns each framed line into events.
    #[configurable(derived)]
    #[derivative(Default(value = "default_decoding()"))]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    #[serde(default)]
    /// The namespace to use for logs. It is combined with the global setting when the source is built.
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
85
/// Default pause between generated lines: one second.
const fn default_interval() -> Duration {
    const ONE_SECOND: Duration = Duration::from_secs(1);
    ONE_SECOND
}
89
/// Default number of lines to generate: `isize::MAX`, i.e. effectively no limit.
const fn default_count() -> usize {
    // `isize::MAX` is non-negative, so its absolute value as `usize` is the same number.
    isize::MAX.unsigned_abs()
}
93
/// Errors reported while validating a `demo_logs` configuration.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum DemoLogsConfigError {
    /// The `shuffle` format was selected with an empty `lines` list, so there is nothing to
    /// pick an output line from.
    #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
    ShuffleDemoLogsItemsEmpty,
}
99
#[configurable_component]
/// Output format configuration.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The format of the randomly generated output."
))]
pub enum OutputFormat {
    /// Lines are chosen at random from the list specified using `lines`.
    Shuffle {
        /// If `true`, each output line is prefixed with the line counter followed by a space.
        #[serde(default)]
        sequence: bool,
        /// The list of lines to choose from. It must not be empty.
        #[configurable(metadata(docs::examples = "lines_example()"))]
        lines: Vec<String>,
    },

    /// Lines produced by the `apache_common_log_line` generator.
    ApacheCommon,

    /// Lines produced by the `apache_error_log_line` generator.
    ApacheError,

    /// Lines produced by the `syslog_5424_log_line` generator. Also selectable as `rfc5424`.
    #[serde(alias = "rfc5424")]
    Syslog,

    /// Lines produced by the `syslog_3164_log_line` generator. Also selectable as `rfc3164`.
    #[serde(alias = "rfc3164")]
    BsdSyslog,

    /// Lines produced by the `json_log_line` generator. This is the default format.
    #[derivative(Default)]
    Json,
}
147
/// Example value shown in the generated documentation for the `lines` option.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE_LINES: [&str; 2] = ["line1", "line2"];
    EXAMPLE_LINES
}
151
152impl OutputFormat {
153 fn generate_line(&self, n: usize) -> String {
154 emit!(DemoLogsEventProcessed);
155
156 match self {
157 Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
158 Self::ApacheCommon => apache_common_log_line(),
159 Self::ApacheError => apache_error_log_line(),
160 Self::Syslog => syslog_5424_log_line(),
161 Self::BsdSyslog => syslog_3164_log_line(),
162 Self::Json => json_log_line(),
163 }
164 }
165
166 fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
167 let line = lines.choose(&mut rand::rng()).unwrap();
169
170 if sequence {
171 format!("{n} {line}")
172 } else {
173 line.into()
174 }
175 }
176
177 pub(self) fn validate(&self) -> Result<(), DemoLogsConfigError> {
179 match self {
180 Self::Shuffle { lines, .. } => {
181 if lines.is_empty() {
182 Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
183 } else {
184 Ok(())
185 }
186 }
187 _ => Ok(()),
188 }
189 }
190}
191
192impl DemoLogsConfig {
193 #[cfg(test)]
194 pub fn repeat(
195 lines: Vec<String>,
196 count: usize,
197 interval: Duration,
198 log_namespace: Option<bool>,
199 ) -> Self {
200 Self {
201 count,
202 interval,
203 format: OutputFormat::Shuffle {
204 lines,
205 sequence: false,
206 },
207 framing: default_framing_message_based(),
208 decoding: default_decoding(),
209 log_namespace,
210 }
211 }
212}
213
214async fn demo_logs_source(
215 interval: Duration,
216 count: usize,
217 format: OutputFormat,
218 decoder: Decoder,
219 mut shutdown: ShutdownSignal,
220 mut out: SourceSender,
221 log_namespace: LogNamespace,
222) -> Result<(), ()> {
223 let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
224 let mut interval = interval.map(time::interval);
225
226 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
227 let events_received = register!(EventsReceived);
228
229 for n in 0..count {
230 if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
231 break;
232 }
233
234 if let Some(interval) = &mut interval {
235 interval.tick().await;
236 }
237 bytes_received.emit(ByteSize(0));
238
239 let line = format.generate_line(n);
240
241 let mut stream = FramedRead::new(line.as_bytes(), decoder.clone());
242 while let Some(next) = stream.next().await {
243 match next {
244 Ok((events, _byte_size)) => {
245 let count = events.len();
246 let byte_size = events.estimated_json_encoded_size_of();
247 events_received.emit(CountByteSize(count, byte_size));
248 let now = Utc::now();
249
250 let events = events.into_iter().map(|mut event| {
251 let log = event.as_mut_log();
252 log_namespace.insert_standard_vector_source_metadata(
253 log,
254 DemoLogsConfig::NAME,
255 now,
256 );
257 log_namespace.insert_source_metadata(
258 DemoLogsConfig::NAME,
259 log,
260 Some(LegacyKey::InsertIfEmpty(path!("service"))),
261 path!("service"),
262 "vector",
263 );
264 log_namespace.insert_source_metadata(
265 DemoLogsConfig::NAME,
266 log,
267 Some(LegacyKey::InsertIfEmpty(path!("host"))),
268 path!("host"),
269 "localhost",
270 );
271
272 event
273 });
274 out.send_batch(events).await.map_err(|_| {
275 emit!(StreamClosedError { count });
276 })?;
277 }
278 Err(error) => {
279 if !error.can_continue() {
282 break;
283 }
284 }
285 }
286 }
287 }
288
289 Ok(())
290}
291
// Hooks `DemoLogsConfig` into config generation through the crate's
// `impl_generate_config_from_default!` macro.
// NOTE(review): going by the macro name, the generated config is built from
// `DemoLogsConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(DemoLogsConfig);
293
294#[async_trait::async_trait]
295#[typetag::serde(name = "demo_logs")]
296impl SourceConfig for DemoLogsConfig {
297 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
298 let log_namespace = cx.log_namespace(self.log_namespace);
299
300 self.format.validate()?;
301 let decoder =
302 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
303 .build()?;
304 Ok(Box::pin(demo_logs_source(
305 self.interval,
306 self.count,
307 self.format.clone(),
308 decoder,
309 cx.shutdown,
310 cx.out,
311 log_namespace,
312 )))
313 }
314
315 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
316 let log_namespace = global_log_namespace.merge(self.log_namespace);
319
320 let schema_definition = self
321 .decoding
322 .schema_definition(log_namespace)
323 .with_standard_vector_source_metadata()
324 .with_source_metadata(
325 DemoLogsConfig::NAME,
326 Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
327 &owned_value_path!("service"),
328 Kind::bytes(),
329 Some("service"),
330 );
331
332 vec![SourceOutput::new_maybe_logs(
333 DataType::Log,
334 schema_definition,
335 )]
336 }
337
338 fn can_acknowledge(&self) -> bool {
339 false
340 }
341}
342
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{poll, Stream, StreamExt};

    use super::*;
    use crate::{
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{assert_source_compliance, SOURCE_TAGS},
        SourceSender,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config` as TOML, runs the source to completion under the source compliance
    /// checks, and returns the receiving end holding everything the source emitted.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let parsed: DemoLogsConfig = toml::from_str(config).unwrap();
            let decoding_config = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            );
            let decoder = decoding_config.build().unwrap();

            let outcome = demo_logs_source(
                parsed.interval,
                parsed.count,
                parsed.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await;
            outcome.unwrap();

            rx
        })
        .await
    }

    /// Takes the next event out of `rx`; the event must already be available.
    async fn next_ready_event(rx: &mut (impl Stream<Item = Event> + Unpin)) -> Event {
        match poll!(rx.next()) {
            Poll::Ready(event) => event.unwrap(),
            _ => unreachable!(),
        }
    }

    /// Asserts that the stream is finished: the next poll yields `None` right away.
    async fn assert_stream_ended(rx: &mut (impl Stream<Item = Event> + Unpin)) {
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    /// Asserts that `expected` polls of `rx` are immediately ready and that the stream has
    /// ended afterwards.
    async fn assert_ready_then_ended(
        rx: &mut (impl Stream<Item = Event> + Unpin),
        expected: usize,
    ) {
        for _ in 0..expected {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_stream_ended(rx).await;
    }

    #[test]
    fn config_shuffle_lines_not_empty() {
        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: Vec::<String>::new(),
            },
            ..DemoLogsConfig::default()
        };

        let validation = errant_config.format.validate();
        assert_eq!(
            validation,
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two", "three", "four"]
               count = 5"#,
        )
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = next_ready_event(&mut rx).await;
            let message = event.as_log()[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_stream_ended(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 5"#,
        )
        .await;

        assert_ready_then_ended(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               sequence = true
               count = 5"#,
        )
        .await;

        for n in 0..5 {
            let event = next_ready_event(&mut rx).await;
            let message = event.as_log()[&message_key].to_string_lossy();
            assert!(message.starts_with(&n.to_string()));
        }

        assert_stream_ended(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 3
               interval = 1.0"#,
        )
        .await;

        assert_ready_then_ended(&mut rx, 3).await;

        // Three lines at a one-second interval: the first tick fires immediately, so at least
        // two seconds must have passed.
        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "syslog"
               count = 5"#,
        )
        .await;

        let event = next_ready_event(&mut rx).await;
        let host = event.as_log()[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_common"
               count = 5"#,
        )
        .await;

        assert_ready_then_ended(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_error"
               count = 5"#,
        )
        .await;

        assert_ready_then_ended(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(
            r#"format = "syslog"
               count = 5"#,
        )
        .await;

        assert_ready_then_ended(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(
            r#"format = "bsd_syslog"
               count = 5"#,
        )
        .await;

        assert_ready_then_ended(&mut rx, 5).await;
    }

    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "json"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            let event = next_ready_event(&mut rx).await;
            let message = event.as_log()[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_stream_ended(&mut rx).await;
    }
}