1use std::task::Poll;
2
3use chrono::Utc;
4use fakedata::logs::*;
5use futures::StreamExt;
6use rand::prelude::IndexedRandom;
7use serde_with::serde_as;
8use snafu::Snafu;
9use tokio::time::{self, Duration};
10use vector_lib::{
11 EstimatedJsonEncodedSizeOf,
12 codecs::{
13 DecoderFramedRead, StreamDecodingError,
14 decoding::{DeserializerConfig, FramingConfig},
15 },
16 config::{DataType, LegacyKey, LogNamespace},
17 configurable::configurable_component,
18 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
19 lookup::{owned_value_path, path},
20};
21use vrl::value::Kind;
22
23use crate::{
24 SourceSender,
25 codecs::{Decoder, DecodingConfig},
26 config::{SourceConfig, SourceContext, SourceOutput},
27 internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
28 serde::{default_decoding, default_framing_message_based},
29 shutdown::ShutdownSignal,
30};
31
32#[serde_as]
34#[configurable_component(source(
35 "demo_logs",
36 "Generate fake log events, which can be useful for testing and demos."
37))]
38#[derive(Clone, Debug, Derivative)]
39#[derivative(Default)]
40pub struct DemoLogsConfig {
41 #[serde(alias = "batch_interval")]
46 #[derivative(Default(value = "default_interval()"))]
47 #[serde(default = "default_interval")]
48 #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
49 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
50 pub interval: Duration,
51
52 #[derivative(Default(value = "default_count()"))]
56 #[serde(default = "default_count")]
57 pub count: usize,
58
59 #[serde(flatten)]
60 #[configurable(metadata(
61 docs::enum_tag_description = "The format of the randomly generated output."
62 ))]
63 pub format: OutputFormat,
64
65 #[configurable(derived)]
66 #[derivative(Default(value = "default_framing_message_based()"))]
67 #[serde(default = "default_framing_message_based")]
68 pub framing: FramingConfig,
69
70 #[configurable(derived)]
71 #[derivative(Default(value = "default_decoding()"))]
72 #[serde(default = "default_decoding")]
73 pub decoding: DeserializerConfig,
74
75 #[serde(default)]
77 #[configurable(metadata(docs::hidden))]
78 pub log_namespace: Option<bool>,
79}
80
/// Default pause between generated lines: one second.
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
84
/// Default number of lines to generate: `isize::MAX`, i.e. effectively unbounded.
const fn default_count() -> usize {
    // `isize::MAX` is non-negative, so its absolute value as `usize` is the same number.
    isize::MAX.unsigned_abs()
}
88
89#[derive(Debug, PartialEq, Eq, Snafu)]
90pub enum DemoLogsConfigError {
91 #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
92 ShuffleDemoLogsItemsEmpty,
93}
94
95#[configurable_component]
97#[derive(Clone, Debug, Default)]
98#[serde(tag = "format", rename_all = "snake_case")]
99#[configurable(metadata(
100 docs::enum_tag_description = "The format of the randomly generated output."
101))]
102pub enum OutputFormat {
103 Shuffle {
105 #[serde(default)]
107 sequence: bool,
108 #[configurable(metadata(docs::examples = "lines_example()"))]
110 lines: Vec<String>,
111 },
112
113 ApacheCommon,
117
118 ApacheError,
122
123 #[serde(alias = "rfc5424")]
127 Syslog,
128
129 #[serde(alias = "rfc3164")]
133 BsdSyslog,
134
135 #[default]
139 Json,
140}
141
/// Example value for the `lines` option of the `shuffle` format.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE_LINES: [&str; 2] = ["line1", "line2"];
    EXAMPLE_LINES
}
145
146impl OutputFormat {
147 fn generate_line(&self, n: usize) -> String {
148 emit!(DemoLogsEventProcessed);
149
150 match self {
151 Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
152 Self::ApacheCommon => apache_common_log_line(),
153 Self::ApacheError => apache_error_log_line(),
154 Self::Syslog => syslog_5424_log_line(),
155 Self::BsdSyslog => syslog_3164_log_line(),
156 Self::Json => json_log_line(),
157 }
158 }
159
160 fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
161 let line = lines.choose(&mut rand::rng()).unwrap();
163
164 if sequence {
165 format!("{n} {line}")
166 } else {
167 line.into()
168 }
169 }
170
171 pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> {
173 match self {
174 Self::Shuffle { lines, .. } => {
175 if lines.is_empty() {
176 Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
177 } else {
178 Ok(())
179 }
180 }
181 _ => Ok(()),
182 }
183 }
184}
185
186impl DemoLogsConfig {
187 #[cfg(test)]
188 pub fn repeat(
189 lines: Vec<String>,
190 count: usize,
191 interval: Duration,
192 log_namespace: Option<bool>,
193 ) -> Self {
194 Self {
195 count,
196 interval,
197 format: OutputFormat::Shuffle {
198 lines,
199 sequence: false,
200 },
201 framing: default_framing_message_based(),
202 decoding: default_decoding(),
203 log_namespace,
204 }
205 }
206}
207
208async fn demo_logs_source(
209 interval: Duration,
210 count: usize,
211 format: OutputFormat,
212 decoder: Decoder,
213 mut shutdown: ShutdownSignal,
214 mut out: SourceSender,
215 log_namespace: LogNamespace,
216) -> Result<(), ()> {
217 let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
218 let mut interval = interval.map(time::interval);
219
220 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
221 let events_received = register!(EventsReceived);
222
223 for n in 0..count {
224 if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
225 break;
226 }
227
228 if let Some(interval) = &mut interval {
229 interval.tick().await;
230 }
231 bytes_received.emit(ByteSize(0));
232
233 let line = format.generate_line(n);
234
235 let mut stream = DecoderFramedRead::new(line.as_bytes(), decoder.clone());
236 while let Some(next) = stream.next().await {
237 match next {
238 Ok((events, _byte_size)) => {
239 let count = events.len();
240 let byte_size = events.estimated_json_encoded_size_of();
241 events_received.emit(CountByteSize(count, byte_size));
242 let now = Utc::now();
243
244 let events = events.into_iter().map(|mut event| {
245 let log = event.as_mut_log();
246 log_namespace.insert_standard_vector_source_metadata(
247 log,
248 DemoLogsConfig::NAME,
249 now,
250 );
251 log_namespace.insert_source_metadata(
252 DemoLogsConfig::NAME,
253 log,
254 Some(LegacyKey::InsertIfEmpty(path!("service"))),
255 path!("service"),
256 "vector",
257 );
258 log_namespace.insert_source_metadata(
259 DemoLogsConfig::NAME,
260 log,
261 Some(LegacyKey::InsertIfEmpty(path!("host"))),
262 path!("host"),
263 "localhost",
264 );
265
266 event
267 });
268 out.send_batch(events).await.map_err(|_| {
269 emit!(StreamClosedError { count });
270 })?;
271 }
272 Err(error) => {
273 if !error.can_continue() {
276 break;
277 }
278 }
279 }
280 }
281 }
282
283 Ok(())
284}
285
286impl_generate_config_from_default!(DemoLogsConfig);
287
288#[async_trait::async_trait]
289#[typetag::serde(name = "demo_logs")]
290impl SourceConfig for DemoLogsConfig {
291 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
292 let log_namespace = cx.log_namespace(self.log_namespace);
293
294 self.format.validate()?;
295 let decoder =
296 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
297 .build()?;
298 Ok(Box::pin(demo_logs_source(
299 self.interval,
300 self.count,
301 self.format.clone(),
302 decoder,
303 cx.shutdown,
304 cx.out,
305 log_namespace,
306 )))
307 }
308
309 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
310 let log_namespace = global_log_namespace.merge(self.log_namespace);
313
314 let schema_definition = self
315 .decoding
316 .schema_definition(log_namespace)
317 .with_standard_vector_source_metadata()
318 .with_source_metadata(
319 DemoLogsConfig::NAME,
320 Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
321 &owned_value_path!("service"),
322 Kind::bytes(),
323 Some("service"),
324 );
325
326 vec![SourceOutput::new_maybe_logs(
327 DataType::Log,
328 schema_definition,
329 )]
330 }
331
332 fn can_acknowledge(&self) -> bool {
333 false
334 }
335}
336
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{Stream, StreamExt, poll};

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    /// Polls `rx` exactly once and returns the event that must already be buffered.
    async fn ready_event<S>(rx: &mut S) -> Event
    where
        S: Stream<Item = Event> + Unpin,
    {
        match poll!(rx.next()) {
            Poll::Ready(maybe_event) => maybe_event.unwrap(),
            Poll::Pending => unreachable!(),
        }
    }

    /// Asserts that each of the next `times` polls of `rx` is immediately ready.
    async fn assert_ready_times<S>(rx: &mut S, times: usize)
    where
        S: Stream<Item = Event> + Unpin,
    {
        for _ in 0..times {
            assert!(poll!(rx.next()).is_ready());
        }
    }

    /// Asserts that `rx` has ended.
    async fn assert_finished<S>(rx: &mut S)
    where
        S: Stream<Item = Event> + Unpin,
    {
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    /// Runs the source described by `config` and asserts that it yields exactly
    /// `expected` ready items before the stream ends.
    async fn assert_emits_exactly(config: &str, expected: usize) {
        let mut rx = runit(config).await;
        assert_ready_times(&mut rx, expected).await;
        assert_finished(&mut rx).await;
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config`, runs the source to completion under the source compliance
    /// checks, and returns the receiving side holding everything it sent.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        let run = async {
            let (tx, rx) = SourceSender::new_test();
            let config: DemoLogsConfig = toml::from_str(config).unwrap();
            let decoder = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            )
            .build()
            .unwrap();

            let outcome = demo_logs_source(
                config.interval,
                config.count,
                config.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await;
            outcome.unwrap();

            rx
        };

        assert_source_compliance(&SOURCE_TAGS, run).await
    }

    #[test]
    fn config_shuffle_lines_not_empty() {
        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: Vec::<String>::new(),
            },
            ..DemoLogsConfig::default()
        };

        let outcome = errant_config.format.validate();
        assert_eq!(
            outcome,
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two", "three", "four"]
               count = 5"#,
        )
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = ready_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_finished(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        assert_emits_exactly(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 5"#,
            5,
        )
        .await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               sequence = true
               count = 5"#,
        )
        .await;

        for n in 0..5 {
            let event = ready_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            // Every line must be prefixed with its index.
            assert!(message.starts_with(&n.to_string()));
        }

        assert_finished(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 3
               interval = 1.0"#,
        )
        .await;

        assert_ready_times(&mut rx, 3).await;
        assert_finished(&mut rx).await;

        // Three lines at a one-second interval must take at least two seconds.
        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "syslog"
               count = 5"#,
        )
        .await;

        let event = ready_event(&mut rx).await;
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    #[tokio::test]
    async fn apache_common_format_generates_output() {
        assert_emits_exactly(
            r#"format = "apache_common"
               count = 5"#,
            5,
        )
        .await;
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        assert_emits_exactly(
            r#"format = "apache_error"
               count = 5"#,
            5,
        )
        .await;
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        assert_emits_exactly(
            r#"format = "syslog"
               count = 5"#,
            5,
        )
        .await;
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        assert_emits_exactly(
            r#"format = "bsd_syslog"
               count = 5"#,
            5,
        )
        .await;
    }

    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "json"
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            let event = ready_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            // Each generated message must itself be valid JSON.
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_finished(&mut rx).await;
    }
}