use std::{num::NonZeroU32, pin::Pin, time::Duration};
use async_stream::stream;
use futures::{Stream, StreamExt};
use governor::{clock, Quota, RateLimiter};
use serde_with::serde_as;
use snafu::Snafu;
use vector_lib::config::{clone_input_definitions, LogNamespace};
use vector_lib::configurable::configurable_component;
use crate::{
conditions::{AnyCondition, Condition},
config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
event::Event,
internal_events::{TemplateRenderingError, ThrottleEventDiscarded},
schema,
template::Template,
transforms::{TaskTransform, Transform},
};
/// Configuration of internal metrics for the throttle transform.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct ThrottleInternalMetricsConfig {
    /// Whether discarded events are reported together with the key of the bucket that
    /// discarded them.
    ///
    /// Defaults to `false`. Keys are rendered from `key_field`, so they can take many
    /// distinct values.
    // NOTE(review): the concrete metric is produced by `ThrottleEventDiscarded`, which is
    // defined outside this file — confirm this description against it.
    #[serde(default)]
    pub emit_events_discarded_per_key: bool,
}
/// Configuration for the `throttle` transform.
#[serde_as]
#[configurable_component(transform("throttle", "Rate limit logs passing through a topology."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ThrottleConfig {
    /// The number of events allowed for a given bucket per configured `window_secs`.
    ///
    /// Each unique key has its own `threshold`. Must be non-zero.
    threshold: u32,

    /// The time window in which the configured `threshold` is applied, in seconds.
    ///
    /// Fractional seconds are accepted. Must be non-zero.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Time Window"))]
    window_secs: Duration,

    /// The value to group events into separate buckets to be rate limited independently.
    ///
    /// If left unspecified, or if the template fails to render for an event, the event
    /// is counted against a single shared bucket.
    #[configurable(metadata(docs::examples = "{{ message }}", docs::examples = "{{ hostname }}",))]
    key_field: Option<Template>,

    /// A logical condition used to exclude events from throttling.
    ///
    /// Events matching this condition are always passed through.
    exclude: Option<AnyCondition>,

    // Options controlling internal metrics for discarded events; the schema description
    // is taken from the field's type (`configurable(derived)`).
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: ThrottleInternalMetricsConfig,
}

