1use std::sync::Arc;
2
3use futures::{stream, Sink, Stream};
4use futures_util::{future, stream::BoxStream, FutureExt, StreamExt};
5use tokio::sync::{oneshot, Mutex};
6use vector_lib::configurable::configurable_component;
7use vector_lib::{
8 config::{DataType, Input, LogNamespace},
9 event::Event,
10 schema,
11 sink::{StreamSink, VectorSink},
12};
13
14use crate::{
15 conditions::Condition,
16 config::{
17 AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
18 },
19 sinks::Healthcheck,
20 sources,
21};
22
23#[configurable_component(source("unit_test", "Unit test."))]
25#[derive(Clone, Debug, Default)]
26pub struct UnitTestSourceConfig {
27 #[serde(skip)]
29 pub events: Vec<Event>,
30}
31
32impl_generate_config_from_default!(UnitTestSourceConfig);
33
34#[async_trait::async_trait]
35#[typetag::serde(name = "unit_test")]
36impl SourceConfig for UnitTestSourceConfig {
37 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
38 let events = self.events.clone().into_iter();
39
40 Ok(Box::pin(async move {
41 let mut out = cx.out;
42 let _shutdown = cx.shutdown;
43 out.send_batch(events).await.map_err(|_| ())?;
44 Ok(())
45 }))
46 }
47
48 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
49 vec![SourceOutput::new_maybe_logs(
50 DataType::all_bits(),
51 schema::Definition::default_legacy_namespace(),
52 )]
53 }
54
55 fn can_acknowledge(&self) -> bool {
56 false
57 }
58}
59
60#[configurable_component(source("unit_test_stream", "Unit test stream."))]
62#[derive(Clone)]
63pub struct UnitTestStreamSourceConfig {
64 #[serde(skip)]
65 stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
66}
67
68impl_generate_config_from_default!(UnitTestStreamSourceConfig);
69
70impl UnitTestStreamSourceConfig {
71 pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
72 Self {
73 stream: Arc::new(Mutex::new(Some(stream.boxed()))),
74 }
75 }
76}
77
78impl Default for UnitTestStreamSourceConfig {
79 fn default() -> Self {
80 Self::new(stream::empty().boxed())
81 }
82}
83
84impl std::fmt::Debug for UnitTestStreamSourceConfig {
85 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 formatter
87 .debug_struct("UnitTestStreamSourceConfig")
88 .finish()
89 }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "unit_test_stream")]
94impl SourceConfig for UnitTestStreamSourceConfig {
95 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
96 let stream = self.stream.lock().await.take().unwrap();
97 Ok(Box::pin(async move {
98 let mut out = cx.out;
99 let _shutdown = cx.shutdown;
100 out.send_event_stream(stream).await.map_err(|_| ())?;
101 Ok(())
102 }))
103 }
104
105 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106 vec![SourceOutput::new_maybe_logs(
107 DataType::all_bits(),
108 schema::Definition::default_legacy_namespace(),
109 )]
110 }
111
112 fn can_acknowledge(&self) -> bool {
113 false
114 }
115}
116
117#[derive(Clone, Default)]
118pub enum UnitTestSinkCheck {
119 Checks(Vec<Vec<Condition>>),
121
122 NoOutputs,
124
125 #[default]
127 NoOp,
128}
129
/// Outcome reported by a unit test sink once its input stream has ended.
///
/// Besides `Debug`, the type is cloneable and comparable so that callers can keep copies of
/// a result and assert on whole results instead of inspecting fields one by one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnitTestSinkResult {
    /// Name of the test the reporting sink belongs to.
    pub test_name: String,
    /// Human-readable failure messages; empty when nothing failed.
    pub test_errors: Vec<String>,
}
135
136#[configurable_component(sink("unit_test", "Unit test."))]
138#[derive(Clone, Default, Derivative)]
139#[derivative(Debug)]
140pub struct UnitTestSinkConfig {
141 pub test_name: String,
143
144 pub transform_ids: Vec<String>,
146
147 #[serde(skip)]
149 pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,
150
151 #[serde(skip)]
153 #[derivative(Debug = "ignore")]
154 pub check: UnitTestSinkCheck,
155}
156
157impl_generate_config_from_default!(UnitTestSinkConfig);
158
159#[async_trait::async_trait]
160#[typetag::serde(name = "unit_test")]
161impl SinkConfig for UnitTestSinkConfig {
162 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
163 let tx = self.result_tx.lock().await.take();
164 let sink = UnitTestSink {
165 test_name: self.test_name.clone(),
166 transform_ids: self.transform_ids.clone(),
167 result_tx: tx,
168 check: self.check.clone(),
169 };
170 let healthcheck = future::ok(()).boxed();
171
172 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
173 }
174
175 fn input(&self) -> Input {
176 Input::all()
177 }
178
179 fn acknowledgements(&self) -> &AcknowledgementsConfig {
180 &AcknowledgementsConfig::DEFAULT
181 }
182}
183
184pub struct UnitTestSink {
185 pub test_name: String,
186 pub transform_ids: Vec<String>,
187 pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
189 pub check: UnitTestSinkCheck,
190}
191
192#[async_trait::async_trait]
193impl StreamSink<Event> for UnitTestSink {
194 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
195 let mut output_events = Vec::new();
196 let mut result = UnitTestSinkResult {
197 test_name: self.test_name,
198 test_errors: Vec::new(),
199 };
200
201 while let Some(event) = input.next().await {
202 output_events.push(event);
203 }
204
205 match self.check {
206 UnitTestSinkCheck::Checks(checks) => {
207 if output_events.is_empty() {
208 result
209 .test_errors
210 .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
211 } else {
212 for (i, check) in checks.iter().enumerate() {
213 let mut check_errors = Vec::new();
214 for (j, condition) in check.iter().enumerate() {
215 let mut condition_errors = Vec::new();
216 for event in output_events.iter() {
217 match condition.check_with_context(event.clone()).0 {
218 Ok(_) => {
219 condition_errors.clear();
220 break;
221 }
222 Err(error) => {
223 condition_errors.push(format!(" condition[{j}]: {error}"));
224 }
225 }
226 }
227 check_errors.extend(condition_errors);
228 }
229 if !check_errors.is_empty() {
231 check_errors.insert(
232 0,
233 format!(
234 "check[{}] for transforms {:?} failed conditions:",
235 i, self.transform_ids
236 ),
237 );
238 }
239
240 result.test_errors.extend(check_errors);
241 }
242
243 if !result.test_errors.is_empty() {
245 result.test_errors.push(format!(
246 "output payloads from {:?} (events encoded as JSON):\n {}",
247 self.transform_ids,
248 events_to_string(&output_events)
249 ));
250 }
251 }
252 }
253 UnitTestSinkCheck::NoOutputs => {
254 if !output_events.is_empty() {
255 result.test_errors.push(format!(
256 "check for transforms {:?} failed: expected no outputs",
257 self.transform_ids
258 ));
259 }
260 }
261 UnitTestSinkCheck::NoOp => {}
262 }
263
264 if let Some(tx) = self.result_tx {
265 if tx.send(result).is_err() {
266 error!(message = "Sending unit test results failed in unit test sink.");
267 }
268 }
269 Ok(())
270 }
271}
272
273#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
275#[derive(Clone, Default)]
276pub struct UnitTestStreamSinkConfig {
277 #[serde(skip)]
279 sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
280}
281
282impl_generate_config_from_default!(UnitTestStreamSinkConfig);
283
284impl UnitTestStreamSinkConfig {
285 pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
286 Self {
287 sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
288 }
289 }
290}
291
292impl std::fmt::Debug for UnitTestStreamSinkConfig {
293 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 formatter.debug_struct("UnitTestStreamSinkConfig").finish()
295 }
296}
297
298#[async_trait::async_trait]
299#[typetag::serde(name = "unit_test_stream")]
300impl SinkConfig for UnitTestStreamSinkConfig {
301 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
302 let sink = self.sink.lock().await.take().unwrap();
303 let healthcheck = future::ok(()).boxed();
304
305 #[allow(deprecated)]
306 Ok((VectorSink::from_event_sink(sink), healthcheck))
307 }
308
309 fn input(&self) -> Input {
310 Input::all()
311 }
312
313 fn acknowledgements(&self) -> &AcknowledgementsConfig {
314 &AcknowledgementsConfig::DEFAULT
315 }
316}
317
318fn events_to_string(events: &[Event]) -> String {
319 events
320 .iter()
321 .map(|event| match event {
322 Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
323 Event::Metric(metric) => {
324 serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
325 }
326 Event::Trace(trace) => {
327 serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
328 }
329 })
330 .collect::<Vec<_>>()
331 .join("\n ")
332}