1use std::sync::Arc;
2
3use futures::{stream, Sink, Stream};
4use futures_util::{future, stream::BoxStream, FutureExt, StreamExt};
5use tokio::sync::{oneshot, Mutex};
6use vector_lib::configurable::configurable_component;
7use vector_lib::{
8 config::{DataType, Input, LogNamespace},
9 event::Event,
10 schema,
11 sink::{StreamSink, VectorSink},
12};
13
14use crate::{
15 conditions::Condition,
16 config::{
17 AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
18 },
19 sinks::Healthcheck,
20 sources,
21};
22
/// Configuration for the `unit_test` source.
///
/// The source built from this configuration sends a copy of `events`
/// downstream as a single batch and then finishes.
#[configurable_component(source("unit_test", "Unit test."))]
#[derive(Clone, Debug, Default)]
pub struct UnitTestSourceConfig {
    /// Events emitted by the source when it runs.
    ///
    /// Skipped by serde, so it is never read from or written to a
    /// serialized configuration.
    #[serde(skip)]
    pub events: Vec<Event>,
}

impl_generate_config_from_default!(UnitTestSourceConfig);
33
34#[async_trait::async_trait]
35#[typetag::serde(name = "unit_test")]
36impl SourceConfig for UnitTestSourceConfig {
37 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
38 let events = self.events.clone().into_iter();
39
40 Ok(Box::pin(async move {
41 let mut out = cx.out;
42 let _shutdown = cx.shutdown;
43 out.send_batch(events).await.map_err(|_| ())?;
44 Ok(())
45 }))
46 }
47
48 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
49 vec![SourceOutput::new_maybe_logs(
50 DataType::all_bits(),
51 schema::Definition::default_legacy_namespace(),
52 )]
53 }
54
55 fn can_acknowledge(&self) -> bool {
56 false
57 }
58}
59
/// Configuration for the `unit_test_stream` source.
///
/// Wraps a caller-supplied stream of events that the built source forwards
/// downstream.
#[configurable_component(source("unit_test_stream", "Unit test stream."))]
#[derive(Clone)]
pub struct UnitTestStreamSourceConfig {
    /// The event stream to forward.
    ///
    /// Held in a shared `Option` slot: `build` takes the stream out, so it
    /// can be consumed only once across all clones of this configuration.
    #[serde(skip)]
    stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
}

impl_generate_config_from_default!(UnitTestStreamSourceConfig);
69
70impl UnitTestStreamSourceConfig {
71 pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
72 Self {
73 stream: Arc::new(Mutex::new(Some(stream.boxed()))),
74 }
75 }
76}
77
78impl Default for UnitTestStreamSourceConfig {
79 fn default() -> Self {
80 Self::new(stream::empty().boxed())
81 }
82}
83
84impl std::fmt::Debug for UnitTestStreamSourceConfig {
85 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 formatter
87 .debug_struct("UnitTestStreamSourceConfig")
88 .finish()
89 }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "unit_test_stream")]
94impl SourceConfig for UnitTestStreamSourceConfig {
95 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
96 let stream = self.stream.lock().await.take().unwrap();
97 Ok(Box::pin(async move {
98 let mut out = cx.out;
99 let _shutdown = cx.shutdown;
100 out.send_event_stream(stream).await.map_err(|_| ())?;
101 Ok(())
102 }))
103 }
104
105 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106 vec![SourceOutput::new_maybe_logs(
107 DataType::all_bits(),
108 schema::Definition::default_legacy_namespace(),
109 )]
110 }
111
112 fn can_acknowledge(&self) -> bool {
113 false
114 }
115}
116
/// The verification a `UnitTestSink` performs on the events it received.
#[derive(Clone, Default)]
pub enum UnitTestSinkCheck {
    /// Evaluate the received events against a list of checks, each check
    /// being a list of conditions.
    ///
    /// A condition passes as soon as any one received event satisfies it.
    Checks(Vec<Vec<Condition>>),

    /// Fail if any event was received.
    NoOutputs,

    /// Perform no verification.
    #[default]
    NoOp,
}
129
/// Outcome reported by a `UnitTestSink` once its input stream has ended.
#[derive(Debug)]
pub struct UnitTestSinkResult {
    /// Name of the test, copied from the sink that produced this result.
    pub test_name: String,
    /// Failure messages collected while running the sink's check; empty when
    /// nothing failed.
    pub test_errors: Vec<String>,
}
135
/// Configuration for the `unit_test` sink.
#[configurable_component(sink("unit_test", "Unit test."))]
#[derive(Clone, Default, Derivative)]
#[derivative(Debug)]
pub struct UnitTestSinkConfig {
    /// Name of the test; reported back in the sink's result.
    pub test_name: String,

    /// Identifiers of the transforms this sink checks; they are quoted in
    /// failure messages.
    pub transform_ids: Vec<String>,

    /// One-shot sender used to deliver the test result.
    ///
    /// `build` takes the sender out of this shared slot, so only the first
    /// sink built from this configuration (or its clones) reports a result.
    #[serde(skip)]
    pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,

    /// The verification to run against the received events.
    ///
    /// Excluded from the `Debug` output.
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    pub check: UnitTestSinkCheck,
}

