use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
use vrl::value::Kind;
use crate::{
conditions::AnyCondition,
config::{
DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
TransformOutput,
},
schema,
template::Template,
transforms::Transform,
};
use super::transform::Sample;
#[configurable_component(transform(
"sample",
"Sample events from an event stream based on supplied criteria and at a configurable rate."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct SampleConfig {
#[configurable(metadata(docs::examples = 1500))]
pub rate: u64,
#[configurable(metadata(docs::examples = "message"))]
pub key_field: Option<String>,
#[configurable(metadata(docs::examples = "sample_rate"))]
#[serde(default = "default_sample_rate_key")]
pub sample_rate_key: OptionalValuePath,
#[configurable(metadata(
docs::examples = "{{ service }}",
docs::examples = "{{ hostname }}-{{ service }}"
))]
pub group_by: Option<Template>,
pub exclude: Option<AnyCondition>,
}
impl GenerateConfig for SampleConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
rate: 10,
key_field: None,
group_by: None,
exclude: None::<AnyCondition>,
sample_rate_key: default_sample_rate_key(),
})
.unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "sample")]
impl TransformConfig for SampleConfig {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
Ok(Transform::function(Sample::new(
Self::NAME.to_string(),
self.rate,
self.key_field.clone(),
self.group_by.clone(),
self.exclude
.as_ref()
.map(|condition| condition.build(&context.enrichment_tables))
.transpose()?,
default_sample_rate_key(),
)))
}
fn input(&self) -> Input {
Input::new(DataType::Log | DataType::Trace)
}
fn outputs(
&self,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
vec![TransformOutput::new(
DataType::Log | DataType::Trace,
input_definitions
.iter()
.map(|(output, definition)| {
(
output.clone(),
definition.clone().with_source_metadata(
SampleConfig::NAME,
Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
&owned_value_path!("sample_rate"),
Kind::bytes(),
None,
),
)
})
.collect(),
)]
}
}
pub fn default_sample_rate_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("sample_rate"))
}
#[cfg(test)]
mod tests {
use crate::transforms::sample::config::SampleConfig;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<SampleConfig>();
}
}