vector/sources/
demo_logs.rs

1use std::task::Poll;
2
3use chrono::Utc;
4use fakedata::logs::*;
5use futures::StreamExt;
6use rand::prelude::IndexedRandom;
7use serde_with::serde_as;
8use snafu::Snafu;
9use tokio::time::{self, Duration};
10use tokio_util::codec::FramedRead;
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{
14        StreamDecodingError,
15        decoding::{DeserializerConfig, FramingConfig},
16    },
17    config::{DataType, LegacyKey, LogNamespace},
18    configurable::configurable_component,
19    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
20    lookup::{owned_value_path, path},
21};
22use vrl::value::Kind;
23
24use crate::{
25    SourceSender,
26    codecs::{Decoder, DecodingConfig},
27    config::{SourceConfig, SourceContext, SourceOutput},
28    internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
29    serde::{default_decoding, default_framing_message_based},
30    shutdown::ShutdownSignal,
31};
32
// User-facing settings for the `demo_logs` source.
//
// `Default` is generated by `derivative`. For `interval`, `count`, `framing` and `decoding`
// the derivative default calls the same helper function that serde uses as its default.
33/// Configuration for the `demo_logs` source.
34#[serde_as]
35#[configurable_component(source(
36    "demo_logs",
37    "Generate fake log events, which can be useful for testing and demos."
38))]
39#[derive(Clone, Debug, Derivative)]
40#[derivative(Default)]
41pub struct DemoLogsConfig {
    // Also accepted under the key `batch_interval`. (De)serialized through `serde_with` as a
    // floating-point number of seconds.
42    /// The amount of time, in seconds, to pause between each batch of output lines.
43    ///
44    /// The default is one batch per second. To remove the delay and output batches as quickly as possible, set
45    /// `interval` to `0.0`.
46    #[serde(alias = "batch_interval")]
47    #[derivative(Default(value = "default_interval()"))]
48    #[serde(default = "default_interval")]
49    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
50    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
51    pub interval: Duration,
52
    // Upper bound for the generation loop in `demo_logs_source`; falls back to
    // `default_count()` when omitted.
53    /// The total number of lines to output.
54    ///
55    /// By default, the source continuously prints logs (infinitely).
56    #[derivative(Default(value = "default_count()"))]
57    #[serde(default = "default_count")]
58    pub count: usize,
59
    // Flattened: the `format` tag and any variant fields (for example `lines`) are read from
    // the same level as the other options of this struct.
60    #[serde(flatten)]
61    #[configurable(metadata(
62        docs::enum_tag_description = "The format of the randomly generated output."
63    ))]
64    pub format: OutputFormat,
65
    // Framing half of the decoder that `build` constructs for the generated lines.
66    #[configurable(derived)]
67    #[derivative(Default(value = "default_framing_message_based()"))]
68    #[serde(default = "default_framing_message_based")]
69    pub framing: FramingConfig,
70
    // Deserializer half of the decoder; it also supplies the base schema definition used in
    // `outputs`.
71    #[configurable(derived)]
72    #[derivative(Default(value = "default_decoding()"))]
73    #[serde(default = "default_decoding")]
74    pub decoding: DeserializerConfig,
75
    // Per-source override that `outputs` merges with the global log namespace. Carries the
    // `docs::hidden` metadata flag.
