vector/sources/demo_logs.rs

1use chrono::Utc;
2use fakedata::logs::*;
3use futures::StreamExt;
4use rand::prelude::IndexedRandom;
5use serde_with::serde_as;
6use snafu::Snafu;
7use std::task::Poll;
8use tokio::time::{self, Duration};
9use tokio_util::codec::FramedRead;
10use vector_lib::configurable::configurable_component;
11use vector_lib::internal_event::{
12    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
13};
14use vector_lib::lookup::{owned_value_path, path};
15use vector_lib::{
16    codecs::{
17        decoding::{DeserializerConfig, FramingConfig},
18        StreamDecodingError,
19    },
20    config::DataType,
21};
22use vector_lib::{
23    config::{LegacyKey, LogNamespace},
24    EstimatedJsonEncodedSizeOf,
25};
26use vrl::value::Kind;
27
28use crate::{
29    codecs::{Decoder, DecodingConfig},
30    config::{SourceConfig, SourceContext, SourceOutput},
31    internal_events::{DemoLogsEventProcessed, EventsReceived, StreamClosedError},
32    serde::{default_decoding, default_framing_message_based},
33    shutdown::ShutdownSignal,
34    SourceSender,
35};
36
/// Configuration for the `demo_logs` source.
#[serde_as]
#[configurable_component(source(
    "demo_logs",
    "Generate fake log events, which can be useful for testing and demos."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
// `Default` is derived through `derivative` so that `DemoLogsConfig::default()`
// uses the same `default_*` functions that serde uses for missing fields.
pub struct DemoLogsConfig {
    /// The amount of time, in seconds, to pause between each batch of output lines.
    ///
    /// The default is one batch per second. To remove the delay and output batches as quickly as possible, set
    /// `interval` to `0.0`.
    // Fractional seconds are accepted (`DurationSecondsWithFrac<f64>`), and the
    // older `batch_interval` key is still accepted as an alias.
    #[serde(alias = "batch_interval")]
    #[derivative(Default(value = "default_interval()"))]
    #[serde(default = "default_interval")]
    #[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    pub interval: Duration,

    /// The total number of lines to output.
    ///
    /// By default, the source continuously prints logs (infinitely).
    // "Infinitely" is approximated by `default_count()`, which is `isize::MAX`.
    #[derivative(Default(value = "default_count()"))]
    #[serde(default = "default_count")]
    pub count: usize,

    // Flattened: the `format` tag and any variant fields (e.g. `lines`,
    // `sequence` for `shuffle`) appear directly in this source's config table.
    #[serde(flatten)]
    #[configurable(metadata(
        docs::enum_tag_description = "The format of the randomly generated output."
    ))]
    pub format: OutputFormat,

    #[configurable(derived)]
    #[derivative(Default(value = "default_framing_message_based()"))]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[derivative(Default(value = "default_decoding()"))]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // `None` defers to the global setting; `build`/`outputs` merge the two.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
85
/// Default pause between output batches: one second.
const fn default_interval() -> Duration {
    Duration::from_millis(1_000)
}
89
/// Default number of lines to emit.
///
/// `isize::MAX` is large enough to be effectively unbounded for a demo source,
/// and `unsigned_abs` converts it to `usize` without a lossy-looking `as` cast.
const fn default_count() -> usize {
    isize::MAX.unsigned_abs()
}
93
/// Errors returned by `OutputFormat::validate`, which `SourceConfig::build`
/// calls before starting the source.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum DemoLogsConfigError {
    /// `format = "shuffle"` was configured with an empty `lines` list, leaving
    /// nothing for the generator to choose from.
    #[snafu(display("A non-empty list of lines is required for the shuffle format"))]
    ShuffleDemoLogsItemsEmpty,
}
99
/// Output format configuration.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(tag = "format", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The format of the randomly generated output."
))]
// Internally tagged: the variant is selected by `format = "<snake_case name>"`,
// and variant fields (only `Shuffle` has any) sit alongside the tag.
pub enum OutputFormat {
    /// Lines are chosen at random from the list specified using `lines`.
    // `lines` must be non-empty; `OutputFormat::validate` enforces this.
    Shuffle {
        /// If `true`, each output line starts with an increasing sequence number, beginning with 0.
        #[serde(default)]
        sequence: bool,
        /// The list of lines to output.
        #[configurable(metadata(docs::examples = "lines_example()"))]
        lines: Vec<String>,
    },

