1use std::task::Poll;
2
3use chrono::Utc;
4use fakedata::logs::*;
5use futures::StreamExt;
6use rand::prelude::IndexedRandom;
7use serde_with::serde_as;
8use snafu::Snafu;
9use tokio::time::{self, Duration};
10use tokio_util::codec::FramedRead;
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{
14 StreamDecodingError,
15 decoding::{DeserializerConfig, FramingConfig},
16 },
17 config::{DataType, LegacyKey, LogNamespace},
18 configurable::configurable_component,
19 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
20 lookup::{owned_value_path, path},
21};
22use vrl::value::Kind;
23
24use crate::{
25 SourceSender,
26 codecs::{Decoder, DecodingConfig},
27 config::{SourceConfig, SourceContext, SourceOutput},
28 internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
29 serde::{default_decoding, default_framing_message_based},
30 shutdown::ShutdownSignal,
31};
32
#[serde_as]
/// Configuration for the `demo_logs` source.
#[configurable_component(source(
    "demo_logs",
    "Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct DemoLogsConfig {
    /// The amount of time, in seconds, to pause between each batch of output lines.
    ///
    /// The default is one batch per second. To remove the delay and output batches as quickly as
    /// possible, set `interval` to `0.0`.
    #[serde(alias = "batch_interval")]
    #[derivative(Default(value = "default_interval()"))]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    pub interval: Duration,

    /// The total number of lines to output.
    ///
    /// By default, the source continuously prints logs (effectively infinitely).
    #[derivative(Default(value = "default_count()"))]
    #[serde(default = "default_count")]
    pub count: usize,

    // Flattened so that `format = "..."` (plus any format-specific options such as `lines`)
    // sit directly at the top level of the source's configuration.
    #[serde(flatten)]
    #[configurable(metadata(
        docs::enum_tag_description = "The format of the randomly generated output."
    ))]
    pub format: OutputFormat,

    #[configurable(derived)]
    #[derivative(Default(value = "default_framing_message_based()"))]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[derivative(Default(value = "default_decoding()"))]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
81
/// Default pause between generated lines: one second.
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
85
/// Default number of lines to emit: `isize::MAX`, which is effectively unbounded.
const fn default_count() -> usize {
    isize::MAX.unsigned_abs()
}
89
90#[derive(Debug, PartialEq, Eq, Snafu)]
91pub enum DemoLogsConfigError {
92 #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
93 ShuffleDemoLogsItemsEmpty,
94}
95
#[configurable_component]
/// The format of the randomly generated output.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The format of the randomly generated output."
))]
pub enum OutputFormat {
    /// Lines are chosen at random from the list specified using `lines`.
    Shuffle {
        /// If `true`, each output line is prefixed with its sequence number and a space.
        #[serde(default)]
        sequence: bool,
        /// The list of lines to output.
        ///
        /// Must contain at least one line; an empty list is rejected when the source is built.
        #[configurable(metadata(docs::examples = "lines_example()"))]
        lines: Vec<String>,
    },

    /// Randomly generated logs in Apache common log format.
    ApacheCommon,

    /// Randomly generated logs in Apache error log format.
    ApacheError,

    /// Randomly generated logs in Syslog format (RFC 5424).
    #[serde(alias = "rfc5424")]
    Syslog,

    /// Randomly generated logs in BSD Syslog format (RFC 3164).
    #[serde(alias = "rfc3164")]
    BsdSyslog,

