// vector/transforms/throttle/config.rs

1use std::time::Duration;
2
3use governor::clock;
4use serde_with::serde_as;
5use vector_lib::{
6    config::{LogNamespace, clone_input_definitions},
7    configurable::configurable_component,
8};
9
10use super::transform::Throttle;
11use crate::{
12    conditions::AnyCondition,
13    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
14    schema,
15    template::Template,
16    transforms::Transform,
17};
18
19/// Configuration of internal metrics for the Throttle transform.
20#[configurable_component]
21#[derive(Clone, Debug, PartialEq, Eq, Default)]
22#[serde(deny_unknown_fields)]
23pub struct ThrottleInternalMetricsConfig {
24    /// Whether or not to emit the `events_discarded_total` internal metric with the `key` tag.
25    ///
26    /// If true, the counter will be incremented for each discarded event, including the key value
27    /// associated with the discarded event. If false, the counter will not be emitted. Instead, the
28    /// number of discarded events can be seen through the `component_discarded_events_total` internal
29    /// metric.
30    ///
31    /// Note that this defaults to false because the `key` tag has potentially unbounded cardinality.
32    /// Only set this to true if you know that the number of unique keys is bounded.
33    #[serde(default)]
34    pub emit_events_discarded_per_key: bool,
35}
36
37/// Configuration for the `throttle` transform.
38#[serde_as]
39#[configurable_component(transform("throttle", "Rate limit logs passing through a topology."))]
40#[derive(Clone, Debug, Default)]
41#[serde(deny_unknown_fields)]
42pub struct ThrottleConfig {
43    /// The number of events allowed for a given bucket per configured `window_secs`.
44    ///
45    /// Each unique key has its own `threshold`.
46    pub threshold: u32,
47
48    /// The time window in which the configured `threshold` is applied, in seconds.
49    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
50    #[configurable(metadata(docs::human_name = "Time Window"))]
51    pub window_secs: Duration,
52
53    /// The value to group events into separate buckets to be rate limited independently.
54    ///
55    /// If left unspecified, or if the event doesn't have `key_field`, then the event is not rate
56    /// limited separately.
57    #[configurable(metadata(docs::examples = "{{ message }}", docs::examples = "{{ hostname }}",))]
58    pub key_field: Option<Template>,
59
60    /// A logical condition used to exclude events from sampling.
61    pub exclude: Option<AnyCondition>,
62
63    #[configurable(derived)]
64    #[serde(default)]
65    pub internal_metrics: ThrottleInternalMetricsConfig,
66}
67
68impl_generate_config_from_default!(ThrottleConfig);
69
70#[async_trait::async_trait]
71#[typetag::serde(name = "throttle")]
72impl TransformConfig for ThrottleConfig {
73    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
74        Throttle::new(self, context, clock::MonotonicClock).map(Transform::event_task)
75    }
76
77    fn input(&self) -> Input {
78        Input::log()
79    }
80
81    fn outputs(
82        &self,
83        _: vector_lib::enrichment::TableRegistry,
84        input_definitions: &[(OutputId, schema::Definition)],
85        _: LogNamespace,
86    ) -> Vec<TransformOutput> {
87        // The event is not modified, so the definition is passed through as-is
88        vec![TransformOutput::new(
89            DataType::Log,
90            clone_input_definitions(input_definitions),
91        )]
92    }
93}
94
#[cfg(test)]
mod tests {
    use super::ThrottleConfig;
    use crate::test_util::test_generate_config;

    /// Exercises example-config generation for `ThrottleConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ThrottleConfig>();
    }
}