vector/config/unit_test/
unit_test_components.rs

1use std::sync::Arc;
2
3use futures::{stream, Sink, Stream};
4use futures_util::{future, stream::BoxStream, FutureExt, StreamExt};
5use tokio::sync::{oneshot, Mutex};
6use vector_lib::configurable::configurable_component;
7use vector_lib::{
8    config::{DataType, Input, LogNamespace},
9    event::Event,
10    schema,
11    sink::{StreamSink, VectorSink},
12};
13
14use crate::{
15    conditions::Condition,
16    config::{
17        AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
18    },
19    sinks::Healthcheck,
20    sources,
21};
22
/// Configuration for the `unit_test` source.
#[configurable_component(source("unit_test", "Unit test."))]
#[derive(Clone, Debug, Default)]
pub struct UnitTestSourceConfig {
    /// List of events sent from this source as part of the test.
    // Skipped by serde, so the list never comes from (or goes into) serialized
    // configuration; the derived `Default` starts it out empty.
    #[serde(skip)]
    pub events: Vec<Event>,
}

// NOTE(review): presumably wires up config generation from the `Default`
// value, going by the macro name — the macro itself is defined elsewhere.
impl_generate_config_from_default!(UnitTestSourceConfig);
33
34#[async_trait::async_trait]
35#[typetag::serde(name = "unit_test")]
36impl SourceConfig for UnitTestSourceConfig {
37    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
38        let events = self.events.clone().into_iter();
39
40        Ok(Box::pin(async move {
41            let mut out = cx.out;
42            let _shutdown = cx.shutdown;
43            out.send_batch(events).await.map_err(|_| ())?;
44            Ok(())
45        }))
46    }
47
48    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
49        vec![SourceOutput::new_maybe_logs(
50            DataType::all_bits(),
51            schema::Definition::default_legacy_namespace(),
52        )]
53    }
54
55    fn can_acknowledge(&self) -> bool {
56        false
57    }
58}
59
/// Configuration for the `unit_test_stream` source.
#[configurable_component(source("unit_test_stream", "Unit test stream."))]
#[derive(Clone)]
pub struct UnitTestStreamSourceConfig {
    /// Stream of events that this source forwards to its output.
    // Kept in a shared `Option` slot: `build` takes the stream out, and every
    // clone of this config points at the same slot. Skipped by serde, so it is
    // never part of serialized configuration.
    #[serde(skip)]
    stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
}

// NOTE(review): presumably wires up config generation from the `Default`
// value, going by the macro name — the macro itself is defined elsewhere.
impl_generate_config_from_default!(UnitTestStreamSourceConfig);
69
70impl UnitTestStreamSourceConfig {
71    pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
72        Self {
73            stream: Arc::new(Mutex::new(Some(stream.boxed()))),
74        }
75    }
76}
77
78impl Default for UnitTestStreamSourceConfig {
79    fn default() -> Self {
80        Self::new(stream::empty().boxed())
81    }
82}
83
84impl std::fmt::Debug for UnitTestStreamSourceConfig {
85    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        formatter
87            .debug_struct("UnitTestStreamSourceConfig")
88            .finish()
89    }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "unit_test_stream")]
94impl SourceConfig for UnitTestStreamSourceConfig {
95    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
96        let stream = self.stream.lock().await.take().unwrap();
97        Ok(Box::pin(async move {
98            let mut out = cx.out;
99            let _shutdown = cx.shutdown;
100            out.send_event_stream(stream).await.map_err(|_| ())?;
101            Ok(())
102        }))
103    }
104
105    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106        vec![SourceOutput::new_maybe_logs(
107            DataType::all_bits(),
108            schema::Definition::default_legacy_namespace(),
109        )]
110    }
111
112    fn can_acknowledge(&self) -> bool {
113        false
114    }
115}
116
/// Selects how a `UnitTestSink` validates the events it collected.
#[derive(Clone, Default)]
pub enum UnitTestSinkCheck {
    /// Check all events that are received against the list of conditions.
    // Outer `Vec`: one entry per check; inner `Vec`: that check's conditions.
    // A condition counts as met as soon as any one received event passes it.
    Checks(Vec<Vec<Condition>>),

    /// Check that no events were received.
    NoOutputs,

    /// Do nothing.
    #[default]
    NoOp,
}
129
/// Outcome reported by a `UnitTestSink` once its input stream has ended.
#[derive(Debug)]
pub struct UnitTestSinkResult {
    /// Name of the test the reporting sink was built for.
    pub test_name: String,
    /// Failure messages produced by the sink's check; empty when nothing failed.
    pub test_errors: Vec<String>,
}
135
/// Configuration for the `unit_test` sink.
#[configurable_component(sink("unit_test", "Unit test."))]
#[derive(Clone, Default, Derivative)]
#[derivative(Debug)]
pub struct UnitTestSinkConfig {
    /// Name of the test that this sink is being used for.
    pub test_name: String,

    /// List of names of the transform/branch associated with this sink.
    pub transform_ids: Vec<String>,

    /// Sender side of the test result channel.
    // `build` takes the sender out of this shared slot, so a later build from
    // this config (or from a clone, which shares the slot) gets `None`.
    #[serde(skip)]
    pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,

    /// Kind of validation applied to the events that reach the sink.
    // `UnitTestSinkCheck` does not derive `Debug`, hence it is left out of the
    // derived `Debug` output.
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub check: UnitTestSinkCheck,
}

