vector/transforms/
filter.rs1use vector_lib::{
2    config::{LogNamespace, clone_input_definitions},
3    configurable::configurable_component,
4    internal_event::{Count, InternalEventHandle as _, Registered},
5};
6
7use crate::{
8    conditions::{AnyCondition, Condition},
9    config::{
10        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11        TransformOutput,
12    },
13    event::Event,
14    internal_events::FilterEventsDropped,
15    schema,
16    transforms::{FunctionTransform, OutputBuffer, Transform},
17};
18
19#[configurable_component(transform("filter", "Filter events based on a set of conditions."))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct FilterConfig {
24    #[configurable(derived)]
25    condition: AnyCondition,
29}
30
31impl From<AnyCondition> for FilterConfig {
32    fn from(condition: AnyCondition) -> Self {
33        Self { condition }
34    }
35}
36
37impl GenerateConfig for FilterConfig {
38    fn generate_config() -> toml::Value {
39        toml::from_str(r#"condition = ".message == \"value\"""#).unwrap()
40    }
41}
42
43#[async_trait::async_trait]
44#[typetag::serde(name = "filter")]
45impl TransformConfig for FilterConfig {
46    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
47        Ok(Transform::function(Filter::new(
48            self.condition.build(&context.enrichment_tables)?,
49        )))
50    }
51
52    fn input(&self) -> Input {
53        Input::all()
54    }
55
56    fn outputs(
57        &self,
58        _enrichment_tables: vector_lib::enrichment::TableRegistry,
59        input_definitions: &[(OutputId, schema::Definition)],
60        _: LogNamespace,
61    ) -> Vec<TransformOutput> {
62        vec![TransformOutput::new(
63            DataType::all_bits(),
64            clone_input_definitions(input_definitions),
65        )]
66    }
67
68    fn enable_concurrency(&self) -> bool {
69        true
70    }
71}
72
73#[derive(Clone)]
74pub struct Filter {
75    condition: Condition,
76    events_dropped: Registered<FilterEventsDropped>,
77}
78
79impl Filter {
80    pub fn new(condition: Condition) -> Self {
81        Self {
82            condition,
83            events_dropped: register!(FilterEventsDropped),
84        }
85    }
86}
87
88impl FunctionTransform for Filter {
89    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
90        let (result, event) = self.condition.check(event);
91        if result {
92            output.push(event);
93        } else {
94            self.events_dropped.emit(Count(1));
95        }
96    }
97}
98
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::ComponentKey,
        event::{Metric, MetricKind, MetricValue},
    };

    use super::*;
    use crate::{
        conditions::ConditionConfig,
        config::schema::Definition,
        event::{Event, LogEvent},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    /// Runs the shared generated-config check against `FilterConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FilterConfig>();
    }

    /// With an `IsLog` condition, a log event is forwarded and a metric
    /// event never reaches the output.
    #[tokio::test]
    async fn filter_basic() {
        assert_transform_compliance(async {
            let config = FilterConfig::from(AnyCondition::from(ConditionConfig::IsLog));

            let (sender, receiver) = mpsc::channel(1);
            let (topology, mut output) =
                create_topology(ReceiverStream::new(receiver), config).await;

            // Send a log event; it should pass the condition.
            let sent_log = Event::from(LogEvent::from("message"));
            sender.send(sent_log.clone()).await.unwrap();

            // The event received from the topology is expected to carry
            // these ids and this schema definition in addition to the payload.
            let mut expected_log = sent_log;
            expected_log.set_source_id(Arc::new(ComponentKey::from("in")));
            expected_log.set_upstream_id(Arc::new(OutputId::from("transform")));
            expected_log
                .metadata_mut()
                .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));

            assert_eq!(output.recv().await.unwrap(), expected_log);

            // Send a metric event; it should be rejected by the condition.
            let rejected_metric = Event::from(Metric::new(
                "test metric",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            ));
            sender.send(rejected_metric).await.unwrap();

            // Close the input and stop the topology; no further event may
            // appear on the output.
            drop(sender);
            topology.stop().await;
            assert_eq!(output.recv().await, None);
        })
        .await;
    }
}