vector/transforms/
filter.rs

1use vector_lib::config::{clone_input_definitions, LogNamespace};
2use vector_lib::configurable::configurable_component;
3use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
4
5use crate::{
6    conditions::{AnyCondition, Condition},
7    config::{
8        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
9        TransformOutput,
10    },
11    event::Event,
12    internal_events::FilterEventsDropped,
13    schema,
14    transforms::{FunctionTransform, OutputBuffer, Transform},
15};
16
/// Configuration for the `filter` transform.
// NOTE(review): the `///` text on this struct and its field is presumably picked up by
// `configurable_component` for generated configuration docs — confirm before rewording it.
#[configurable_component(transform("filter", "Filter events based on a set of conditions."))]
#[derive(Clone, Debug)]
// Reject configurations that contain keys other than the fields declared below.
#[serde(deny_unknown_fields)]
pub struct FilterConfig {
    #[configurable(derived)]
    /// The condition that every input event is matched against.
    ///
    /// If an event is matched by the condition, it is forwarded. Otherwise, the event is dropped.
    // Kept private: callers construct the config through `From<AnyCondition>` or deserialization.
    condition: AnyCondition,
}
28
29impl From<AnyCondition> for FilterConfig {
30    fn from(condition: AnyCondition) -> Self {
31        Self { condition }
32    }
33}
34
35impl GenerateConfig for FilterConfig {
36    fn generate_config() -> toml::Value {
37        toml::from_str(r#"condition = ".message == \"value\"""#).unwrap()
38    }
39}
40
41#[async_trait::async_trait]
42#[typetag::serde(name = "filter")]
43impl TransformConfig for FilterConfig {
44    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
45        Ok(Transform::function(Filter::new(
46            self.condition.build(&context.enrichment_tables)?,
47        )))
48    }
49
50    fn input(&self) -> Input {
51        Input::all()
52    }
53
54    fn outputs(
55        &self,
56        _enrichment_tables: vector_lib::enrichment::TableRegistry,
57        input_definitions: &[(OutputId, schema::Definition)],
58        _: LogNamespace,
59    ) -> Vec<TransformOutput> {
60        vec![TransformOutput::new(
61            DataType::all_bits(),
62            clone_input_definitions(input_definitions),
63        )]
64    }
65
66    fn enable_concurrency(&self) -> bool {
67        true
68    }
69}
70
/// Transform that forwards events accepted by a condition and drops the rest.
#[derive(Clone)]
pub struct Filter {
    /// Condition checked against every incoming event.
    condition: Condition,
    /// Registered handle used to emit a count each time an event is dropped.
    events_dropped: Registered<FilterEventsDropped>,
}
76
77impl Filter {
78    pub fn new(condition: Condition) -> Self {
79        Self {
80            condition,
81            events_dropped: register!(FilterEventsDropped),
82        }
83    }
84}
85
86impl FunctionTransform for Filter {
87    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
88        let (result, event) = self.condition.check(event);
89        if result {
90            output.push(event);
91        } else {
92            self.events_dropped.emit(Count(1));
93        }
94    }
95}
96
// Unit tests for the `filter` transform.
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vector_lib::event::{Metric, MetricKind, MetricValue};

    use super::*;
    use crate::config::schema::Definition;
    use crate::{
        conditions::ConditionConfig,
        event::{Event, LogEvent},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    // Runs the shared config-generation check against `FilterConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::FilterConfig>();
    }

    // Feeds one log and one metric through a filter configured with the `IsLog` condition
    // and checks that only the log comes out the other side.
    #[tokio::test]
    async fn filter_basic() {
        assert_transform_compliance(async {
            let transform_config = FilterConfig::from(AnyCondition::from(ConditionConfig::IsLog));

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // Send a clone so `log` can be turned into the expected output value below.
            let mut log = Event::from(LogEvent::from("message"));
            tx.send(log.clone()).await.unwrap();

            // Adjust the expected event's metadata before comparing.
            // NOTE(review): presumably the test topology stamps these ids and the schema
            // definition onto events passing through it — confirm against `create_topology`.
            log.set_source_id(Arc::new(ComponentKey::from("in")));
            log.set_upstream_id(Arc::new(OutputId::from("transform")));
            log.metadata_mut()
                .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));

            assert_eq!(out.recv().await.unwrap(), log);

            // A metric does not satisfy `IsLog`, so it must not reach the output.
            let metric = Event::from(Metric::new(
                "test metric",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            ));
            tx.send(metric).await.unwrap();

            // Close the input and stop the topology; the output must then end without
            // yielding any further event.
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}