vector/transforms/
filter.rs1use vector_lib::{
2 config::{LogNamespace, clone_input_definitions},
3 configurable::configurable_component,
4 internal_event::{Count, InternalEventHandle as _, Registered},
5};
6
7use crate::{
8 conditions::{AnyCondition, Condition},
9 config::{
10 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11 TransformOutput,
12 },
13 event::Event,
14 internal_events::FilterEventsDropped,
15 schema,
16 transforms::{FunctionTransform, OutputBuffer, Transform},
17};
18
19#[configurable_component(transform("filter", "Filter events based on a set of conditions."))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct FilterConfig {
24 #[configurable(derived)]
25 condition: AnyCondition,
29}
30
31impl From<AnyCondition> for FilterConfig {
32 fn from(condition: AnyCondition) -> Self {
33 Self { condition }
34 }
35}
36
37impl GenerateConfig for FilterConfig {
38 fn generate_config() -> toml::Value {
39 toml::from_str(r#"condition = ".message == \"value\"""#).unwrap()
40 }
41}
42
43#[async_trait::async_trait]
44#[typetag::serde(name = "filter")]
45impl TransformConfig for FilterConfig {
46 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
47 Ok(Transform::function(Filter::new(
48 self.condition.build(&context.enrichment_tables)?,
49 )))
50 }
51
52 fn input(&self) -> Input {
53 Input::all()
54 }
55
56 fn outputs(
57 &self,
58 _enrichment_tables: vector_lib::enrichment::TableRegistry,
59 input_definitions: &[(OutputId, schema::Definition)],
60 _: LogNamespace,
61 ) -> Vec<TransformOutput> {
62 vec![TransformOutput::new(
63 DataType::all_bits(),
64 clone_input_definitions(input_definitions),
65 )]
66 }
67
68 fn enable_concurrency(&self) -> bool {
69 true
70 }
71}
72
73#[derive(Clone)]
74pub struct Filter {
75 condition: Condition,
76 events_dropped: Registered<FilterEventsDropped>,
77}
78
79impl Filter {
80 pub fn new(condition: Condition) -> Self {
81 Self {
82 condition,
83 events_dropped: register!(FilterEventsDropped),
84 }
85 }
86}
87
88impl FunctionTransform for Filter {
89 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
90 let (result, event) = self.condition.check(event);
91 if result {
92 output.push(event);
93 } else {
94 self.events_dropped.emit(Count(1));
95 }
96 }
97}
98
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::ComponentKey,
        event::{Metric, MetricKind, MetricValue},
    };

    use super::*;
    use crate::{
        conditions::ConditionConfig,
        config::schema::Definition,
        event::{Event, LogEvent},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FilterConfig>();
    }

    #[tokio::test]
    async fn filter_basic() {
        assert_transform_compliance(async {
            // `is_log` matches log events only, so the metric below must be dropped.
            let config = FilterConfig::from(AnyCondition::from(ConditionConfig::IsLog));

            let (sender, receiver) = mpsc::channel(1);
            let (topology, mut received) =
                create_topology(ReceiverStream::new(receiver), config).await;

            let mut expected = Event::from(LogEvent::from("message"));
            sender.send(expected.clone()).await.unwrap();

            // The copy that went through the topology comes back stamped with
            // source/upstream ids and a schema definition; mirror that here.
            expected.set_source_id(Arc::new(ComponentKey::from("in")));
            expected.set_upstream_id(Arc::new(OutputId::from("transform")));
            expected
                .metadata_mut()
                .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));

            let forwarded = received.recv().await.unwrap();
            assert_eq!(forwarded, expected);

            let counter = MetricValue::Counter { value: 1.0 };
            let rejected = Event::from(Metric::new(
                "test metric",
                MetricKind::Incremental,
                counter,
            ));
            sender.send(rejected).await.unwrap();

            // Closing the input and stopping the topology must yield no further
            // events, proving the metric was filtered out.
            drop(sender);
            topology.stop().await;
            assert_eq!(received.recv().await, None);
        })
        .await;
    }
}