// vector/sources/demo_logs.rs

1use std::task::Poll;
2
3use chrono::Utc;
4use fakedata::logs::*;
5use futures::StreamExt;
6use rand::prelude::IndexedRandom;
7use serde_with::serde_as;
8use snafu::Snafu;
9use tokio::time::{self, Duration};
10use vector_lib::{
11    EstimatedJsonEncodedSizeOf,
12    codecs::{
13        DecoderFramedRead, StreamDecodingError,
14        decoding::{DeserializerConfig, FramingConfig},
15    },
16    config::{DataType, LegacyKey, LogNamespace},
17    configurable::configurable_component,
18    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
19    lookup::{owned_value_path, path},
20};
21use vrl::value::Kind;
22
23use crate::{
24    SourceSender,
25    codecs::{Decoder, DecodingConfig},
26    config::{SourceConfig, SourceContext, SourceOutput},
27    internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
28    serde::{default_decoding, default_framing_message_based},
29    shutdown::ShutdownSignal,
30};
31
// NOTE(review): on this `configurable_component` type the `///` comments appear to be consumed
// as the generated configuration documentation, so maintainer notes here use plain `//`
// comments to leave that output unchanged.
/// Configuration for the `demo_logs` source.
#[serde_as]
#[configurable_component(source(
    "demo_logs",
    "Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct DemoLogsConfig {
    /// The amount of time, in seconds, to pause between each batch of output lines.
    ///
    /// The default is one batch per second. To remove the delay and output batches as quickly as possible, set
    /// `interval` to `0.0`.
    // Deserialized from fractional seconds (`DurationSecondsWithFrac<f64>`) and also accepted
    // under the key `batch_interval`. `Duration::ZERO` disables the pacing timer entirely in
    // `demo_logs_source`.
    #[serde(alias = "batch_interval")]
    #[derivative(Default(value = "default_interval()"))]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    pub interval: Duration,

    /// The total number of lines to output.
    ///
    /// By default, the source continuously prints logs (infinitely).
    // The default is `isize::MAX` (see `default_count`), which is effectively unbounded.
    #[derivative(Default(value = "default_count()"))]
    #[serde(default = "default_count")]
    pub count: usize,

    // Flattened: the `format` tag and any variant fields (e.g. `lines`, `sequence`) live at the
    // top level of the source's configuration table.
    #[serde(flatten)]
    #[configurable(metadata(
        docs::enum_tag_description = "The format of the randomly generated output."
    ))]
    pub format: OutputFormat,

    // Framing and decoding are applied to every generated line before it becomes events.
    #[configurable(derived)]
    #[derivative(Default(value = "default_framing_message_based()"))]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[derivative(Default(value = "default_decoding()"))]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // `None` defers to the global setting; it is resolved in `build` (`cx.log_namespace`) and
    // in `outputs` (`LogNamespace::merge`).
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
80
/// Default pause between batches of output lines: one second.
///
/// Shared by serde (`#[serde(default = ...)]`) and by the derived `Default` implementation.
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
84
/// Default number of lines to emit: `isize::MAX`, i.e. effectively unlimited.
///
/// Expressed with `unsigned_abs` so the conversion to `usize` is lossless by construction
/// rather than via an `as` cast.
const fn default_count() -> usize {
    isize::MAX.unsigned_abs()
}
88
/// Errors detected while validating a [`DemoLogsConfig`].
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum DemoLogsConfigError {
    // Returned by `OutputFormat::validate` when `format = "shuffle"` is configured with an
    // empty `lines` list.
    #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
    ShuffleDemoLogsItemsEmpty,
}
94
// NOTE(review): as with `DemoLogsConfig`, the `///` text on this `configurable_component`
// enum appears to feed the generated config docs, so maintainer notes use `//` comments.
//
// Internally tagged by the `format` key with snake_case variant names (for example
// `format = "bsd_syslog"`). `DemoLogsConfig` flattens this enum, so the tag and any variant
// fields sit directly in the source's configuration table.
/// Output format configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The format of the randomly generated output."
))]
pub enum OutputFormat {
    /// Lines are chosen at random from the list specified using `lines`.
    // `lines` must be non-empty; `OutputFormat::validate` rejects an empty list when the
    // source is built.
    Shuffle {
        /// If `true`, each output line starts with an increasing sequence number, beginning with 0.
        #[serde(default)]
        sequence: bool,
        /// The list of lines to output.
        // No `#[serde(default)]`, so `lines` is required when `format = "shuffle"`.
        #[configurable(metadata(docs::examples = "lines_example()"))]
        lines: Vec<String>,
    },