76    /// The namespace to use for logs. This overrides the global setting.
77    #[serde(default)]
78    #[configurable(metadata(docs::hidden))]
79    pub log_namespace: Option<bool>,
80}
81
/// Default pause between generated batches: one second.
///
/// Shared by the serde and `derivative` defaults of `DemoLogsConfig::interval`.
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
85
/// Default number of lines to generate: `isize::MAX` expressed as a `usize`.
///
/// Shared by the serde and `derivative` defaults of `DemoLogsConfig::count`.
const fn default_count() -> usize {
    // `isize::MAX` is non-negative, so its absolute value is the same number as a `usize`.
    isize::MAX.unsigned_abs()
}
89
// Validation errors for the `demo_logs` configuration. The user-visible message of each
// variant comes from its `snafu(display(...))` attribute.
90#[derive(Debug, PartialEq, Eq, Snafu)]
91pub enum DemoLogsConfigError {
    // Returned by `OutputFormat::validate` when the `Shuffle` format has an empty `lines` list.
92    #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
93    ShuffleDemoLogsItemsEmpty,
94}
95
// Selects which generator produces each line (see `OutputFormat::generate_line`).
//
// Internally tagged: the variant is chosen by the `format` key, and variant names are written
// in snake_case. `Json` carries the `#[derivative(Default)]` marker, making it the default.
96/// Output format configuration.
97#[configurable_component]
98#[derive(Clone, Debug, Derivative)]
99#[derivative(Default)]
100#[serde(tag = "format", rename_all = "snake_case")]
101#[configurable(metadata(
102    docs::enum_tag_description = "The format of the randomly generated output."
103))]
104pub enum OutputFormat {
    // The only variant with fields. `validate` rejects it when `lines` is empty.
105    /// Lines are chosen at random from the list specified using `lines`.
106    Shuffle {
        // Omitted `sequence` deserializes to `false` (`serde(default)`).
107        /// If `true`, each output line starts with an increasing sequence number, beginning with 0.
108        #[serde(default)]
109        sequence: bool,
110        /// The list of lines to output.
111        #[configurable(metadata(docs::examples = "lines_example()"))]
112        lines: Vec<String>,
113    },
114
115    /// Randomly generated logs in [Apache common][apache_common] format.
116    ///
117    /// [apache_common]: https://httpd.apache.org/docs/current/logs.html#common
118    ApacheCommon,
119
120    /// Randomly generated logs in [Apache error][apache_error] format.
121    ///
122    /// [apache_error]: https://httpd.apache.org/docs/current/logs.html#errorlog
123    ApacheError,
124
    // Also selectable with `format = "rfc5424"`.
125    /// Randomly generated logs in Syslog format ([RFC 5424][syslog_5424]).
126    ///
127    /// [syslog_5424]: https://tools.ietf.org/html/rfc5424
128    #[serde(alias = "rfc5424")]
129    Syslog,
130
    // Also selectable with `format = "rfc3164"`.
131    /// Randomly generated logs in Syslog format ([RFC 3164][syslog_3164]).
132    ///
133    /// [syslog_3164]: https://tools.ietf.org/html/rfc3164
134    #[serde(alias = "rfc3164")]
135    BsdSyslog,
136
137    /// Randomly generated HTTP server logs in [JSON][json] format.
138    ///
139    /// [json]: https://en.wikipedia.org/wiki/JSON
140    #[derivative(Default)]
141    Json,
142}
143
/// Sample value for `OutputFormat::Shuffle::lines`, referenced by that field's `docs::examples`
/// metadata.
const fn lines_example() -> [&'static str; 2] {
    const EXAMPLE_LINES: [&str; 2] = ["line1", "line2"];
    EXAMPLE_LINES
}
147
148impl OutputFormat {
149    fn generate_line(&self, n: usize) -> String {
150        emit!(DemoLogsEventProcessed);
151
152        match self {
153            Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
154            Self::ApacheCommon => apache_common_log_line(),
155            Self::ApacheError => apache_error_log_line(),
156            Self::Syslog => syslog_5424_log_line(),
157            Self::BsdSyslog => syslog_3164_log_line(),
158            Self::Json => json_log_line(),
159        }
160    }
161
162    fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
163        // unwrap can be called here because `lines` can't be empty
164        let line = lines.choose(&mut rand::rng()).unwrap();
165
166        if sequence {
167            format!("{n} {line}")
168        } else {
169            line.into()
170        }
171    }
172
173    // Ensures that the `lines` list is non-empty if `Shuffle` is chosen
174    pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> {
175        match self {
176            Self::Shuffle { lines, .. } => {
177                if lines.is_empty() {
178                    Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
179                } else {
180                    Ok(())
181                }
182            }
183            _ => Ok(()),
184        }
185    }
186}
187
188impl DemoLogsConfig {
189    #[cfg(test)]
190    pub fn repeat(
191        lines: Vec<String>,
192        count: usize,
193        interval: Duration,
194        log_namespace: Option<bool>,
195    ) -> Self {
196        Self {
197            count,
198            interval,
199            format: OutputFormat::Shuffle {
200                lines,
201                sequence: false,
202            },
203            framing: default_framing_message_based(),
204            decoding: default_decoding(),
205            log_namespace,
206        }
207    }
208}
209
210async fn demo_logs_source(
211    interval: Duration,
212    count: usize,
213    format: OutputFormat,
214    decoder: Decoder,
215    mut shutdown: ShutdownSignal,
216    mut out: SourceSender,
217    log_namespace: LogNamespace,
218) -> Result<(), ()> {
219    let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
220    let mut interval = interval.map(time::interval);
221
222    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
223    let events_received = register!(EventsReceived);
224
225    for n in 0..count {
226        if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
227            break;
228        }
229
230        if let Some(interval) = &mut interval {
231            interval.tick().await;
232        }
233        bytes_received.emit(ByteSize(0));
234
235        let line = format.generate_line(n);
236
237        let mut stream = FramedRead::new(line.as_bytes(), decoder.clone());
238        while let Some(next) = stream.next().await {
239            match next {
240                Ok((events, _byte_size)) => {
241                    let count = events.len();
242                    let byte_size = events.estimated_json_encoded_size_of();
243                    events_received.emit(CountByteSize(count, byte_size));
244                    let now = Utc::now();
245
246                    let events = events.into_iter().map(|mut event| {
247                        let log = event.as_mut_log();
248                        log_namespace.insert_standard_vector_source_metadata(
249                            log,
250                            DemoLogsConfig::NAME,
251                            now,
252                        );
253                        log_namespace.insert_source_metadata(
254                            DemoLogsConfig::NAME,
255                            log,
256                            Some(LegacyKey::InsertIfEmpty(path!("service"))),
257                            path!("service"),
258                            "vector",
259                        );
260                        log_namespace.insert_source_metadata(
261                            DemoLogsConfig::NAME,
262                            log,
263                            Some(LegacyKey::InsertIfEmpty(path!("host"))),
264                            path!("host"),
265                            "localhost",
266                        );
267
268                        event
269                    });
270                    out.send_batch(events).await.map_err(|_| {
271                        emit!(StreamClosedError { count });
272                    })?;
273                }
274                Err(error) => {
275                    // Error is logged by `crate::codecs::Decoder`, no further
276                    // handling is needed here.
277                    if !error.can_continue() {
278                        break;
279                    }
280                }
281            }
282        }
283    }
284
285    Ok(())
286}
287
// NOTE(review): presumably derives the generated example configuration from
// `DemoLogsConfig::default()` — confirm against the macro definition.
288impl_generate_config_from_default!(DemoLogsConfig);
289
290#[async_trait::async_trait]
291#[typetag::serde(name = "demo_logs")]
292impl SourceConfig for DemoLogsConfig {
293    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
294        let log_namespace = cx.log_namespace(self.log_namespace);
295
296        self.format.validate()?;
297        let decoder =
298            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
299                .build()?;
300        Ok(Box::pin(demo_logs_source(
301            self.interval,
302            self.count,
303            self.format.clone(),
304            decoder,
305            cx.shutdown,
306            cx.out,
307            log_namespace,
308        )))
309    }
310
311    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
312        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
313        // and is merged here.
314        let log_namespace = global_log_namespace.merge(self.log_namespace);
315
316        let schema_definition = self
317            .decoding
318            .schema_definition(log_namespace)
319            .with_standard_vector_source_metadata()
320            .with_source_metadata(
321                DemoLogsConfig::NAME,
322                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
323                &owned_value_path!("service"),
324                Kind::bytes(),
325                Some("service"),
326            );
327
328        vec![SourceOutput::new_maybe_logs(
329            DataType::Log,
330            schema_definition,
331        )]
332    }
333
334    fn can_acknowledge(&self) -> bool {
335        false
336    }
337}
338
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{Stream, StreamExt, poll};

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    /// Takes the next event from `rx`, which must already be buffered because the source has
    /// finished running by the time the tests inspect the stream.
    async fn ready_event<S>(rx: &mut S) -> Event
    where
        S: Stream<Item = Event> + Unpin,
    {
        let Poll::Ready(event) = poll!(rx.next()) else {
            unreachable!()
        };
        event.unwrap()
    }

    /// Polls `rx` `times` times and asserts that every poll completes immediately.
    async fn assert_ready_polls<S>(rx: &mut S, times: usize)
    where
        S: Stream<Item = Event> + Unpin,
    {
        for _ in 0..times {
            assert!(poll!(rx.next()).is_ready());
        }
    }

    /// Asserts that `rx` has ended: no further events and the sender is gone.
    async fn assert_exhausted<S>(rx: &mut S)
    where
        S: Stream<Item = Event> + Unpin,
    {
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config` as TOML, runs the source to completion under the source compliance
    /// checks, and hands back the receiving end holding everything that was emitted.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let parsed: DemoLogsConfig = toml::from_str(config).unwrap();
            let decoding = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            );
            let decoder = decoding.build().unwrap();

            let outcome = demo_logs_source(
                parsed.interval,
                parsed.count,
                parsed.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await;
            outcome.unwrap();

            rx
        })
        .await
    }

    #[test]
    fn config_shuffle_lines_not_empty() {
        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: Vec::<String>::new(),
            },
            ..DemoLogsConfig::default()
        };

        let verdict = errant_config.format.validate();
        assert_eq!(verdict, Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty));
    }

    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two", "three", "four"]
               count = 5"#,
        )
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = ready_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 5"#,
        )
        .await;

        assert_ready_polls(&mut rx, 5).await;
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               sequence = true
               count = 5"#,
        )
        .await;

        for n in 0..5 {
            let event = ready_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(message.starts_with(&n.to_string()));
        }

        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 3
               interval = 1.0"#,
        )
        .await;

        assert_ready_polls(&mut rx, 3).await;
        assert_exhausted(&mut rx).await;

        // Three lines at one per second means at least two full intervals have passed.
        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "syslog"
            count = 5"#,
        )
        .await;

        let event = ready_event(&mut rx).await;
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_common"
            count = 5"#,
        )
        .await;

        assert_ready_polls(&mut rx, 5).await;
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_error"
            count = 5"#,
        )
        .await;

        assert_ready_polls(&mut rx, 5).await;
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(
            r#"format = "syslog"
            count = 5"#,
        )
        .await;

        assert_ready_polls(&mut rx, 5).await;
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(
            r#"format = "bsd_syslog"
            count = 5"#,
        )
        .await;

        assert_ready_polls(&mut rx, 5).await;
        assert_exhausted(&mut rx).await;
    }

    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "json"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            let event = ready_event(&mut rx).await;
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_exhausted(&mut rx).await;
    }
}
583}