vector/transforms/sample/
config.rs1use snafu::Snafu;
2use vector_lib::{
3 config::{LegacyKey, LogNamespace},
4 configurable::configurable_component,
5 lookup::{lookup_v2::OptionalValuePath, owned_value_path},
6};
7use vrl::value::Kind;
8
9use super::transform::{Sample, SampleMode};
10use crate::{
11 conditions::AnyCondition,
12 config::{
13 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14 TransformOutput,
15 },
16 schema,
17 template::Template,
18 transforms::Transform,
19};
20
21#[derive(Debug, Snafu)]
22pub enum SampleError {
23 #[snafu(display(
25 "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
26 ))]
27 InvalidRatio { ratio: f64 },
28
29 #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
30 InvalidRate,
31
32 #[snafu(display(
33 "Exactly one value must be provided for either 'rate' or 'ratio', but not both"
34 ))]
35 InvalidConfiguration,
36}
37
38#[configurable_component(transform(
40 "sample",
41 "Sample events from an event stream based on supplied criteria and at a configurable rate."
42))]
43#[derive(Clone, Debug)]
44#[serde(deny_unknown_fields)]
45pub struct SampleConfig {
46 #[configurable(metadata(docs::examples = 1500))]
52 pub rate: Option<u64>,
53
54 #[configurable(metadata(docs::examples = 0.13))]
61 #[configurable(validation(range(min = 0.0, max = 1.0)))]
62 pub ratio: Option<f64>,
63
64 #[configurable(metadata(docs::examples = "message"))]
76 pub key_field: Option<String>,
77
78 #[configurable(metadata(docs::examples = "sample_rate"))]
80 #[serde(default = "default_sample_rate_key")]
81 pub sample_rate_key: OptionalValuePath,
82
83 #[configurable(metadata(
88 docs::examples = "{{ service }}",
89 docs::examples = "{{ hostname }}-{{ service }}"
90 ))]
91 pub group_by: Option<Template>,
92
93 pub exclude: Option<AnyCondition>,
95}
96
97impl SampleConfig {
98 fn sample_rate(&self) -> Result<SampleMode, SampleError> {
99 match (self.rate, self.ratio) {
100 (None, Some(ratio)) => {
101 if ratio <= 0.0 {
102 Err(SampleError::InvalidRatio { ratio })
103 } else {
104 Ok(SampleMode::new_ratio(ratio))
105 }
106 }
107 (Some(rate), None) => {
108 if rate == 0 {
109 Err(SampleError::InvalidRate)
110 } else {
111 Ok(SampleMode::new_rate(rate))
112 }
113 }
114 _ => Err(SampleError::InvalidConfiguration),
115 }
116 }
117}
118
119impl GenerateConfig for SampleConfig {
120 fn generate_config() -> toml::Value {
121 toml::Value::try_from(Self {
122 rate: None,
123 ratio: Some(0.1),
124 key_field: None,
125 group_by: None,
126 exclude: None::<AnyCondition>,
127 sample_rate_key: default_sample_rate_key(),
128 })
129 .unwrap()
130 }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "sample")]
135impl TransformConfig for SampleConfig {
136 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
137 Ok(Transform::function(Sample::new(
138 Self::NAME.to_string(),
139 self.sample_rate()?,
140 self.key_field.clone(),
141 self.group_by.clone(),
142 self.exclude
143 .as_ref()
144 .map(|condition| condition.build(&context.enrichment_tables))
145 .transpose()?,
146 self.sample_rate_key.clone(),
147 )))
148 }
149
150 fn input(&self) -> Input {
151 Input::new(DataType::Log | DataType::Trace)
152 }
153
154 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
155 self.sample_rate()
156 .map(|_| ())
157 .map_err(|e| vec![e.to_string()])
158 }
159
160 fn outputs(
161 &self,
162 _: vector_lib::enrichment::TableRegistry,
163 input_definitions: &[(OutputId, schema::Definition)],
164 _: LogNamespace,
165 ) -> Vec<TransformOutput> {
166 vec![TransformOutput::new(
167 DataType::Log | DataType::Trace,
168 input_definitions
169 .iter()
170 .map(|(output, definition)| {
171 (
172 output.clone(),
173 definition.clone().with_source_metadata(
174 SampleConfig::NAME,
175 Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
176 &owned_value_path!("sample_rate"),
177 Kind::bytes(),
178 None,
179 ),
180 )
181 })
182 .collect(),
183 )]
184 }
185}
186
187pub fn default_sample_rate_key() -> OptionalValuePath {
188 OptionalValuePath::from(owned_value_path!("sample_rate"))
189}
190
#[cfg(test)]
mod tests {
    use super::SampleConfig;

    /// Runs the shared generate-config test helper against `SampleConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }
}