    /// Randomly generated logs in JSON format.
    ///
    /// This is the default format.
    #[derivative(Default)]
    Json,
}
143
/// Example value for the `shuffle` format's `lines` option, referenced from its docs metadata.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE: [&str; 2] = ["line1", "line2"];
    EXAMPLE
}
147
148impl OutputFormat {
149 fn generate_line(&self, n: usize) -> String {
150 emit!(DemoLogsEventProcessed);
151
152 match self {
153 Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
154 Self::ApacheCommon => apache_common_log_line(),
155 Self::ApacheError => apache_error_log_line(),
156 Self::Syslog => syslog_5424_log_line(),
157 Self::BsdSyslog => syslog_3164_log_line(),
158 Self::Json => json_log_line(),
159 }
160 }
161
162 fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
163 let line = lines.choose(&mut rand::rng()).unwrap();
165
166 if sequence {
167 format!("{n} {line}")
168 } else {
169 line.into()
170 }
171 }
172
173 pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> {
175 match self {
176 Self::Shuffle { lines, .. } => {
177 if lines.is_empty() {
178 Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
179 } else {
180 Ok(())
181 }
182 }
183 _ => Ok(()),
184 }
185 }
186}
187
188impl DemoLogsConfig {
189 #[cfg(test)]
190 pub fn repeat(
191 lines: Vec<String>,
192 count: usize,
193 interval: Duration,
194 log_namespace: Option<bool>,
195 ) -> Self {
196 Self {
197 count,
198 interval,
199 format: OutputFormat::Shuffle {
200 lines,
201 sequence: false,
202 },
203 framing: default_framing_message_based(),
204 decoding: default_decoding(),
205 log_namespace,
206 }
207 }
208}
209
210async fn demo_logs_source(
211 interval: Duration,
212 count: usize,
213 format: OutputFormat,
214 decoder: Decoder,
215 mut shutdown: ShutdownSignal,
216 mut out: SourceSender,
217 log_namespace: LogNamespace,
218) -> Result<(), ()> {
219 let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
220 let mut interval = interval.map(time::interval);
221
222 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
223 let events_received = register!(EventsReceived);
224
225 for n in 0..count {
226 if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
227 break;
228 }
229
230 if let Some(interval) = &mut interval {
231 interval.tick().await;
232 }
233 bytes_received.emit(ByteSize(0));
234
235 let line = format.generate_line(n);
236
237 let mut stream = FramedRead::new(line.as_bytes(), decoder.clone());
238 while let Some(next) = stream.next().await {
239 match next {
240 Ok((events, _byte_size)) => {
241 let count = events.len();
242 let byte_size = events.estimated_json_encoded_size_of();
243 events_received.emit(CountByteSize(count, byte_size));
244 let now = Utc::now();
245
246 let events = events.into_iter().map(|mut event| {
247 let log = event.as_mut_log();
248 log_namespace.insert_standard_vector_source_metadata(
249 log,
250 DemoLogsConfig::NAME,
251 now,
252 );
253 log_namespace.insert_source_metadata(
254 DemoLogsConfig::NAME,
255 log,
256 Some(LegacyKey::InsertIfEmpty(path!("service"))),
257 path!("service"),
258 "vector",
259 );
260 log_namespace.insert_source_metadata(
261 DemoLogsConfig::NAME,
262 log,
263 Some(LegacyKey::InsertIfEmpty(path!("host"))),
264 path!("host"),
265 "localhost",
266 );
267
268 event
269 });
270 out.send_batch(events).await.map_err(|_| {
271 emit!(StreamClosedError { count });
272 })?;
273 }
274 Err(error) => {
275 if !error.can_continue() {
278 break;
279 }
280 }
281 }
282 }
283 }
284
285 Ok(())
286}
287
// Presumably implements config generation from `DemoLogsConfig::default()` (confirm against
// the macro definition); exercised by the `generate_config` test below.
impl_generate_config_from_default!(DemoLogsConfig);
289
290#[async_trait::async_trait]
291#[typetag::serde(name = "demo_logs")]
292impl SourceConfig for DemoLogsConfig {
293 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
294 let log_namespace = cx.log_namespace(self.log_namespace);
295
296 self.format.validate()?;
297 let decoder =
298 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
299 .build()?;
300 Ok(Box::pin(demo_logs_source(
301 self.interval,
302 self.count,
303 self.format.clone(),
304 decoder,
305 cx.shutdown,
306 cx.out,
307 log_namespace,
308 )))
309 }
310
311 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
312 let log_namespace = global_log_namespace.merge(self.log_namespace);
315
316 let schema_definition = self
317 .decoding
318 .schema_definition(log_namespace)
319 .with_standard_vector_source_metadata()
320 .with_source_metadata(
321 DemoLogsConfig::NAME,
322 Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
323 &owned_value_path!("service"),
324 Kind::bytes(),
325 Some("service"),
326 );
327
328 vec![SourceOutput::new_maybe_logs(
329 DataType::Log,
330 schema_definition,
331 )]
332 }
333
334 fn can_acknowledge(&self) -> bool {
335 false
336 }
337}
338
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{Stream, StreamExt, poll};

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config` as TOML, runs the source to completion using the legacy log namespace,
    /// and returns the receiving end of its output channel.
    ///
    /// This calls `demo_logs_source` directly and skips `OutputFormat::validate`, so `config`
    /// must already be valid.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config: DemoLogsConfig = toml::from_str(config).unwrap();
            let decoder = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            )
            .build()
            .unwrap();
            demo_logs_source(
                config.interval,
                config.count,
                config.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await
            .unwrap();

            rx
        })
        .await
    }

    #[test]
    fn config_shuffle_lines_not_empty() {
        let empty_lines: Vec<String> = Vec::new();

        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: empty_lines,
            },
            ..DemoLogsConfig::default()
        };

        assert_eq!(
            errant_config.format.validate(),
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    #[test]
    fn config_non_shuffle_formats_are_valid() {
        for format in [
            OutputFormat::ApacheCommon,
            OutputFormat::ApacheError,
            OutputFormat::Syslog,
            OutputFormat::BsdSyslog,
            OutputFormat::Json,
        ] {
            assert_eq!(format.validate(), Ok(()));
        }
    }

    #[test]
    fn shuffle_generate_formats_sequence_prefix() {
        let lines = vec!["only".to_string()];
        assert_eq!(OutputFormat::shuffle_generate(true, &lines, 7), "7 only");
        assert_eq!(OutputFormat::shuffle_generate(false, &lines, 7), "only");
    }

    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two", "three", "four"]
               count = 5"#,
        )
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               sequence = true
               count = 5"#,
        )
        .await;

        for n in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            // Check the separator too: the sequence number must be followed by a single space.
            assert!(message.starts_with(&format!("{n} ")));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 3
               interval = 1.0"#,
        )
        .await;

        for _ in 0..3 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));

        // The first tick fires immediately, so three lines take at least two intervals.
        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "syslog"
               count = 5"#,
        )
        .await;

        let event = match poll!(rx.next()) {
            Poll::Ready(event) => event.unwrap(),
            _ => unreachable!(),
        };
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_common"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_error"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(
            r#"format = "syslog"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(
            r#"format = "bsd_syslog"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "json"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }
}