vector/components/validation/validators/component_spec/
mod.rs1use crate::components::validation::{
2 component_names::*, ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent,
3};
4use vector_lib::event::{Event, Metric, MetricKind};
5
6use super::{ComponentMetricType, Validator};
7
/// Validator whose [`Validator::name`] is `"component_spec"`.
///
/// Its [`Validator`] implementation first compares the number of inputs and outputs against
/// the test case expectation, then checks the component's telemetry counters against the
/// metrics collected by the runner.
#[derive(Default)]
pub struct ComponentSpecValidator;
18
19impl Validator for ComponentSpecValidator {
20 fn name(&self) -> &'static str {
21 "component_spec"
22 }
23
24 fn check_validation(
25 &self,
26 component_type: ComponentType,
27 expectation: TestCaseExpectation,
28 inputs: &[TestEvent],
29 outputs: &[Event],
30 telemetry_events: &[Event],
31 runner_metrics: &RunnerMetrics,
32 ) -> Result<Vec<String>, Vec<String>> {
33 let expect_received_events = inputs
34 .iter()
35 .filter(|te| !te.should_fail() || te.should_reject())
36 .count() as u64;
37
38 for input in inputs {
39 info!("Validator observed input event: {:?}", input);
40 }
41
42 for output in outputs {
43 info!("Validator observed output event: {:?}", output);
44 }
45
46 match expectation {
52 TestCaseExpectation::Success => {
53 if inputs.len() != outputs.len() {
54 return Err(vec![format!(
55 "Sent {} inputs but received {} outputs.",
56 inputs.len(),
57 outputs.len()
58 )]);
59 }
60 }
61 TestCaseExpectation::Failure => {
62 if !outputs.is_empty() {
63 return Err(vec![format!(
64 "Received {} outputs but none were expected.",
65 outputs.len()
66 )]);
67 }
68 }
69 TestCaseExpectation::PartialSuccess => {
70 if inputs.len() == outputs.len() {
71 return Err(vec![
72 "Received an output event for every input, when only some outputs were expected.".to_string()
73 ]);
74 }
75 }
76 }
77
78 let mut run_out = vec![
79 format!(
80 "sent {} inputs and received {} outputs",
81 inputs.len(),
82 outputs.len()
83 ),
84 format!("received {} telemetry events", telemetry_events.len()),
85 ];
86
87 let out = validate_telemetry(
88 component_type,
89 telemetry_events,
90 runner_metrics,
91 expect_received_events,
92 )?;
93 run_out.extend(out);
94
95 Ok(run_out)
96 }
97}
98
99fn validate_telemetry(
100 component_type: ComponentType,
101 telemetry_events: &[Event],
102 runner_metrics: &RunnerMetrics,
103 expect_received_events: u64,
104) -> Result<Vec<String>, Vec<String>> {
105 let mut out: Vec<String> = Vec::new();
106 let mut errs: Vec<String> = Vec::new();
107
108 let metric_types = [
109 ComponentMetricType::EventsReceived,
110 ComponentMetricType::EventsReceivedBytes,
111 ComponentMetricType::ReceivedBytesTotal,
112 ComponentMetricType::SentEventsTotal,
113 ComponentMetricType::SentEventBytesTotal,
114 ComponentMetricType::SentBytesTotal,
115 ComponentMetricType::ErrorsTotal,
116 ComponentMetricType::DiscardedEventsTotal,
117 ];
118
119 metric_types.iter().for_each(|metric_type| {
120 match validate_metric(
121 telemetry_events,
122 runner_metrics,
123 metric_type,
124 component_type,
125 expect_received_events,
126 ) {
127 Err(e) => errs.extend(e),
128 Ok(m) => out.extend(m),
129 }
130 });
131
132 if errs.is_empty() {
133 Ok(out)
134 } else {
135 Err(errs)
136 }
137}
138
139fn validate_metric(
140 telemetry_events: &[Event],
141 runner_metrics: &RunnerMetrics,
142 metric_type: &ComponentMetricType,
143 component_type: ComponentType,
144 expect_received_events: u64,
145) -> Result<Vec<String>, Vec<String>> {
146 let component_id = match component_type {
147 ComponentType::Source => TEST_SOURCE_NAME,
148 ComponentType::Transform => TEST_TRANSFORM_NAME,
149 ComponentType::Sink => TEST_SINK_NAME,
150 };
151
152 let expected = match metric_type {
153 ComponentMetricType::EventsReceived => {
154 runner_metrics.sent_events_total
157 }
158 ComponentMetricType::EventsReceivedBytes => {
159 runner_metrics.sent_event_bytes_total
162 }
163 ComponentMetricType::ReceivedBytesTotal => {
164 if component_type == ComponentType::Sink {
167 0 } else {
169 runner_metrics.sent_bytes_total
170 }
171 }
172 ComponentMetricType::SentEventsTotal => {
173 runner_metrics.received_events_total
176 }
177 ComponentMetricType::SentBytesTotal => {
178 if component_type == ComponentType::Source {
181 0 } else {
183 runner_metrics.received_bytes_total
184 }
185 }
186 ComponentMetricType::SentEventBytesTotal => {
187 runner_metrics.received_event_bytes_total
190 }
191 ComponentMetricType::ErrorsTotal => runner_metrics.errors_total,
192 ComponentMetricType::DiscardedEventsTotal => runner_metrics.discarded_events_total,
193 };
194
195 compare_actual_to_expected(
196 telemetry_events,
197 metric_type,
198 component_id,
199 expected,
200 expect_received_events,
201 )
202}
203
204fn filter_events_by_metric_and_component<'a>(
205 telemetry_events: &'a [Event],
206 metric: &ComponentMetricType,
207 component_id: &'a str,
208) -> Vec<&'a Metric> {
209 info!(
210 "Filter looking for metric {} {}",
211 metric.to_string(),
212 component_id
213 );
214
215 let metrics: Vec<&Metric> = telemetry_events
216 .iter()
217 .flat_map(|e| {
218 if let vector_lib::event::Event::Metric(m) = e {
219 Some(m)
220 } else {
221 None
222 }
223 })
224 .filter(|&m| {
225 if m.name() == metric.to_string() {
226 debug!("{}", m);
227 if let Some(tags) = m.tags() {
228 if tags.get("component_id").unwrap_or("") == component_id {
229 return true;
230 }
231 }
232 }
233
234 false
235 })
236 .collect();
237
238 info!("{}: {} metrics found.", metric.to_string(), metrics.len());
239
240 metrics
241}
242
243fn sum_counters(
244 metric_name: &ComponentMetricType,
245 metrics: &[&Metric],
246) -> Result<u64, Vec<String>> {
247 let mut sum: f64 = 0.0;
248 let mut errs = Vec::new();
249
250 for m in metrics {
251 match m.value() {
252 vector_lib::event::MetricValue::Counter { value } => {
253 if let MetricKind::Absolute = m.data().kind {
254 sum = *value;
255 } else {
256 sum += *value;
257 }
258 }
259 _ => errs.push(format!("{metric_name}: metric value is not a counter",)),
260 }
261 }
262
263 if errs.is_empty() {
264 Ok(sum as u64)
265 } else {
266 Err(errs)
267 }
268}
269
270fn compare_actual_to_expected(
271 telemetry_events: &[Event],
272 metric_type: &ComponentMetricType,
273 component_id: &str,
274 expected: u64,
275 expect_received_events: u64,
276) -> Result<Vec<String>, Vec<String>> {
277 let mut errs: Vec<String> = Vec::new();
278
279 let metrics =
280 filter_events_by_metric_and_component(telemetry_events, metric_type, component_id);
281
282 let actual = sum_counters(metric_type, &metrics)?;
283
284 info!("{metric_type}: expected {expected}, actual {actual}.");
285
286 if actual != expected &&
287 (metric_type != &ComponentMetricType::EventsReceivedBytes
293 || (actual != (expected + (expect_received_events * 2))))
294 {
295 errs.push(format!(
296 "{metric_type}: expected {expected}, actual {actual}",
297 ));
298 }
299
300 if !errs.is_empty() {
301 return Err(errs);
302 }
303
304 Ok(vec![format!("{}: {}", metric_type, actual)])
305}