vector/transforms/
filter.rs

1use vector_lib::{
2    config::clone_input_definitions,
3    configurable::configurable_component,
4    internal_event::{Count, InternalEventHandle as _, Registered},
5};
6
7use crate::{
8    conditions::{AnyCondition, Condition},
9    config::{
10        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11        TransformOutput,
12    },
13    event::Event,
14    internal_events::FilterEventsDropped,
15    schema,
16    transforms::{FunctionTransform, OutputBuffer, Transform},
17};
18
/// Configuration for the `filter` transform.
// NOTE(review): the `///` text on this type and its field is presumably consumed by the
// `configurable_component` machinery for generated config docs, so it is kept verbatim;
// reviewer notes are written as plain `//` comments instead.
#[configurable_component(transform("filter", "Filter events based on a set of conditions."))]
#[derive(Clone, Debug)]
// Reject configuration keys other than the fields declared below.
#[serde(deny_unknown_fields)]
pub struct FilterConfig {
    #[configurable(derived)]
    /// The condition that every input event is matched against.
    ///
    /// If an event is matched by the condition, it is forwarded. Otherwise, the event is dropped.
    // Compiled into a runtime `Condition` by `TransformConfig::build` below.
    condition: AnyCondition,
}
30
31impl From<AnyCondition> for FilterConfig {
32    fn from(condition: AnyCondition) -> Self {
33        Self { condition }
34    }
35}
36
37impl GenerateConfig for FilterConfig {
38    fn generate_config() -> toml::Value {
39        toml::from_str(r#"condition = ".message == \"value\"""#).unwrap()
40    }
41}
42
43#[async_trait::async_trait]
44#[typetag::serde(name = "filter")]
45impl TransformConfig for FilterConfig {
46    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
47        Ok(Transform::function(Filter::new(self.condition.build(
48            &context.enrichment_tables,
49            &context.metrics_storage,
50        )?)))
51    }
52
53    fn input(&self) -> Input {
54        Input::all()
55    }
56
57    fn outputs(
58        &self,
59        _: &TransformContext,
60        input_definitions: &[(OutputId, schema::Definition)],
61    ) -> Vec<TransformOutput> {
62        vec![TransformOutput::new(
63            DataType::all_bits(),
64            clone_input_definitions(input_definitions),
65        )]
66    }
67
68    fn enable_concurrency(&self) -> bool {
69        true
70    }
71}
72
73#[derive(Clone)]
74pub struct Filter {
75    condition: Condition,
76    events_dropped: Registered<FilterEventsDropped>,
77}
78
79impl Filter {
80    pub fn new(condition: Condition) -> Self {
81        Self {
82            condition,
83            events_dropped: register!(FilterEventsDropped),
84        }
85    }
86}
87
88impl FunctionTransform for Filter {
89    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
90        let (result, event) = self.condition.check(event);
91        if result {
92            output.push(event);
93        } else {
94            self.events_dropped.emit(Count(1));
95        }
96    }
97}
98
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::ComponentKey,
        event::{Metric, MetricKind, MetricValue},
    };

    use super::*;
    use crate::{
        conditions::ConditionConfig,
        config::schema::Definition,
        event::{Event, LogEvent},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    const TEST_SOURCE_COMPONENT_ID: &str = "in";
    const TEST_UPSTREAM_COMPONENT_ID: &str = "transform";
    const TEST_SOURCE_TYPE: &str = "unit_test_stream";

    /// Stamps `event` with the metadata an event is expected to carry after passing
    /// through the test topology, so it can be compared with what comes out.
    fn set_expected_metadata(event: &mut Event) {
        let source_id = Arc::new(ComponentKey::from(TEST_SOURCE_COMPONENT_ID));
        let upstream_id = Arc::new(OutputId::from(TEST_UPSTREAM_COMPONENT_ID));
        let definition = Arc::new(Definition::default_legacy_namespace());

        event.set_source_id(source_id);
        event.set_upstream_id(upstream_id);
        event.set_source_type(TEST_SOURCE_TYPE);
        event.metadata_mut().set_schema_definition(&definition);
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::FilterConfig>();
    }

    #[tokio::test]
    async fn filter_basic() {
        assert_transform_compliance(async {
            // `IsLog` should let the log event through and reject the metric below.
            let condition = AnyCondition::from(ConditionConfig::IsLog);
            let transform_config = FilterConfig::from(condition);

            let (sender, receiver) = mpsc::channel(1);
            let (topology, mut output) =
                create_topology(ReceiverStream::new(receiver), transform_config).await;

            // The log event must come out unchanged apart from topology metadata.
            let mut expected_log = Event::from(LogEvent::from("message"));
            sender.send(expected_log.clone()).await.unwrap();

            set_expected_metadata(&mut expected_log);

            assert_eq!(output.recv().await.unwrap(), expected_log);

            // The metric event must never reach the output.
            let counter = Event::from(Metric::new(
                "test metric",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            ));
            sender.send(counter).await.unwrap();

            // Close the input, shut down, and confirm nothing else was emitted.
            drop(sender);
            topology.stop().await;
            assert_eq!(output.recv().await, None);
        })
        .await;
    }
}