1#![allow(missing_docs)]
2#[allow(unused_imports)]
3use std::collections::HashSet;
4
5pub mod dedupe;
6pub mod reduce;
7#[cfg(feature = "transforms-impl-sample")]
8pub mod sample;
9
10#[cfg(feature = "transforms-aggregate")]
11pub mod aggregate;
12#[cfg(feature = "transforms-aws_ec2_metadata")]
13pub mod aws_ec2_metadata;
14#[cfg(feature = "transforms-exclusive-route")]
15mod exclusive_route;
16#[cfg(feature = "transforms-filter")]
17pub mod filter;
18#[cfg(feature = "transforms-incremental_to_absolute")]
19pub mod incremental_to_absolute;
20#[cfg(feature = "transforms-log_to_metric")]
21pub mod log_to_metric;
22#[cfg(feature = "transforms-lua")]
23pub mod lua;
24#[cfg(feature = "transforms-metric_to_log")]
25pub mod metric_to_log;
26#[cfg(feature = "transforms-remap")]
27pub mod remap;
28#[cfg(feature = "transforms-route")]
29pub mod route;
30#[cfg(feature = "transforms-tag_cardinality_limit")]
31pub mod tag_cardinality_limit;
32#[cfg(feature = "transforms-throttle")]
33pub mod throttle;
34#[cfg(feature = "transforms-trace_to_log")]
35pub mod trace_to_log;
36#[cfg(feature = "transforms-window")]
37pub mod window;
38
39pub use vector_lib::transform::{
40 FunctionTransform, OutputBuffer, SyncTransform, TaskTransform, Transform, TransformOutputs,
41 TransformOutputsBuf,
42};
43
#[cfg(test)]
mod test {
    //! Helpers shared by the transform tests: one for driving a single
    //! `FunctionTransform` directly, one for running a transform inside a
    //! small source -> transform -> sink topology.

    use futures::Stream;
    use futures_util::SinkExt;
    use tokio::sync::mpsc;
    use tokio_util::sync::PollSender;
    use vector_lib::transform::FunctionTransform;

    use crate::{
        config::{
            ConfigBuilder, TransformConfig,
            unit_test::{UnitTestStreamSinkConfig, UnitTestStreamSourceConfig},
        },
        event::Event,
        test_util::start_topology,
        topology::RunningTopology,
        transforms::OutputBuffer,
    };

    /// Feeds a single `event` through `ft` and hands back what it emitted.
    ///
    /// Returns the first event taken from the output buffer, or `None` when
    /// the buffer yields nothing.
    ///
    /// # Panics
    ///
    /// Panics if the transform wrote more than one event into the buffer.
    // NOTE(review): `dead_code` is presumably allowed because not every
    // feature combination compiles a test that calls this helper — confirm.
    #[allow(dead_code)]
    pub fn transform_one(ft: &mut dyn FunctionTransform, event: Event) -> Option<Event> {
        let mut buf = OutputBuffer::with_capacity(1);
        ft.transform(&mut buf, event);

        // This helper is only meant for transforms that emit at most one event
        // per input.
        assert!(buf.len() <= 1);

        let mut emitted = buf.into_events();
        emitted.next()
    }

    /// Starts a three-component topology around `transform_config`.
    ///
    /// The topology consists of a source named `"in"` fed by `events`, the
    /// given transform registered as `"transform"`, and a sink named `"out"`
    /// that forwards into an `mpsc` channel of capacity 1.
    ///
    /// Returns the running topology together with the receiving half of that
    /// channel.
    ///
    /// # Panics
    ///
    /// Panics if the config fails to build, or if the sink adapter reports a
    /// send error.
    // NOTE(review): `dead_code` is presumably allowed because not every
    // feature combination compiles a test that calls this helper — confirm.
    #[allow(dead_code)]
    pub async fn create_topology<T: TransformConfig + 'static>(
        events: impl Stream<Item = Event> + Send + 'static,
        transform_config: T,
    ) -> (RunningTopology, mpsc::Receiver<Event>) {
        // Output side first: the sender is wrapped so it can be used as a
        // `Sink`, and any send error is turned into a panic.
        let (sender, receiver) = mpsc::channel(1);
        let sink = PollSender::new(sender).sink_map_err(|error| panic!("{}", error));

        let mut pipeline = ConfigBuilder::default();
        pipeline.add_source("in", UnitTestStreamSourceConfig::new(events));
        pipeline.add_transform("transform", &["in"], transform_config);
        pipeline.add_sink("out", &["transform"], UnitTestStreamSinkConfig::new(sink));

        let config = pipeline.build().expect("building config should not fail");

        // The second element returned by `start_topology` is not needed here.
        let (topology, _) = start_topology(config, false).await;

        (topology, receiver)
    }
}