vector/config/unit_test/
unit_test_components.rs

1use std::sync::Arc;
2
3use futures::{stream, Sink, Stream};
4use futures_util::{future, stream::BoxStream, FutureExt, StreamExt};
5use tokio::sync::{oneshot, Mutex};
6use vector_lib::configurable::configurable_component;
7use vector_lib::{
8    config::{DataType, Input, LogNamespace},
9    event::Event,
10    schema,
11    sink::{StreamSink, VectorSink},
12};
13
14use crate::{
15    conditions::Condition,
16    config::{
17        AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
18    },
19    sinks::Healthcheck,
20    sources,
21};
22
23/// Configuration for the `unit_test` source.
24#[configurable_component(source("unit_test", "Unit test."))]
25#[derive(Clone, Debug, Default)]
26pub struct UnitTestSourceConfig {
27    /// List of events sent from this source as part of the test.
28    #[serde(skip)]
29    pub events: Vec<Event>,
30}
31
32impl_generate_config_from_default!(UnitTestSourceConfig);
33
34#[async_trait::async_trait]
35#[typetag::serde(name = "unit_test")]
36impl SourceConfig for UnitTestSourceConfig {
37    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
38        let events = self.events.clone().into_iter();
39
40        Ok(Box::pin(async move {
41            let mut out = cx.out;
42            let _shutdown = cx.shutdown;
43            out.send_batch(events).await.map_err(|_| ())?;
44            Ok(())
45        }))
46    }
47
48    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
49        vec![SourceOutput::new_maybe_logs(
50            DataType::all_bits(),
51            schema::Definition::default_legacy_namespace(),
52        )]
53    }
54
55    fn can_acknowledge(&self) -> bool {
56        false
57    }
58}
59
60/// Configuration for the `unit_test_stream` source.
61#[configurable_component(source("unit_test_stream", "Unit test stream."))]
62#[derive(Clone)]
63pub struct UnitTestStreamSourceConfig {
64    #[serde(skip)]
65    stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
66}
67
68impl_generate_config_from_default!(UnitTestStreamSourceConfig);
69
70impl UnitTestStreamSourceConfig {
71    pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
72        Self {
73            stream: Arc::new(Mutex::new(Some(stream.boxed()))),
74        }
75    }
76}
77
78impl Default for UnitTestStreamSourceConfig {
79    fn default() -> Self {
80        Self::new(stream::empty().boxed())
81    }
82}
83
84impl std::fmt::Debug for UnitTestStreamSourceConfig {
85    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        formatter
87            .debug_struct("UnitTestStreamSourceConfig")
88            .finish()
89    }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "unit_test_stream")]
94impl SourceConfig for UnitTestStreamSourceConfig {
95    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
96        let stream = self.stream.lock().await.take().unwrap();
97        Ok(Box::pin(async move {
98            let mut out = cx.out;
99            let _shutdown = cx.shutdown;
100            out.send_event_stream(stream).await.map_err(|_| ())?;
101            Ok(())
102        }))
103    }
104
105    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106        vec![SourceOutput::new_maybe_logs(
107            DataType::all_bits(),
108            schema::Definition::default_legacy_namespace(),
109        )]
110    }
111
112    fn can_acknowledge(&self) -> bool {
113        false
114    }
115}
116
117#[derive(Clone, Default)]
118pub enum UnitTestSinkCheck {
119    /// Check all events that are received against the list of conditions.
120    Checks(Vec<Vec<Condition>>),
121
122    /// Check that no events were received.
123    NoOutputs,
124
125    /// Do nothing.
126    #[default]
127    NoOp,
128}
129
130#[derive(Debug)]
131pub struct UnitTestSinkResult {
132    pub test_name: String,
133    pub test_errors: Vec<String>,
134}
135
136/// Configuration for the `unit_test` sink.
137#[configurable_component(sink("unit_test", "Unit test."))]
138#[derive(Clone, Default, Derivative)]
139#[derivative(Debug)]
140pub struct UnitTestSinkConfig {
141    /// Name of the test that this sink is being used for.
142    pub test_name: String,
143
144    /// List of names of the transform/branch associated with this sink.
145    pub transform_ids: Vec<String>,
146
147    /// Sender side of the test result channel.
148    #[serde(skip)]
149    pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,
150
151    /// Predicate applied to each event that reaches the sink.
152    #[serde(skip)]
153    #[derivative(Debug = "ignore")]
154    pub check: UnitTestSinkCheck,
155}
156
157impl_generate_config_from_default!(UnitTestSinkConfig);
158
159#[async_trait::async_trait]
160#[typetag::serde(name = "unit_test")]
161impl SinkConfig for UnitTestSinkConfig {
162    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
163        let tx = self.result_tx.lock().await.take();
164        let sink = UnitTestSink {
165            test_name: self.test_name.clone(),
166            transform_ids: self.transform_ids.clone(),
167            result_tx: tx,
168            check: self.check.clone(),
169        };
170        let healthcheck = future::ok(()).boxed();
171
172        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
173    }
174
175    fn input(&self) -> Input {
176        Input::all()
177    }
178
179    fn acknowledgements(&self) -> &AcknowledgementsConfig {
180        &AcknowledgementsConfig::DEFAULT
181    }
182}
183
184pub struct UnitTestSink {
185    pub test_name: String,
186    pub transform_ids: Vec<String>,
187    // None for NoOp test sinks
188    pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
189    pub check: UnitTestSinkCheck,
190}
191
192#[async_trait::async_trait]
193impl StreamSink<Event> for UnitTestSink {
194    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
195        let mut output_events = Vec::new();
196        let mut result = UnitTestSinkResult {
197            test_name: self.test_name,
198            test_errors: Vec::new(),
199        };
200
201        while let Some(event) = input.next().await {
202            output_events.push(event);
203        }
204
205        match self.check {
206            UnitTestSinkCheck::Checks(checks) => {
207                if output_events.is_empty() {
208                    result
209                        .test_errors
210                        .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
211                } else {
212                    for (i, check) in checks.iter().enumerate() {
213                        let mut check_errors = Vec::new();
214                        for (j, condition) in check.iter().enumerate() {
215                            let mut condition_errors = Vec::new();
216                            for event in output_events.iter() {
217                                match condition.check_with_context(event.clone()).0 {
218                                    Ok(_) => {
219                                        condition_errors.clear();
220                                        break;
221                                    }
222                                    Err(error) => {
223                                        condition_errors.push(format!("  condition[{j}]: {error}"));
224                                    }
225                                }
226                            }
227                            check_errors.extend(condition_errors);
228                        }
229                        // If there are errors, add a preamble to the output
230                        if !check_errors.is_empty() {
231                            check_errors.insert(
232                                0,
233                                format!(
234                                    "check[{}] for transforms {:?} failed conditions:",
235                                    i, self.transform_ids
236                                ),
237                            );
238                        }
239
240                        result.test_errors.extend(check_errors);
241                    }
242
243                    // If there are errors, add a summary of events received
244                    if !result.test_errors.is_empty() {
245                        result.test_errors.push(format!(
246                            "output payloads from {:?} (events encoded as JSON):\n  {}",
247                            self.transform_ids,
248                            events_to_string(&output_events)
249                        ));
250                    }
251                }
252            }
253            UnitTestSinkCheck::NoOutputs => {
254                if !output_events.is_empty() {
255                    result.test_errors.push(format!(
256                        "check for transforms {:?} failed: expected no outputs",
257                        self.transform_ids
258                    ));
259                }
260            }
261            UnitTestSinkCheck::NoOp => {}
262        }
263
264        if let Some(tx) = self.result_tx {
265            if tx.send(result).is_err() {
266                error!(message = "Sending unit test results failed in unit test sink.");
267            }
268        }
269        Ok(())
270    }
271}
272
273/// Configuration for the `unit_test_stream` sink.
274#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
275#[derive(Clone, Default)]
276pub struct UnitTestStreamSinkConfig {
277    /// Sink that receives the processed events.
278    #[serde(skip)]
279    sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
280}
281
282impl_generate_config_from_default!(UnitTestStreamSinkConfig);
283
284impl UnitTestStreamSinkConfig {
285    pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
286        Self {
287            sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
288        }
289    }
290}
291
292impl std::fmt::Debug for UnitTestStreamSinkConfig {
293    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        formatter.debug_struct("UnitTestStreamSinkConfig").finish()
295    }
296}
297
298#[async_trait::async_trait]
299#[typetag::serde(name = "unit_test_stream")]
300impl SinkConfig for UnitTestStreamSinkConfig {
301    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
302        let sink = self.sink.lock().await.take().unwrap();
303        let healthcheck = future::ok(()).boxed();
304
305        #[allow(deprecated)]
306        Ok((VectorSink::from_event_sink(sink), healthcheck))
307    }
308
309    fn input(&self) -> Input {
310        Input::all()
311    }
312
313    fn acknowledgements(&self) -> &AcknowledgementsConfig {
314        &AcknowledgementsConfig::DEFAULT
315    }
316}
317
318fn events_to_string(events: &[Event]) -> String {
319    events
320        .iter()
321        .map(|event| match event {
322            Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
323            Event::Metric(metric) => {
324                serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
325            }
326            Event::Trace(trace) => {
327                serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
328            }
329        })
330        .collect::<Vec<_>>()
331        .join("\n  ")
332}