1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use std::time::Duration;

use futures::{future, FutureExt};
use serde_with::serde_as;
use vector_lib::configurable::configurable_component;

use crate::{
    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
    sinks::{blackhole::sink::BlackholeSink, Healthcheck, VectorSink},
};

/// Default for `print_interval_secs`: a zero-length duration.
///
/// Usable in `const` contexts; referenced by name from the serde and
/// derivative attributes on the config struct.
const fn default_print_interval_secs() -> Duration {
    Duration::ZERO
}

/// Configuration for the `blackhole` sink.
// NOTE: `#[serde_as]` is kept above the derive; per the serde_with docs it has to
// run before the serde derives so the field-level `#[serde_as(...)]` is rewritten.
#[serde_as]
#[configurable_component(sink(
    "blackhole",
    "Send observability events nowhere, which can be useful for debugging purposes."
))]
#[derive(Clone, Debug, Derivative)]
// Unknown keys are rejected, and any field missing from the input is taken from
// the struct's `Default` implementation (generated by `derivative` below).
#[serde(deny_unknown_fields, default)]
#[derivative(Default)]
pub struct BlackholeConfig {
    /// The interval between reporting a summary of activity.
    ///
    /// Set to `0` (default) to disable reporting.
    // Both the `Default` impl and the serde field default resolve to
    // `default_print_interval_secs()`, so the two paths cannot drift apart.
    #[derivative(Default(value = "default_print_interval_secs()"))]
    #[serde(default = "default_print_interval_secs")]
    // (De)serialized as a whole number of seconds (`u64`) rather than serde's
    // native `Duration` representation.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Print Interval"))]
    #[configurable(metadata(docs::examples = 10))]
    pub print_interval_secs: Duration,

    /// The number of events, per second, that the sink is allowed to consume.
    ///
    /// By default, there is no limit.
    // No explicit default is given, so the derived `Default` yields `None`.
    #[configurable(metadata(docs::examples = 1000))]
    pub rate: Option<usize>,

    // NOTE(review): judging by the helper names, this presumably accepts either a
    // bare boolean or a full struct on input and is omitted from serialized output
    // when equal to its default — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

#[async_trait::async_trait]
#[typetag::serde(name = "blackhole")]
impl SinkConfig for BlackholeConfig {
    /// Builds the sink from a clone of this configuration, paired with a
    /// healthcheck future that is already resolved to `Ok(())`.
    ///
    /// The sink context is not used.
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let blackhole = Box::new(BlackholeSink::new(self.clone()));

        Ok((VectorSink::Stream(blackhole), future::ok(()).boxed()))
    }

    /// Declares the accepted input as `Input::all()`.
    fn input(&self) -> Input {
        Input::all()
    }

    /// Borrows the acknowledgement settings stored on this configuration.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

impl GenerateConfig for BlackholeConfig {
    /// Serializes the default configuration into a TOML value.
    ///
    /// # Panics
    ///
    /// Panics if the default configuration cannot be converted to TOML.
    fn generate_config() -> toml::Value {
        let defaults = Self::default();
        toml::Value::try_from(defaults).unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::BlackholeConfig;

    /// Runs the shared generated-config check against the blackhole sink config.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<BlackholeConfig>();
    }
}