// Presumably implements config generation for `ThrottleConfig` from its `Default` value;
// exercised by the `generate_config` test below.
impl_generate_config_from_default!(ThrottleConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "throttle")]
impl TransformConfig for ThrottleConfig {
    /// Builds the runtime transform, driven by the monotonic clock.
    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
        let throttle = Throttle::new(self, context, clock::MonotonicClock)?;
        Ok(Transform::event_task(throttle))
    }

    /// The throttle only accepts log events.
    fn input(&self) -> Input {
        Input::log()
    }

    /// Exposes a single log output whose schema definitions are copied from the inputs,
    /// since this transform only forwards or drops events.
    fn outputs(
        &self,
        _: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        let definitions = clone_input_definitions(input_definitions);
        let output = TransformOutput::new(DataType::Log, definitions);
        vec![output]
    }
}
/// Runtime state of the throttle transform: a keyed rate limit over log events.
///
/// Constructed by [`Throttle::new`]. `C` is the clock that drives the rate limiter
/// (`clock::MonotonicClock` when built from config, a fake clock in the tests).
#[derive(Clone)]
pub struct Throttle<C: clock::Clock<Instant = I>, I: clock::Reference> {
    /// Clock handed to the rate limiter.
    clock: C,
    /// Per-key quota: a burst of `threshold`, replenished over `window_secs`.
    quota: Quota,
    /// The configured window; idle keys are pruned every two of these.
    flush_keys_interval: Duration,
    /// Template whose rendering selects each event's bucket.
    key_field: Option<Template>,
    /// Events matching this condition bypass the limiter.
    exclude: Option<Condition>,
    /// Internal-metrics options forwarded when an event is discarded.
    internal_metrics: ThrottleInternalMetricsConfig,
}
impl<C, I> Throttle<C, I>
where
C: clock::Clock<Instant = I>,
I: clock::Reference,
{
pub fn new(
config: &ThrottleConfig,
context: &TransformContext,
clock: C,
) -> crate::Result<Self> {
let flush_keys_interval = config.window_secs;
let threshold = match NonZeroU32::new(config.threshold) {
Some(threshold) => threshold,
None => return Err(Box::new(ConfigError::NonZero)),
};
let quota = match Quota::with_period(Duration::from_secs_f64(
flush_keys_interval.as_secs_f64() / f64::from(threshold.get()),
)) {
Some(quota) => quota.allow_burst(threshold),
None => return Err(Box::new(ConfigError::NonZero)),
};
let exclude = config
.exclude
.as_ref()
.map(|condition| condition.build(&context.enrichment_tables))
.transpose()?;
Ok(Self {
quota,
clock,
flush_keys_interval,
key_field: config.key_field.clone(),
exclude,
internal_metrics: config.internal_metrics.clone(),
})
}
}
impl<C, I> TaskTransform<Event> for Throttle<C, I>
where
    C: clock::Clock<Instant = I> + Send + 'static + Clone,
    I: clock::Reference + Send + 'static,
{
    /// Rate limits the incoming event stream.
    ///
    /// Events matching `exclude` are forwarded without touching the limiter. Every other
    /// event is charged against the bucket named by its rendered `key_field`. The shared
    /// `None` bucket is used when there is no template or rendering fails. Events that
    /// arrive at an exhausted bucket are dropped and reported via `ThrottleEventDiscarded`.
    /// Every `2 * window_secs` the limiter's `retain_recent` is called to prune keys.
    fn transform(
        self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        let mut flush_keys = tokio::time::interval(self.flush_keys_interval * 2);
        let limiter = RateLimiter::dashmap_with_clock(self.quota, self.clock.clone());

        Box::pin(stream! {
            loop {
                tokio::select! {
                    // Input is always polled first; the flush tick only fires when no
                    // event is immediately ready.
                    biased;

                    maybe_event = input_rx.next() => {
                        match maybe_event {
                            // Upstream closed: end the output stream.
                            None => break,
                            Some(event) => {
                                // Events matching `exclude` skip the limiter entirely.
                                let (excluded, event) = match self.exclude.as_ref() {
                                    Some(condition) => condition.check(event),
                                    None => (false, event),
                                };

                                if excluded {
                                    yield event;
                                } else {
                                    let key = match self.key_field.as_ref() {
                                        None => None,
                                        Some(template) => match template.render_string(&event) {
                                            Ok(rendered) => Some(rendered),
                                            Err(error) => {
                                                // Report the failure but keep the event; it
                                                // falls back to the shared `None` bucket.
                                                emit!(TemplateRenderingError {
                                                    error,
                                                    field: Some("key_field"),
                                                    drop_event: false,
                                                });
                                                None
                                            }
                                        },
                                    };

                                    if limiter.check_key(&key).is_ok() {
                                        yield event;
                                    } else {
                                        emit!(ThrottleEventDiscarded {
                                            key: key.unwrap_or_else(|| "None".to_string()),
                                            emit_events_discarded_per_key: self
                                                .internal_metrics
                                                .emit_events_discarded_per_key,
                                        });
                                    }
                                }
                            }
                        }
                    }

                    _ = flush_keys.tick() => {
                        limiter.retain_recent();
                    }
                }
            }
        })
    }
}
/// Errors raised while building a [`Throttle`] from its configuration.
#[derive(Debug, Snafu)]
pub enum ConfigError {
    /// `threshold` was zero, or the per-event replenish period derived from
    /// `window_secs / threshold` was zero.
    #[snafu(display("`threshold`, and `window_secs` must be non-zero"))]
    NonZero,
}
#[cfg(test)]
mod tests {
    use std::task::Poll;
    use futures::SinkExt;
    use super::*;
    use crate::{
        event::LogEvent, test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    // The default `ThrottleConfig` must be usable for config generation.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ThrottleConfig>();
    }

