// vector/transforms/throttle/config.rs
use governor::clock;
use serde_with::serde_as;
use std::time::Duration;
use vector_lib::config::{clone_input_definitions, LogNamespace};
use vector_lib::configurable::configurable_component;
use super::transform::Throttle;
use crate::{
conditions::AnyCondition,
config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
schema,
template::Template,
transforms::Transform,
};
/// Internal-metrics options for the `throttle` transform.
///
/// Embedded in `ThrottleConfig` as its `internal_metrics` field. Unknown keys are
/// rejected during deserialization (`deny_unknown_fields`).
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct ThrottleInternalMetricsConfig {
    /// Whether to emit the discarded-events internal metric on a per-key basis.
    ///
    /// Defaults to `false` when omitted from the configuration.
    // NOTE(review): the metric is emitted by the transform implementation, which is
    // not in this file — confirm the exact metric name and tag there. Per-key
    // metrics presumably have cardinality implications; verify before enabling.
    #[serde(default)]
    pub emit_events_discarded_per_key: bool,
}
/// Configuration for the `throttle` transform.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[serde_as]
#[configurable_component(transform("throttle", "Rate limit logs passing through a topology."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ThrottleConfig {
    /// The rate-limit threshold.
    // NOTE(review): presumably the number of events allowed per key within each
    // `window_secs` window; enforcement lives in `super::transform::Throttle` — confirm.
    pub threshold: u32,
    /// The time window for rate limiting, in seconds.
    ///
    /// Fractional values are accepted (deserialized as an `f64` number of seconds).
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Time Window"))]
    pub window_secs: Duration,
    /// Optional template identifying the key for an event.
    // NOTE(review): presumably rendered per event so that each distinct value is
    // rate limited independently — confirm against the transform implementation.
    #[configurable(metadata(docs::examples = "{{ message }}", docs::examples = "{{ hostname }}",))]
    pub key_field: Option<Template>,
    /// Optional condition used to exclude events.
    // NOTE(review): presumably events matching this condition bypass rate
    // limiting — confirm against the transform implementation.
    pub exclude: Option<AnyCondition>,
    // Documentation for this field is derived from `ThrottleInternalMetricsConfig`;
    // the whole section falls back to its `Default` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: ThrottleInternalMetricsConfig,
}
// Macro defined elsewhere in the crate; going by its name, it presumably implements
// config generation for `ThrottleConfig` from its `Default` value (exercised by the
// `generate_config` test in this file).
impl_generate_config_from_default!(ThrottleConfig);
// Registers `ThrottleConfig` under the serialized type name "throttle".
#[async_trait::async_trait]
#[typetag::serde(name = "throttle")]
impl TransformConfig for ThrottleConfig {
    /// Builds the runtime transform from this configuration.
    ///
    /// Constructs a `Throttle` driven by governor's `MonotonicClock` and wraps it as an
    /// event-task `Transform`. Any error from `Throttle::new` is returned unchanged.
    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
        let throttle = Throttle::new(self, context, clock::MonotonicClock)?;
        Ok(Transform::event_task(throttle))
    }

    /// This transform accepts log events as input.
    fn input(&self) -> Input {
        Input::log()
    }

    /// Declares a single log output whose schema definitions are cloned from the inputs.
    ///
    /// The enrichment table registry and the log namespace are not used.
    fn outputs(
        &self,
        _enrichment_tables: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _log_namespace: LogNamespace,
    ) -> Vec<TransformOutput> {
        let definitions = clone_input_definitions(input_definitions);
        let log_output = TransformOutput::new(DataType::Log, definitions);
        vec![log_output]
    }
}
#[cfg(test)]
mod tests {
    use super::ThrottleConfig;
    use crate::test_util::test_generate_config;

    /// Runs the crate's shared generate-config check against `ThrottleConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ThrottleConfig>();
    }
}