vector/transforms/
filter.rs1use vector_lib::config::{clone_input_definitions, LogNamespace};
2use vector_lib::configurable::configurable_component;
3use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
4
5use crate::{
6 conditions::{AnyCondition, Condition},
7 config::{
8 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
9 TransformOutput,
10 },
11 event::Event,
12 internal_events::FilterEventsDropped,
13 schema,
14 transforms::{FunctionTransform, OutputBuffer, Transform},
15};
16
17#[configurable_component(transform("filter", "Filter events based on a set of conditions."))]
19#[derive(Clone, Debug)]
20#[serde(deny_unknown_fields)]
21pub struct FilterConfig {
22 #[configurable(derived)]
23 condition: AnyCondition,
27}
28
29impl From<AnyCondition> for FilterConfig {
30 fn from(condition: AnyCondition) -> Self {
31 Self { condition }
32 }
33}
34
35impl GenerateConfig for FilterConfig {
36 fn generate_config() -> toml::Value {
37 toml::from_str(r#"condition = ".message == \"value\"""#).unwrap()
38 }
39}
40
41#[async_trait::async_trait]
42#[typetag::serde(name = "filter")]
43impl TransformConfig for FilterConfig {
44 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
45 Ok(Transform::function(Filter::new(
46 self.condition.build(&context.enrichment_tables)?,
47 )))
48 }
49
50 fn input(&self) -> Input {
51 Input::all()
52 }
53
54 fn outputs(
55 &self,
56 _enrichment_tables: vector_lib::enrichment::TableRegistry,
57 input_definitions: &[(OutputId, schema::Definition)],
58 _: LogNamespace,
59 ) -> Vec<TransformOutput> {
60 vec![TransformOutput::new(
61 DataType::all_bits(),
62 clone_input_definitions(input_definitions),
63 )]
64 }
65
66 fn enable_concurrency(&self) -> bool {
67 true
68 }
69}
70
71#[derive(Clone)]
72pub struct Filter {
73 condition: Condition,
74 events_dropped: Registered<FilterEventsDropped>,
75}
76
77impl Filter {
78 pub fn new(condition: Condition) -> Self {
79 Self {
80 condition,
81 events_dropped: register!(FilterEventsDropped),
82 }
83 }
84}
85
86impl FunctionTransform for Filter {
87 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
88 let (result, event) = self.condition.check(event);
89 if result {
90 output.push(event);
91 } else {
92 self.events_dropped.emit(Count(1));
93 }
94 }
95}
96
97#[cfg(test)]
98mod test {
99 use std::sync::Arc;
100
101 use tokio::sync::mpsc;
102 use tokio_stream::wrappers::ReceiverStream;
103 use vector_lib::config::ComponentKey;
104 use vector_lib::event::{Metric, MetricKind, MetricValue};
105
106 use super::*;
107 use crate::config::schema::Definition;
108 use crate::{
109 conditions::ConditionConfig,
110 event::{Event, LogEvent},
111 test_util::components::assert_transform_compliance,
112 transforms::test::create_topology,
113 };
114
115 #[test]
116 fn generate_config() {
117 crate::test_util::test_generate_config::<super::FilterConfig>();
118 }
119
120 #[tokio::test]
121 async fn filter_basic() {
122 assert_transform_compliance(async {
123 let transform_config = FilterConfig::from(AnyCondition::from(ConditionConfig::IsLog));
124
125 let (tx, rx) = mpsc::channel(1);
126 let (topology, mut out) =
127 create_topology(ReceiverStream::new(rx), transform_config).await;
128
129 let mut log = Event::from(LogEvent::from("message"));
130 tx.send(log.clone()).await.unwrap();
131
132 log.set_source_id(Arc::new(ComponentKey::from("in")));
133 log.set_upstream_id(Arc::new(OutputId::from("transform")));
134 log.metadata_mut()
135 .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
136
137 assert_eq!(out.recv().await.unwrap(), log);
138
139 let metric = Event::from(Metric::new(
140 "test metric",
141 MetricKind::Incremental,
142 MetricValue::Counter { value: 1.0 },
143 ));
144 tx.send(metric).await.unwrap();
145
146 drop(tx);
147 topology.stop().await;
148 assert_eq!(out.recv().await, None);
149 })
150 .await;
151 }
152}