    /// Randomly generated logs in [Apache common][apache_common] format.
    ///
    /// [apache_common]: https://httpd.apache.org/docs/current/logs.html#common
    ApacheCommon,

    /// Randomly generated logs in [Apache error][apache_error] format.
    ///
    /// [apache_error]: https://httpd.apache.org/docs/current/logs.html#errorlog
    ApacheError,

    /// Randomly generated logs in Syslog format ([RFC 5424][syslog_5424]).
    ///
    /// [syslog_5424]: https://tools.ietf.org/html/rfc5424
    // `rfc5424` is accepted as an alternative spelling of `syslog`.
    #[serde(alias = "rfc5424")]
    Syslog,

    /// Randomly generated logs in Syslog format ([RFC 3164][syslog_3164]).
    ///
    /// [syslog_3164]: https://tools.ietf.org/html/rfc3164
    // `rfc3164` is accepted as an alternative spelling of `bsd_syslog`.
    #[serde(alias = "rfc3164")]
    BsdSyslog,

    /// Randomly generated HTTP server logs in [JSON][json] format.
    ///
    /// [json]: https://en.wikipedia.org/wiki/JSON
    // Variant returned by `OutputFormat::default()`, and therefore the format
    // used by `DemoLogsConfig::default()`.
    #[derivative(Default)]
    Json,
}
147
/// Example value for the shuffle `lines` option, referenced from its
/// `docs::examples` metadata.
const fn lines_example() -> [&'static str; 2] {
    const LINES: [&str; 2] = ["line1", "line2"];
    LINES
}
151
152impl OutputFormat {
153    fn generate_line(&self, n: usize) -> String {
154        emit!(DemoLogsEventProcessed);
155
156        match self {
157            Self::Shuffle { sequence, lines } => Self::shuffle_generate(*sequence, lines, n),
158            Self::ApacheCommon => apache_common_log_line(),
159            Self::ApacheError => apache_error_log_line(),
160            Self::Syslog => syslog_5424_log_line(),
161            Self::BsdSyslog => syslog_3164_log_line(),
162            Self::Json => json_log_line(),
163        }
164    }
165
166    fn shuffle_generate(sequence: bool, lines: &[String], n: usize) -> String {
167        // unwrap can be called here because `lines` can't be empty
168        let line = lines.choose(&mut rand::rng()).unwrap();
169
170        if sequence {
171            format!("{n} {line}")
172        } else {
173            line.into()
174        }
175    }
176
177    // Ensures that the `lines` list is non-empty if `Shuffle` is chosen
178    pub(self) fn validate(&self) -> Result<(), DemoLogsConfigError> {
179        match self {
180            Self::Shuffle { lines, .. } => {
181                if lines.is_empty() {
182                    Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
183                } else {
184                    Ok(())
185                }
186            }
187            _ => Ok(()),
188        }
189    }
190}
191
192impl DemoLogsConfig {
193    #[cfg(test)]
194    pub fn repeat(
195        lines: Vec<String>,
196        count: usize,
197        interval: Duration,
198        log_namespace: Option<bool>,
199    ) -> Self {
200        Self {
201            count,
202            interval,
203            format: OutputFormat::Shuffle {
204                lines,
205                sequence: false,
206            },
207            framing: default_framing_message_based(),
208            decoding: default_decoding(),
209            log_namespace,
210        }
211    }
212}
213
214async fn demo_logs_source(
215    interval: Duration,
216    count: usize,
217    format: OutputFormat,
218    decoder: Decoder,
219    mut shutdown: ShutdownSignal,
220    mut out: SourceSender,
221    log_namespace: LogNamespace,
222) -> Result<(), ()> {
223    let interval: Option<Duration> = (interval != Duration::ZERO).then_some(interval);
224    let mut interval = interval.map(time::interval);
225
226    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
227    let events_received = register!(EventsReceived);
228
229    for n in 0..count {
230        if matches!(futures::poll!(&mut shutdown), Poll::Ready(_)) {
231            break;
232        }
233
234        if let Some(interval) = &mut interval {
235            interval.tick().await;
236        }
237        bytes_received.emit(ByteSize(0));
238
239        let line = format.generate_line(n);
240
241        let mut stream = FramedRead::new(line.as_bytes(), decoder.clone());
242        while let Some(next) = stream.next().await {
243            match next {
244                Ok((events, _byte_size)) => {
245                    let count = events.len();
246                    let byte_size = events.estimated_json_encoded_size_of();
247                    events_received.emit(CountByteSize(count, byte_size));
248                    let now = Utc::now();
249
250                    let events = events.into_iter().map(|mut event| {
251                        let log = event.as_mut_log();
252                        log_namespace.insert_standard_vector_source_metadata(
253                            log,
254                            DemoLogsConfig::NAME,
255                            now,
256                        );
257                        log_namespace.insert_source_metadata(
258                            DemoLogsConfig::NAME,
259                            log,
260                            Some(LegacyKey::InsertIfEmpty(path!("service"))),
261                            path!("service"),
262                            "vector",
263                        );
264                        log_namespace.insert_source_metadata(
265                            DemoLogsConfig::NAME,
266                            log,
267                            Some(LegacyKey::InsertIfEmpty(path!("host"))),
268                            path!("host"),
269                            "localhost",
270                        );
271
272                        event
273                    });
274                    out.send_batch(events).await.map_err(|_| {
275                        emit!(StreamClosedError { count });
276                    })?;
277                }
278                Err(error) => {
279                    // Error is logged by `crate::codecs::Decoder`, no further
280                    // handling is needed here.
281                    if !error.can_continue() {
282                        break;
283                    }
284                }
285            }
286        }
287    }
288
289    Ok(())
290}
291
// Provides the example/generated config for `DemoLogsConfig` (presumably built
// from its `Default` impl, as the macro name suggests); covered by the
// `generate_config` test.
impl_generate_config_from_default!(DemoLogsConfig);
293
294#[async_trait::async_trait]
295#[typetag::serde(name = "demo_logs")]
296impl SourceConfig for DemoLogsConfig {
297    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
298        let log_namespace = cx.log_namespace(self.log_namespace);
299
300        self.format.validate()?;
301        let decoder =
302            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
303                .build()?;
304        Ok(Box::pin(demo_logs_source(
305            self.interval,
306            self.count,
307            self.format.clone(),
308            decoder,
309            cx.shutdown,
310            cx.out,
311            log_namespace,
312        )))
313    }
314
315    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
316        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
317        // and is merged here.
318        let log_namespace = global_log_namespace.merge(self.log_namespace);
319
320        let schema_definition = self
321            .decoding
322            .schema_definition(log_namespace)
323            .with_standard_vector_source_metadata()
324            .with_source_metadata(
325                DemoLogsConfig::NAME,
326                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
327                &owned_value_path!("service"),
328                Kind::bytes(),
329                Some("service"),
330            );
331
332        vec![SourceOutput::new_maybe_logs(
333            DataType::Log,
334            schema_definition,
335        )]
336    }
337
338    fn can_acknowledge(&self) -> bool {
339        false
340    }
341}
342
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use futures::{poll, Stream, StreamExt};

    use super::*;
    use crate::{
        config::log_schema,
        event::Event,
        shutdown::ShutdownSignal,
        test_util::components::{assert_source_compliance, SOURCE_TAGS},
        SourceSender,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DemoLogsConfig>();
    }

    /// Parses `config` as TOML and runs `demo_logs_source` to completion inside
    /// `assert_source_compliance`. Returns the receiving end of the test channel.
    ///
    /// Only `interval`, `count` and `format` come from `config`. Framing and
    /// decoding are always the defaults, and the namespace is always
    /// `LogNamespace::Legacy`. Because the source has already finished, callers
    /// can drain the stream with `poll!` without awaiting.
    async fn runit(config: &str) -> impl Stream<Item = Event> + use<> {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config: DemoLogsConfig = toml::from_str(config).unwrap();
            let decoder = DecodingConfig::new(
                default_framing_message_based(),
                default_decoding(),
                LogNamespace::Legacy,
            )
            .build()
            .unwrap();
            demo_logs_source(
                config.interval,
                config.count,
                config.format,
                decoder,
                ShutdownSignal::noop(),
                tx,
                LogNamespace::Legacy,
            )
            .await
            .unwrap();

            rx
        })
        .await
    }

    // An empty `lines` list must be rejected for the shuffle format.
    #[test]
    fn config_shuffle_lines_not_empty() {
        let empty_lines: Vec<String> = Vec::new();

        let errant_config = DemoLogsConfig {
            format: OutputFormat::Shuffle {
                sequence: false,
                lines: empty_lines,
            },
            ..DemoLogsConfig::default()
        };

        assert_eq!(
            errant_config.format.validate(),
            Err(DemoLogsConfigError::ShuffleDemoLogsItemsEmpty)
        );
    }

    // Every emitted message is one of the configured lines, and exactly `count`
    // events are produced before the stream ends.
    #[tokio::test]
    async fn shuffle_demo_logs_copies_lines() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two", "three", "four"]
               count = 5"#,
        )
        .await;

        let lines = &["one", "two", "three", "four"];

        for _ in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(lines.contains(&&*message));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // `count` caps the number of generated events.
    #[tokio::test]
    async fn shuffle_demo_logs_limits_count() {
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // With `sequence = true`, message `n` starts with its zero-based index.
    #[tokio::test]
    async fn shuffle_demo_logs_adds_sequence() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               sequence = true
               count = 5"#,
        )
        .await;

        for n in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(message.starts_with(&n.to_string()));
        }

        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // tokio's `interval` fires its first tick immediately, so three lines at a
    // 1s interval take at least 2s (not 3s).
    #[tokio::test]
    async fn shuffle_demo_logs_obeys_interval() {
        let start = Instant::now();
        let mut rx = runit(
            r#"format = "shuffle"
               lines = ["one", "two"]
               count = 3
               interval = 1.0"#,
        )
        .await;

        for _ in 0..3 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));

        let duration = start.elapsed();
        assert!(duration >= Duration::from_secs(2));
    }

    // The source fills the host field with "localhost" in the legacy namespace.
    #[tokio::test]
    async fn host_is_set() {
        let host_key = log_schema().host_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "syslog"
            count = 5"#,
        )
        .await;

        let event = match poll!(rx.next()) {
            Poll::Ready(event) => event.unwrap(),
            _ => unreachable!(),
        };
        let log = event.as_log();
        let host = log[&host_key].to_string_lossy();
        assert_eq!("localhost", host);
    }

    // The following tests check that each built-in format yields exactly
    // `count` events and then ends.
    #[tokio::test]
    async fn apache_common_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_common"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn apache_error_format_generates_output() {
        let mut rx = runit(
            r#"format = "apache_error"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn syslog_5424_format_generates_output() {
        let mut rx = runit(
            r#"format = "syslog"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn syslog_3164_format_generates_output() {
        let mut rx = runit(
            r#"format = "bsd_syslog"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            assert!(poll!(rx.next()).is_ready());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }

    // With default (bytes) decoding, each message is the raw generated line,
    // which must parse as JSON.
    #[tokio::test]
    async fn json_format_generates_output() {
        let message_key = log_schema().message_key().unwrap().to_string();
        let mut rx = runit(
            r#"format = "json"
            count = 5"#,
        )
        .await;

        for _ in 0..5 {
            let event = match poll!(rx.next()) {
                Poll::Ready(event) => event.unwrap(),
                _ => unreachable!(),
            };
            let log = event.as_log();
            let message = log[&message_key].to_string_lossy();
            assert!(serde_json::from_str::<serde_json::Value>(&message).is_ok());
        }
        assert_eq!(poll!(rx.next()), Poll::Ready(None));
    }
}