vector/transforms/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Module root for the transforms: declares the individual transform
// submodules and re-exports the shared transform types from `vector_lib`.

// Suppress the `missing_docs` lint for everything in this module.
#![allow(missing_docs)]
// `HashSet` is not referenced in the portion of the file visible here; the
// `allow` keeps the import from raising an unused-import warning.
// NOTE(review): confirm whether this import is still required.
#[allow(unused_imports)]
use std::collections::HashSet;

// `dedupe` and `reduce` carry no `cfg` gate, so they are always compiled.
// `sample` is gated on the `transforms-impl-sample` feature.
pub mod dedupe;
pub mod reduce;
#[cfg(feature = "transforms-impl-sample")]
pub mod sample;

// Each remaining transform is compiled only when its `transforms-*` Cargo
// feature is enabled.
#[cfg(feature = "transforms-aggregate")]
pub mod aggregate;
#[cfg(feature = "transforms-aws_ec2_metadata")]
pub mod aws_ec2_metadata;
// Unlike its siblings, `exclusive_route` is declared without `pub`.
// NOTE(review): presumably intentional -- confirm before changing visibility.
#[cfg(feature = "transforms-exclusive-route")]
mod exclusive_route;
#[cfg(feature = "transforms-filter")]
pub mod filter;
#[cfg(feature = "transforms-log_to_metric")]
pub mod log_to_metric;
#[cfg(feature = "transforms-lua")]
pub mod lua;
#[cfg(feature = "transforms-metric_to_log")]
pub mod metric_to_log;
#[cfg(feature = "transforms-remap")]
pub mod remap;
#[cfg(feature = "transforms-route")]
pub mod route;
#[cfg(feature = "transforms-tag_cardinality_limit")]
pub mod tag_cardinality_limit;
#[cfg(feature = "transforms-throttle")]
pub mod throttle;

// Re-export the transform types from `vector_lib::transform` so they can be
// reached through this module's path (the test module below imports
// `crate::transforms::OutputBuffer` this way).
pub use vector_lib::transform::{
    FunctionTransform, OutputBuffer, SyncTransform, TaskTransform, Transform, TransformOutputs,
    TransformOutputsBuf,
};

#[cfg(test)]
mod test {
    use futures::Stream;
    use futures_util::SinkExt;
    use tokio::sync::mpsc;
    use tokio_util::sync::PollSender;
    use vector_lib::transform::FunctionTransform;

    use crate::{
        config::{
            unit_test::{UnitTestStreamSinkConfig, UnitTestStreamSourceConfig},
            ConfigBuilder, TransformConfig,
        },
        event::Event,
        test_util::start_topology,
        topology::RunningTopology,
        transforms::OutputBuffer,
    };

    /// Runs a single `Event` through `ft` and returns whatever it emitted.
    ///
    /// Yields `Some(event)` when the transform produced one event and `None`
    /// when it produced nothing.
    ///
    /// # Panics
    ///
    /// Panics if `ft` emits more than one `Event` for the single input.
    // `dead_code` is allowed on purpose: when the benchmarks are compiled as
    // tests they never call this helper, so the warning would be accurate.
    // Compiling the function out for bench builds is not an option either,
    // because that would produce an unused-import warning for the imports
    // above instead.
    #[allow(dead_code)]
    pub fn transform_one(ft: &mut dyn FunctionTransform, event: Event) -> Option<Event> {
        let mut buf = OutputBuffer::with_capacity(1);
        ft.transform(&mut buf, event);

        // A single input event must yield at most one output event.
        assert!(buf.len() <= 1);

        let mut emitted = buf.into_events();
        emitted.next()
    }

    /// Builds and starts a three-component topology around `transform_config`.
    ///
    /// The topology is wired as source `"in"` (fed by `events`) into
    /// `"transform"` into sink `"out"`. The sink is backed by the sending
    /// half of a capacity-1 `mpsc` channel.
    ///
    /// Returns the running topology together with the receiving half of that
    /// channel.
    ///
    /// # Panics
    ///
    /// Panics if the assembled configuration fails to build, or if the
    /// channel-backed sink reports a send error.
    #[allow(dead_code)]
    pub async fn create_topology<T>(
        events: impl Stream<Item = Event> + Send + 'static,
        transform_config: T,
    ) -> (RunningTopology, mpsc::Receiver<Event>)
    where
        T: TransformConfig + 'static,
    {
        let (tx, rx) = mpsc::channel(1);

        // Any error raised by the channel-backed sink aborts the test.
        let sink_config = UnitTestStreamSinkConfig::new(
            PollSender::new(tx).sink_map_err(|error| panic!("{}", error)),
        );

        // TODO: Use non-hard-coded names to improve tests.
        let mut builder = ConfigBuilder::default();
        builder.add_source("in", UnitTestStreamSourceConfig::new(events));
        builder.add_transform("transform", &["in"], transform_config);
        builder.add_sink("out", &["transform"], sink_config);

        // Only the topology handle is needed; the second element of the
        // returned tuple is discarded.
        let (topology, _) = start_topology(
            builder.build().expect("building config should not fail"),
            false,
        )
        .await;

        (topology, rx)
    }
}