impl_generate_config_from_default!(UnitTestSinkConfig);
158
159#[async_trait::async_trait]
160#[typetag::serde(name = "unit_test")]
161impl SinkConfig for UnitTestSinkConfig {
162 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
163 let tx = self.result_tx.lock().await.take();
164 let sink = UnitTestSink {
165 test_name: self.test_name.clone(),
166 transform_ids: self.transform_ids.clone(),
167 result_tx: tx,
168 check: self.check.clone(),
169 };
170 let healthcheck = future::ok(()).boxed();
171
172 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
173 }
174
175 fn input(&self) -> Input {
176 Input::all()
177 }
178
179 fn acknowledgements(&self) -> &AcknowledgementsConfig {
180 &AcknowledgementsConfig::DEFAULT
181 }
182}
183
/// Sink that buffers every event it receives, runs a `UnitTestSinkCheck`
/// against them, and reports the outcome through a one-shot channel.
pub struct UnitTestSink {
    /// Name of the test; copied into the reported result.
    pub test_name: String,
    /// Identifiers of the transforms under test; quoted in failure messages.
    pub transform_ids: Vec<String>,
    /// Where to send the result. When `None`, the result is dropped.
    pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
    /// The verification to run once the input stream has ended.
    pub check: UnitTestSinkCheck,
}
191
192#[async_trait::async_trait]
193impl StreamSink<Event> for UnitTestSink {
194 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
195 let mut output_events = Vec::new();
196 let mut result = UnitTestSinkResult {
197 test_name: self.test_name,
198 test_errors: Vec::new(),
199 };
200
201 while let Some(event) = input.next().await {
202 output_events.push(event);
203 }
204
205 match self.check {
206 UnitTestSinkCheck::Checks(checks) => {
207 if output_events.is_empty() {
208 result
209 .test_errors
210 .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
211 } else {
212 for (i, check) in checks.iter().enumerate() {
213 let mut check_errors = Vec::new();
214 for (j, condition) in check.iter().enumerate() {
215 let mut condition_errors = Vec::new();
216 for event in output_events.iter() {
217 match condition.check_with_context(event.clone()).0 {
218 Ok(_) => {
219 condition_errors.clear();
220 break;
221 }
222 Err(error) => {
223 condition_errors
224 .push(format!(" condition[{}]: {}", j, error));
225 }
226 }
227 }
228 check_errors.extend(condition_errors);
229 }
230 if !check_errors.is_empty() {
232 check_errors.insert(
233 0,
234 format!(
235 "check[{}] for transforms {:?} failed conditions:",
236 i, self.transform_ids
237 ),
238 );
239 }
240
241 result.test_errors.extend(check_errors);
242 }
243
244 if !result.test_errors.is_empty() {
246 result.test_errors.push(format!(
247 "output payloads from {:?} (events encoded as JSON):\n {}",
248 self.transform_ids,
249 events_to_string(&output_events)
250 ));
251 }
252 }
253 }
254 UnitTestSinkCheck::NoOutputs => {
255 if !output_events.is_empty() {
256 result.test_errors.push(format!(
257 "check for transforms {:?} failed: expected no outputs",
258 self.transform_ids
259 ));
260 }
261 }
262 UnitTestSinkCheck::NoOp => {}
263 }
264
265 if let Some(tx) = self.result_tx {
266 if tx.send(result).is_err() {
267 error!(message = "Sending unit test results failed in unit test sink.");
268 }
269 }
270 Ok(())
271 }
272}
273
/// Configuration for the `unit_test_stream` sink.
///
/// Wraps a caller-supplied `Sink` that the built component writes events
/// into.
#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
#[derive(Clone, Default)]
pub struct UnitTestStreamSinkConfig {
    /// The sink that receives the events.
    ///
    /// Held in a shared `Option` slot: `build` takes the sink out, so it can
    /// be used only once across all clones of this configuration. The derived
    /// default leaves the slot empty.
    #[serde(skip)]
    sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
}

impl_generate_config_from_default!(UnitTestStreamSinkConfig);
284
285impl UnitTestStreamSinkConfig {
286 pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
287 Self {
288 sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
289 }
290 }
291}
292
293impl std::fmt::Debug for UnitTestStreamSinkConfig {
294 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295 formatter.debug_struct("UnitTestStreamSinkConfig").finish()
296 }
297}
298
299#[async_trait::async_trait]
300#[typetag::serde(name = "unit_test_stream")]
301impl SinkConfig for UnitTestStreamSinkConfig {
302 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
303 let sink = self.sink.lock().await.take().unwrap();
304 let healthcheck = future::ok(()).boxed();
305
306 #[allow(deprecated)]
307 Ok((VectorSink::from_event_sink(sink), healthcheck))
308 }
309
310 fn input(&self) -> Input {
311 Input::all()
312 }
313
314 fn acknowledgements(&self) -> &AcknowledgementsConfig {
315 &AcknowledgementsConfig::DEFAULT
316 }
317}
318
319fn events_to_string(events: &[Event]) -> String {
320 events
321 .iter()
322 .map(|event| match event {
323 Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
324 Event::Metric(metric) => {
325 serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
326 }
327 Event::Trace(trace) => {
328 serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
329 }
330 })
331 .collect::<Vec<_>>()
332 .join("\n ")
333}