    // threshold = 2, window_secs = 5, no key: a single bucket admits a burst of two
    // events and replenishes one slot every 2.5s of (fake) time.
    #[tokio::test]
    async fn throttle_events() {
        let clock = clock::FakeRelativeClock::default();
        let config = toml::from_str::<ThrottleConfig>(
            r"
threshold = 2
window_secs = 5
",
        )
        .unwrap();
        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap();
        let throttle = throttle.into_task();
        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));
        // Nothing sent yet: the output is idle.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        // The initial burst of `threshold` events passes through.
        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(LogEvent::default().into()).await.unwrap();
        let mut count = 0_u8;
        while count < 2 {
            if let Some(_event) = out_stream.next().await {
                count += 1;
            } else {
                panic!("Unexpectedly received None in output stream");
            }
        }
        assert_eq!(2, count);
        // At t = 2s the bucket has not replenished yet, so this event is dropped.
        clock.advance(Duration::from_secs(2));
        tx.send(LogEvent::default().into()).await.unwrap();
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        // At t = 5s there is capacity again, so this event is forwarded.
        clock.advance(Duration::from_secs(3));
        tx.send(LogEvent::default().into()).await.unwrap();
        if let Some(_event) = out_stream.next().await {
        } else {
            panic!("Unexpectedly received None in output stream");
        }
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        // Closing the input ends the output stream.
        tx.disconnect();
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    // Same limits as above, but events with a `special` field match `exclude` and must
    // bypass the limiter even while the bucket is exhausted.
    #[tokio::test]
    async fn throttle_exclude() {
        let clock = clock::FakeRelativeClock::default();
        let config = toml::from_str::<ThrottleConfig>(
            r#"
threshold = 2
window_secs = 5
exclude = """
exists(.special)
"""
"#,
        )
        .unwrap();
        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap();
        let throttle = throttle.into_task();
        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        // Exhaust the bucket with the initial burst.
        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(LogEvent::default().into()).await.unwrap();
        let mut count = 0_u8;
        while count < 2 {
            if let Some(_event) = out_stream.next().await {
                count += 1;
            } else {
                panic!("Unexpectedly received None in output stream");
            }
        }
        assert_eq!(2, count);
        // A regular event at t = 2s is still throttled...
        clock.advance(Duration::from_secs(2));
        tx.send(LogEvent::default().into()).await.unwrap();
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        // ...but an excluded event passes at the same moment.
        let mut special_log = LogEvent::default();
        special_log.insert("special", "true");
        tx.send(special_log.into()).await.unwrap();
        if let Some(_event) = out_stream.next().await {
        } else {
            panic!("Unexpectedly received None in output stream");
        }
        // Regular events flow again once the bucket has replenished.
        clock.advance(Duration::from_secs(3));
        tx.send(LogEvent::default().into()).await.unwrap();
        if let Some(_event) = out_stream.next().await {
        } else {
            panic!("Unexpectedly received None in output stream");
        }
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        tx.disconnect();
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    // threshold = 1 keyed by `bucket`: events with different keys use independent
    // buckets, so both pass even though each bucket only allows one event.
    #[tokio::test]
    async fn throttle_buckets() {
        let clock = clock::FakeRelativeClock::default();
        let config = toml::from_str::<ThrottleConfig>(
            r#"
threshold = 1
window_secs = 5
key_field = "{{ bucket }}"
"#,
        )
        .unwrap();
        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap();
        let throttle = throttle.into_task();
        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        let mut log_a = LogEvent::default();
        log_a.insert("bucket", "a");
        let mut log_b = LogEvent::default();
        log_b.insert("bucket", "b");
        tx.send(log_a.into()).await.unwrap();
        tx.send(log_b.into()).await.unwrap();
        let mut count = 0_u8;
        while count < 2 {
            if let Some(_event) = out_stream.next().await {
                count += 1;
            } else {
                panic!("Unexpectedly received None in output stream");
            }
        }
        assert_eq!(2, count);
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
        tx.disconnect();
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    // Runs a single event through a real topology under `assert_transform_compliance`.
    #[tokio::test]
    async fn emits_internal_events() {
        assert_transform_compliance(async move {
            let config = ThrottleConfig {
                threshold: 1,
                window_secs: Duration::from_secs_f64(1.0),
                key_field: None,
                exclude: None,
                internal_metrics: Default::default(),
            };
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
            let log = LogEvent::from("hello world");
            tx.send(log.into()).await.unwrap();
            _ = out.recv().await;
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
}