vector/transforms/sample/
config.rs1use snafu::Snafu;
2use vector_lib::config::{LegacyKey, LogNamespace};
3use vector_lib::configurable::configurable_component;
4use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
5use vrl::value::Kind;
6
7use crate::{
8 conditions::AnyCondition,
9 config::{
10 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11 TransformOutput,
12 },
13 schema,
14 template::Template,
15 transforms::Transform,
16};
17
18use super::transform::{Sample, SampleMode};
19
20#[derive(Debug, Snafu)]
21pub enum SampleError {
22 #[snafu(display(
24 "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
25 ))]
26 InvalidRatio { ratio: f64 },
27
28 #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
29 InvalidRate,
30
31 #[snafu(display(
32 "Exactly one value must be provided for either 'rate' or 'ratio', but not both"
33 ))]
34 InvalidConfiguration,
35}
36
37#[configurable_component(transform(
39 "sample",
40 "Sample events from an event stream based on supplied criteria and at a configurable rate."
41))]
42#[derive(Clone, Debug)]
43#[serde(deny_unknown_fields)]
44pub struct SampleConfig {
45 #[configurable(metadata(docs::examples = 1500))]
51 pub rate: Option<u64>,
52
53 #[configurable(metadata(docs::examples = 0.13))]
60 #[configurable(validation(range(min = 0.0, max = 1.0)))]
61 pub ratio: Option<f64>,
62
63 #[configurable(metadata(docs::examples = "message"))]
75 pub key_field: Option<String>,
76
77 #[configurable(metadata(docs::examples = "sample_rate"))]
79 #[serde(default = "default_sample_rate_key")]
80 pub sample_rate_key: OptionalValuePath,
81
82 #[configurable(metadata(
87 docs::examples = "{{ service }}",
88 docs::examples = "{{ hostname }}-{{ service }}"
89 ))]
90 pub group_by: Option<Template>,
91
92 pub exclude: Option<AnyCondition>,
94}
95
96impl SampleConfig {
97 fn sample_rate(&self) -> Result<SampleMode, SampleError> {
98 match (self.rate, self.ratio) {
99 (None, Some(ratio)) => {
100 if ratio <= 0.0 {
101 Err(SampleError::InvalidRatio { ratio })
102 } else {
103 Ok(SampleMode::new_ratio(ratio))
104 }
105 }
106 (Some(rate), None) => {
107 if rate == 0 {
108 Err(SampleError::InvalidRate)
109 } else {
110 Ok(SampleMode::new_rate(rate))
111 }
112 }
113 _ => Err(SampleError::InvalidConfiguration),
114 }
115 }
116}
117
118impl GenerateConfig for SampleConfig {
119 fn generate_config() -> toml::Value {
120 toml::Value::try_from(Self {
121 rate: None,
122 ratio: Some(0.1),
123 key_field: None,
124 group_by: None,
125 exclude: None::<AnyCondition>,
126 sample_rate_key: default_sample_rate_key(),
127 })
128 .unwrap()
129 }
130}
131
132#[async_trait::async_trait]
133#[typetag::serde(name = "sample")]
134impl TransformConfig for SampleConfig {
135 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
136 Ok(Transform::function(Sample::new(
137 Self::NAME.to_string(),
138 self.sample_rate()?,
139 self.key_field.clone(),
140 self.group_by.clone(),
141 self.exclude
142 .as_ref()
143 .map(|condition| condition.build(&context.enrichment_tables))
144 .transpose()?,
145 self.sample_rate_key.clone(),
146 )))
147 }
148
149 fn input(&self) -> Input {
150 Input::new(DataType::Log | DataType::Trace)
151 }
152
153 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
154 self.sample_rate()
155 .map(|_| ())
156 .map_err(|e| vec![e.to_string()])
157 }
158
159 fn outputs(
160 &self,
161 _: vector_lib::enrichment::TableRegistry,
162 input_definitions: &[(OutputId, schema::Definition)],
163 _: LogNamespace,
164 ) -> Vec<TransformOutput> {
165 vec![TransformOutput::new(
166 DataType::Log | DataType::Trace,
167 input_definitions
168 .iter()
169 .map(|(output, definition)| {
170 (
171 output.clone(),
172 definition.clone().with_source_metadata(
173 SampleConfig::NAME,
174 Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
175 &owned_value_path!("sample_rate"),
176 Kind::bytes(),
177 None,
178 ),
179 )
180 })
181 .collect(),
182 )]
183 }
184}
185
186pub fn default_sample_rate_key() -> OptionalValuePath {
187 OptionalValuePath::from(owned_value_path!("sample_rate"))
188}
189
#[cfg(test)]
mod tests {
    use super::SampleConfig;

    /// Checks the `GenerateConfig` output using the shared `test_generate_config` helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }
}