vector/components/validation/validators/component_spec/
mod.rs1use vector_lib::event::{Event, Metric, MetricKind};
2
3use super::{ComponentMetricType, Validator};
4use crate::components::validation::{
5 ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent, component_names::*,
6};
7
/// Validator registered under the name `"component_spec"`.
///
/// The type carries no state; all of its behavior lives in its [`Validator`]
/// implementation, which checks input/output event counts against the test
/// case expectation and compares the component's telemetry counters with the
/// values recorded by the runner.
#[derive(Default)]
pub struct ComponentSpecValidator;
18
19impl Validator for ComponentSpecValidator {
20 fn name(&self) -> &'static str {
21 "component_spec"
22 }
23
24 fn check_validation(
25 &self,
26 component_type: ComponentType,
27 expectation: TestCaseExpectation,
28 inputs: &[TestEvent],
29 outputs: &[Event],
30 telemetry_events: &[Event],
31 runner_metrics: &RunnerMetrics,
32 ) -> Result<Vec<String>, Vec<String>> {
33 let expect_received_events = inputs
34 .iter()
35 .filter(|te| !te.should_fail() || te.should_reject())
36 .count() as u64;
37
38 for input in inputs {
39 info!("Validator observed input event: {:?}", input);
40 }
41
42 for output in outputs {
43 info!("Validator observed output event: {:?}", output);
44 }
45
46 match expectation {
52 TestCaseExpectation::Success => {
53 if inputs.len() != outputs.len() {
54 return Err(vec![format!(
55 "Sent {} inputs but received {} outputs.",
56 inputs.len(),
57 outputs.len()
58 )]);
59 }
60 }
61 TestCaseExpectation::Failure => {
62 if !outputs.is_empty() {
63 return Err(vec![format!(
64 "Received {} outputs but none were expected.",
65 outputs.len()
66 )]);
67 }
68 }
69 TestCaseExpectation::PartialSuccess => {
70 if inputs.len() == outputs.len() {
71 return Err(vec![
72 "Received an output event for every input, when only some outputs were expected.".to_string()
73 ]);
74 }
75 }
76 }
77
78 let mut run_out = vec![
79 format!(
80 "sent {} inputs and received {} outputs",
81 inputs.len(),
82 outputs.len()
83 ),
84 format!("received {} telemetry events", telemetry_events.len()),
85 ];
86
87 let out = validate_telemetry(
88 component_type,
89 telemetry_events,
90 runner_metrics,
91 expect_received_events,
92 )?;
93 run_out.extend(out);
94
95 Ok(run_out)
96 }
97}
98
99fn validate_telemetry(
100 component_type: ComponentType,
101 telemetry_events: &[Event],
102 runner_metrics: &RunnerMetrics,
103 expect_received_events: u64,
104) -> Result<Vec<String>, Vec<String>> {
105 let mut out: Vec<String> = Vec::new();
106 let mut errs: Vec<String> = Vec::new();
107
108 let metric_types = [
109 ComponentMetricType::EventsReceived,
110 ComponentMetricType::EventsReceivedBytes,
111 ComponentMetricType::ReceivedBytesTotal,
112 ComponentMetricType::SentEventsTotal,
113 ComponentMetricType::SentEventBytesTotal,
114 ComponentMetricType::SentBytesTotal,
115 ComponentMetricType::ErrorsTotal,
116 ComponentMetricType::DiscardedEventsTotal,
117 ];
118
119 metric_types.iter().for_each(|metric_type| {
120 match validate_metric(
121 telemetry_events,
122 runner_metrics,
123 metric_type,
124 component_type,
125 expect_received_events,
126 ) {
127 Err(e) => errs.extend(e),
128 Ok(m) => out.extend(m),
129 }
130 });
131
132 if errs.is_empty() { Ok(out) } else { Err(errs) }
133}
134
135fn validate_metric(
136 telemetry_events: &[Event],
137 runner_metrics: &RunnerMetrics,
138 metric_type: &ComponentMetricType,
139 component_type: ComponentType,
140 expect_received_events: u64,
141) -> Result<Vec<String>, Vec<String>> {
142 let component_id = match component_type {
143 ComponentType::Source => TEST_SOURCE_NAME,
144 ComponentType::Transform => TEST_TRANSFORM_NAME,
145 ComponentType::Sink => TEST_SINK_NAME,
146 };
147
148 let expected = match metric_type {
149 ComponentMetricType::EventsReceived => {
150 runner_metrics.sent_events_total
153 }
154 ComponentMetricType::EventsReceivedBytes => {
155 runner_metrics.sent_event_bytes_total
158 }
159 ComponentMetricType::ReceivedBytesTotal => {
160 if component_type == ComponentType::Sink {
163 0 } else {
165 runner_metrics.sent_bytes_total
166 }
167 }
168 ComponentMetricType::SentEventsTotal => {
169 runner_metrics.received_events_total
172 }
173 ComponentMetricType::SentBytesTotal => {
174 if component_type == ComponentType::Source {
177 0 } else {
179 runner_metrics.received_bytes_total
180 }
181 }
182 ComponentMetricType::SentEventBytesTotal => {
183 runner_metrics.received_event_bytes_total
186 }
187 ComponentMetricType::ErrorsTotal => runner_metrics.errors_total,
188 ComponentMetricType::DiscardedEventsTotal => runner_metrics.discarded_events_total,
189 };
190
191 compare_actual_to_expected(
192 telemetry_events,
193 metric_type,
194 component_id,
195 expected,
196 expect_received_events,
197 )
198}
199
200fn filter_events_by_metric_and_component<'a>(
201 telemetry_events: &'a [Event],
202 metric: &ComponentMetricType,
203 component_id: &'a str,
204) -> Vec<&'a Metric> {
205 info!(
206 "Filter looking for metric {} {}",
207 metric.to_string(),
208 component_id
209 );
210
211 let metrics: Vec<&Metric> = telemetry_events
212 .iter()
213 .flat_map(|e| {
214 if let vector_lib::event::Event::Metric(m) = e {
215 Some(m)
216 } else {
217 None
218 }
219 })
220 .filter(|&m| {
221 if m.name() == metric.to_string() {
222 debug!("{}", m);
223 if let Some(tags) = m.tags()
224 && tags.get("component_id").unwrap_or("") == component_id
225 {
226 return true;
227 }
228 }
229
230 false
231 })
232 .collect();
233
234 info!("{}: {} metrics found.", metric.to_string(), metrics.len());
235
236 metrics
237}
238
239fn sum_counters(
240 metric_name: &ComponentMetricType,
241 metrics: &[&Metric],
242) -> Result<u64, Vec<String>> {
243 let mut sum: f64 = 0.0;
244 let mut errs = Vec::new();
245
246 for m in metrics {
247 match m.value() {
248 vector_lib::event::MetricValue::Counter { value } => {
249 if let MetricKind::Absolute = m.data().kind {
250 sum = *value;
251 } else {
252 sum += *value;
253 }
254 }
255 _ => errs.push(format!("{metric_name}: metric value is not a counter",)),
256 }
257 }
258
259 if errs.is_empty() {
260 Ok(sum as u64)
261 } else {
262 Err(errs)
263 }
264}
265
266fn compare_actual_to_expected(
267 telemetry_events: &[Event],
268 metric_type: &ComponentMetricType,
269 component_id: &str,
270 expected: u64,
271 expect_received_events: u64,
272) -> Result<Vec<String>, Vec<String>> {
273 let mut errs: Vec<String> = Vec::new();
274
275 let metrics =
276 filter_events_by_metric_and_component(telemetry_events, metric_type, component_id);
277
278 let actual = sum_counters(metric_type, &metrics)?;
279
280 info!("{metric_type}: expected {expected}, actual {actual}.");
281
282 if actual != expected &&
283 (metric_type != &ComponentMetricType::EventsReceivedBytes
289 || (actual != (expected + (expect_received_events * 2))))
290 {
291 errs.push(format!(
292 "{metric_type}: expected {expected}, actual {actual}",
293 ));
294 }
295
296 if !errs.is_empty() {
297 return Err(errs);
298 }
299
300 Ok(vec![format!("{}: {}", metric_type, actual)])
301}