// NOTE(review): presumably wires up config generation from the `Default`
// value, going by the macro name — the macro itself is defined elsewhere.
impl_generate_config_from_default!(UnitTestSinkConfig);
158
159#[async_trait::async_trait]
160#[typetag::serde(name = "unit_test")]
161impl SinkConfig for UnitTestSinkConfig {
162    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
163        let tx = self.result_tx.lock().await.take();
164        let sink = UnitTestSink {
165            test_name: self.test_name.clone(),
166            transform_ids: self.transform_ids.clone(),
167            result_tx: tx,
168            check: self.check.clone(),
169        };
170        let healthcheck = future::ok(()).boxed();
171
172        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
173    }
174
175    fn input(&self) -> Input {
176        Input::all()
177    }
178
179    fn acknowledgements(&self) -> &AcknowledgementsConfig {
180        &AcknowledgementsConfig::DEFAULT
181    }
182}
183
/// Sink that collects every event it receives and then validates the
/// collection according to `check`.
pub struct UnitTestSink {
    /// Test name copied into the reported `UnitTestSinkResult`.
    pub test_name: String,
    /// Transform names quoted in the failure messages.
    pub transform_ids: Vec<String>,
    // None for NoOp test sinks
    pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
    /// Validation to run once the input stream has ended.
    pub check: UnitTestSinkCheck,
}
191
192#[async_trait::async_trait]
193impl StreamSink<Event> for UnitTestSink {
194    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
195        let mut output_events = Vec::new();
196        let mut result = UnitTestSinkResult {
197            test_name: self.test_name,
198            test_errors: Vec::new(),
199        };
200
201        while let Some(event) = input.next().await {
202            output_events.push(event);
203        }
204
205        match self.check {
206            UnitTestSinkCheck::Checks(checks) => {
207                if output_events.is_empty() {
208                    result
209                        .test_errors
210                        .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
211                } else {
212                    for (i, check) in checks.iter().enumerate() {
213                        let mut check_errors = Vec::new();
214                        for (j, condition) in check.iter().enumerate() {
215                            let mut condition_errors = Vec::new();
216                            for event in output_events.iter() {
217                                match condition.check_with_context(event.clone()).0 {
218                                    Ok(_) => {
219                                        condition_errors.clear();
220                                        break;
221                                    }
222                                    Err(error) => {
223                                        condition_errors
224                                            .push(format!("  condition[{}]: {}", j, error));
225                                    }
226                                }
227                            }
228                            check_errors.extend(condition_errors);
229                        }
230                        // If there are errors, add a preamble to the output
231                        if !check_errors.is_empty() {
232                            check_errors.insert(
233                                0,
234                                format!(
235                                    "check[{}] for transforms {:?} failed conditions:",
236                                    i, self.transform_ids
237                                ),
238                            );
239                        }
240
241                        result.test_errors.extend(check_errors);
242                    }
243
244                    // If there are errors, add a summary of events received
245                    if !result.test_errors.is_empty() {
246                        result.test_errors.push(format!(
247                            "output payloads from {:?} (events encoded as JSON):\n  {}",
248                            self.transform_ids,
249                            events_to_string(&output_events)
250                        ));
251                    }
252                }
253            }
254            UnitTestSinkCheck::NoOutputs => {
255                if !output_events.is_empty() {
256                    result.test_errors.push(format!(
257                        "check for transforms {:?} failed: expected no outputs",
258                        self.transform_ids
259                    ));
260                }
261            }
262            UnitTestSinkCheck::NoOp => {}
263        }
264
265        if let Some(tx) = self.result_tx {
266            if tx.send(result).is_err() {
267                error!(message = "Sending unit test results failed in unit test sink.");
268            }
269        }
270        Ok(())
271    }
272}
273
/// Configuration for the `unit_test_stream` sink.
#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
#[derive(Clone, Default)]
pub struct UnitTestStreamSinkConfig {
    /// Sink that receives the processed events.
    // Kept in a shared `Option` slot: `build` takes the sink out, and every
    // clone of this config points at the same slot. The derived `Default`
    // leaves the slot empty. Skipped by serde, so it is never part of
    // serialized configuration.
    #[serde(skip)]
    sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
}

// NOTE(review): presumably wires up config generation from the `Default`
// value, going by the macro name — the macro itself is defined elsewhere.
impl_generate_config_from_default!(UnitTestStreamSinkConfig);
284
285impl UnitTestStreamSinkConfig {
286    pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
287        Self {
288            sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
289        }
290    }
291}
292
293impl std::fmt::Debug for UnitTestStreamSinkConfig {
294    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        formatter.debug_struct("UnitTestStreamSinkConfig").finish()
296    }
297}
298
299#[async_trait::async_trait]
300#[typetag::serde(name = "unit_test_stream")]
301impl SinkConfig for UnitTestStreamSinkConfig {
302    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
303        let sink = self.sink.lock().await.take().unwrap();
304        let healthcheck = future::ok(()).boxed();
305
306        #[allow(deprecated)]
307        Ok((VectorSink::from_event_sink(sink), healthcheck))
308    }
309
310    fn input(&self) -> Input {
311        Input::all()
312    }
313
314    fn acknowledgements(&self) -> &AcknowledgementsConfig {
315        &AcknowledgementsConfig::DEFAULT
316    }
317}
318
319fn events_to_string(events: &[Event]) -> String {
320    events
321        .iter()
322        .map(|event| match event {
323            Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
324            Event::Metric(metric) => {
325                serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
326            }
327            Event::Trace(trace) => {
328                serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
329            }
330        })
331        .collect::<Vec<_>>()
332        .join("\n  ")
333}