    /// Randomly generated logs in [Apache common][apache_common] format.
    ///
    /// [apache_common]: https://httpd.apache.org/docs/current/logs.html#common
    ApacheCommon,

    /// Randomly generated logs in [Apache error][apache_error] format.
    ///
    /// [apache_error]: https://httpd.apache.org/docs/current/logs.html#errorlog
    ApacheError,

    /// Randomly generated logs in Syslog format ([RFC 5424][syslog_5424]).
    ///
    /// [syslog_5424]: https://tools.ietf.org/html/rfc5424
    // Also accepted as `format = "rfc5424"`.
    #[serde(alias = "rfc5424")]
    Syslog,

    /// Randomly generated logs in Syslog format ([RFC 3164][syslog_3164]).
    ///
    /// [syslog_3164]: https://tools.ietf.org/html/rfc3164
    // Tagged as `bsd_syslog`; also accepted as `format = "rfc3164"`.
    #[serde(alias = "rfc3164")]
    BsdSyslog,

    /// Randomly generated HTTP server logs in [JSON][json] format.
    ///
    /// [json]: https://en.wikipedia.org/wiki/JSON
    // `#[default]`: `DemoLogsConfig::default()` (which has no override for `format`) uses this.
    #[default]
    Json,
}
141
/// Example value for the `shuffle` format's `lines` option, referenced from its
/// `docs::examples` metadata.
const fn lines_example() -> [&'static str; 2] {
    let first: &'static str = "line1";
    let second: &'static str = "line2";
    [first, second]
}
145
146impl OutputFormat {
147    fn generate_line(&self, n: usize) -> String {
148        emit!(DemoLogsEventProcessed);
149
150        match self {
151            Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
152            Self::ApacheCommon => apache_common_log_line(),
153            Self::ApacheError => apache_error_log_line(),
154            Self::Syslog => syslog_5424_log_line(),
155            Self::BsdSyslog => syslog_3164_log_line(),
156            Self::Json => json_log_line(),
157        }
158    }
159
160    fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
161        // unwrap can be called here because `lines` can't be empty
162        let line = lines.choose(&mut rand::rng()).unwrap();
163
164        if sequence {
165            format!("{n} {line}")
166        } else {
167            line.into()
168        }
169    }
170
171    // Ensures that the `lines` list is non-empty if `Shuffle` is chosen
172    pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> {
173        match self {
174            Self::Shuffle { lines, .. } => {
175                if lines.is_empty() {
176                    Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
177                } else {
178                    Ok(())
179                }
180            }
181            _ => Ok(()),
182        }
183    }
184}
185
186impl DemoLogsConfig {
187    #[cfg(test)]
188    pub fn repeat(
189        lines: Vec<String>,
190        count: usize,
191        interval: Duration,
192        log_namespace: Option<bool>,
193    ) -> Self {
194        Self {
195            count,
196            interval,
197            format: OutputFormat::Shuffle {
198                lines,
199                sequence: false,
200            },
201            framing: default_framing_message_based(),
202            decoding: default_decoding(),
203            log_namespace,
204        }
205    }
206}
207
208async fn demo_logs_source(
209    interval: Duration,
210    count: usize,
211    format: OutputFormat,
212    decoder: Decoder,
213    mut shutdown: ShutdownSignal,
214    mut out: SourceSender,
215    log_namespace: LogNamespace,
216) -> Result<(), ()> {
217    let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
218    let mut interval = interval.map(time::interval);
219
220    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
221    let events_received = register!(EventsReceived);
222
223    for n in 0..count {
224        if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
225            break;
226        }
227
228        if let Some(interval) = &mut interval {
229            interval.tick().await;
230        }
231        bytes_received.emit(ByteSize(0));
232
233        let line = format.generate_line(n);
234
235        let mut stream = DecoderFramedRead::new(line.as_bytes(), decoder.clone());
236        while let Some(next) = stream.next().await {
237            match next {
238                Ok((events, _byte_size)) => {
239                    let count = events.len();
240                    let byte_size = events.estimated_json_encoded_size_of();
241                    events_received.emit(CountByteSize(count, byte_size));
242                    let now = Utc::now();
243
244                    let events = events.into_iter().map(|mut event| {
245                        let log = event.as_mut_log();
246                        log_namespace.insert_standard_vector_source_metadata(
247                            log,
248                            DemoLogsConfig::NAME,
249                            now,
250                        );
251                        log_namespace.insert_source_metadata(
252                            DemoLogsConfig::NAME,
253                            log,
254                            Some(LegacyKey::InsertIfEmpty(path!("service"))),
255                            path!("service"),
256                            "vector",
257                        );
258                        log_namespace.insert_source_metadata(
259                            DemoLogsConfig::NAME,
260                            log,
261                            Some(LegacyKey::InsertIfEmpty(path!("host"))),
262                            path!("host"),
263                            "localhost",
264                        );
265
266                        event
267                    });
268                    out.send_batch(events).await.map_err(|_| {
269                        emit!(StreamClosedError { count });
270                    })?;
271                }
272                Err(error) => {
273                    // Error is logged by `vector_lib::codecs::Decoder`, no further
274                    // handling is needed here.
275                    if !error.can_continue() {
276                        break;
277                    }
278                }
279            }
280        }
281    }
282
283    Ok(())
284}
285
// Presumably (per the macro name) generates this source's example configuration from
// `DemoLogsConfig::default()`. The `generate_config` test exercises it via
// `test_generate_config::<DemoLogsConfig>()`.
impl_generate_config_from_default!(DemoLogsConfig);
287
288#[async_trait::async_trait]
289#[typetag::serde(name = "demo_logs")]
290impl SourceConfig for DemoLogsConfig {
291    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
292        let log_namespace = cx.log_namespace(self.log_namespace);
293
294        self.format.validate()?;
295        let decoder =
296            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
297                .build()?;
298        Ok(Box::pin(demo_logs_source(
299            self.interval,
300            self.count,
301            self.format.clone(),
302            decoder,
303            cx.shutdown,
304            cx.out,
305            log_namespace,
306        )))
307    }
308
309    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
310        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
311        // and is merged here.
312        let log_namespace = global_log_namespace.merge(self.log_namespace);
313
314        let schema_definition = self
315            .decoding
316            .schema_definition(log_namespace)
317            .with_standard_vector_source_metadata()
318            .with_source_metadata(
319                DemoLogsConfig::NAME,
320                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
321                &owned_value_path!("service"),
322                Kind::bytes(),
323                Some("service"),
324            );
325
326        vec![SourceOutput::new_maybe_logs(
327            DataType::Log,
328            schema_definition,
329        )]
330    }
331
332    fn can_acknowledge(&self) -> bool {
333        false
334    }
335}
336
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{Stream, StreamExt, poll};

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    // Parses `config` as a TOML `DemoLogsConfig`, runs the source to completion inside
    // `assert_source_compliance(&SOURCE_TAGS, ...)`, and returns the receiving end of the
    // output channel.
    //
    // Only `interval`, `count`, and `format` are taken from `config`. Framing and decoding are
    // always the defaults, the namespace is `Legacy`, and shutdown uses
    // `ShutdownSignal::noop()`. The source has finished before `rx` is returned, so the tests
    // expect every event to be available already and `poll!` the stream without awaiting.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config: DemoLogsConfig = toml::from_str(config).unwrap();
            let decoder = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            )
            .build()
            .unwrap();
            demo_logs_source(
                config.interval,
                config.count,
                config.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await
            .unwrap();

            rx
        })
        .await
    }

    // An empty `lines` list must be rejected by `OutputFormat::validate`.
    #[test]
    fn config_shuffle_lines_not_empty() {
        let empty_lines: Vec<String> = Vec::new();

        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: empty_lines,
            },
            ..DemoLogsConfig::default()
        };

        assert_eq!(
            errant_config.format.validate(),
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    // Every emitted message must be one of the configured lines, verbatim.
    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two", "three", "four"]
               count = 5"#,
        )
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // Exactly `count` events are produced, after which the stream is closed.
    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // With `sequence = true`, the n-th message starts with `n` (0-based).
    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               sequence = true
               count = 5"#,
        )
        .await;

        for n in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(message.starts_with(&n.to_string()));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // Three lines at a 1s interval need two waits (tokio's first `Interval::tick` completes
    // immediately), so at least 2s of wall-clock time must elapse. This test really sleeps.
    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 3
               interval = 1.0"#,
        )
        .await;

        for _ in 0..3 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));

        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    // The source inserts `host = "localhost"` into every event.
    // NOTE(review): the source writes to the literal `host` path, while this test reads
    // `log_schema().host_key()`; the two agree only with the default log schema.
    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "syslog"
            count = 5"#,
        )
        .await;

        let event = match poll!(rx.next()) {
            Poll::Ready(event) => event.unwrap(),
            _ => unreachable!(),
        };
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    // The remaining format tests only check that exactly `count` events are produced.
    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_common"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_error"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(
            r#"format = "syslog"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(
            r#"format = "bsd_syslog"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // Additionally checks that every `json` message parses as JSON.
    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "json"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }
}