vector/components/validation/validators/component_spec/
mod.rs1use vector_lib::event::{Event, Metric, MetricKind};
2
3use super::{ComponentMetricType, Validator};
4use crate::components::validation::{
5    ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent, component_names::*,
6};
7
8#[derive(Default)]
17pub struct ComponentSpecValidator;
18
19impl Validator for ComponentSpecValidator {
20    fn name(&self) -> &'static str {
21        "component_spec"
22    }
23
24    fn check_validation(
25        &self,
26        component_type: ComponentType,
27        expectation: TestCaseExpectation,
28        inputs: &[TestEvent],
29        outputs: &[Event],
30        telemetry_events: &[Event],
31        runner_metrics: &RunnerMetrics,
32    ) -> Result<Vec<String>, Vec<String>> {
33        let expect_received_events = inputs
34            .iter()
35            .filter(|te| !te.should_fail() || te.should_reject())
36            .count() as u64;
37
38        for input in inputs {
39            info!("Validator observed input event: {:?}", input);
40        }
41
42        for output in outputs {
43            info!("Validator observed output event: {:?}", output);
44        }
45
46        match expectation {
52            TestCaseExpectation::Success => {
53                if inputs.len() != outputs.len() {
54                    return Err(vec![format!(
55                        "Sent {} inputs but received {} outputs.",
56                        inputs.len(),
57                        outputs.len()
58                    )]);
59                }
60            }
61            TestCaseExpectation::Failure => {
62                if !outputs.is_empty() {
63                    return Err(vec![format!(
64                        "Received {} outputs but none were expected.",
65                        outputs.len()
66                    )]);
67                }
68            }
69            TestCaseExpectation::PartialSuccess => {
70                if inputs.len() == outputs.len() {
71                    return Err(vec![
72                        "Received an output event for every input, when only some outputs were expected.".to_string()
73                    ]);
74                }
75            }
76        }
77
78        let mut run_out = vec![
79            format!(
80                "sent {} inputs and received {} outputs",
81                inputs.len(),
82                outputs.len()
83            ),
84            format!("received {} telemetry events", telemetry_events.len()),
85        ];
86
87        let out = validate_telemetry(
88            component_type,
89            telemetry_events,
90            runner_metrics,
91            expect_received_events,
92        )?;
93        run_out.extend(out);
94
95        Ok(run_out)
96    }
97}
98
99fn validate_telemetry(
100    component_type: ComponentType,
101    telemetry_events: &[Event],
102    runner_metrics: &RunnerMetrics,
103    expect_received_events: u64,
104) -> Result<Vec<String>, Vec<String>> {
105    let mut out: Vec<String> = Vec::new();
106    let mut errs: Vec<String> = Vec::new();
107
108    let metric_types = [
109        ComponentMetricType::EventsReceived,
110        ComponentMetricType::EventsReceivedBytes,
111        ComponentMetricType::ReceivedBytesTotal,
112        ComponentMetricType::SentEventsTotal,
113        ComponentMetricType::SentEventBytesTotal,
114        ComponentMetricType::SentBytesTotal,
115        ComponentMetricType::ErrorsTotal,
116        ComponentMetricType::DiscardedEventsTotal,
117    ];
118
119    metric_types.iter().for_each(|metric_type| {
120        match validate_metric(
121            telemetry_events,
122            runner_metrics,
123            metric_type,
124            component_type,
125            expect_received_events,
126        ) {
127            Err(e) => errs.extend(e),
128            Ok(m) => out.extend(m),
129        }
130    });
131
132    if errs.is_empty() { Ok(out) } else { Err(errs) }
133}
134
135fn validate_metric(
136    telemetry_events: &[Event],
137    runner_metrics: &RunnerMetrics,
138    metric_type: &ComponentMetricType,
139    component_type: ComponentType,
140    expect_received_events: u64,
141) -> Result<Vec<String>, Vec<String>> {
142    let component_id = match component_type {
143        ComponentType::Source => TEST_SOURCE_NAME,
144        ComponentType::Transform => TEST_TRANSFORM_NAME,
145        ComponentType::Sink => TEST_SINK_NAME,
146    };
147
148    let expected = match metric_type {
149        ComponentMetricType::EventsReceived => {
150            runner_metrics.sent_events_total
153        }
154        ComponentMetricType::EventsReceivedBytes => {
155            runner_metrics.sent_event_bytes_total
158        }
159        ComponentMetricType::ReceivedBytesTotal => {
160            if component_type == ComponentType::Sink {
163                0 } else {
165                runner_metrics.sent_bytes_total
166            }
167        }
168        ComponentMetricType::SentEventsTotal => {
169            runner_metrics.received_events_total
172        }
173        ComponentMetricType::SentBytesTotal => {
174            if component_type == ComponentType::Source {
177                0 } else {
179                runner_metrics.received_bytes_total
180            }
181        }
182        ComponentMetricType::SentEventBytesTotal => {
183            runner_metrics.received_event_bytes_total
186        }
187        ComponentMetricType::ErrorsTotal => runner_metrics.errors_total,
188        ComponentMetricType::DiscardedEventsTotal => runner_metrics.discarded_events_total,
189    };
190
191    compare_actual_to_expected(
192        telemetry_events,
193        metric_type,
194        component_id,
195        expected,
196        expect_received_events,
197    )
198}
199
200fn filter_events_by_metric_and_component<'a>(
201    telemetry_events: &'a [Event],
202    metric: &ComponentMetricType,
203    component_id: &'a str,
204) -> Vec<&'a Metric> {
205    info!(
206        "Filter looking for metric {} {}",
207        metric.to_string(),
208        component_id
209    );
210
211    let metrics: Vec<&Metric> = telemetry_events
212        .iter()
213        .flat_map(|e| {
214            if let vector_lib::event::Event::Metric(m) = e {
215                Some(m)
216            } else {
217                None
218            }
219        })
220        .filter(|&m| {
221            if m.name() == metric.to_string() {
222                debug!("{}", m);
223                if let Some(tags) = m.tags()
224                    && tags.get("component_id").unwrap_or("") == component_id
225                {
226                    return true;
227                }
228            }
229
230            false
231        })
232        .collect();
233
234    info!("{}: {} metrics found.", metric.to_string(), metrics.len());
235
236    metrics
237}
238
239fn sum_counters(
240    metric_name: &ComponentMetricType,
241    metrics: &[&Metric],
242) -> Result<u64, Vec<String>> {
243    let mut sum: f64 = 0.0;
244    let mut errs = Vec::new();
245
246    for m in metrics {
247        match m.value() {
248            vector_lib::event::MetricValue::Counter { value } => {
249                if let MetricKind::Absolute = m.data().kind {
250                    sum = *value;
251                } else {
252                    sum += *value;
253                }
254            }
255            _ => errs.push(format!("{metric_name}: metric value is not a counter",)),
256        }
257    }
258
259    if errs.is_empty() {
260        Ok(sum as u64)
261    } else {
262        Err(errs)
263    }
264}
265
266fn compare_actual_to_expected(
267    telemetry_events: &[Event],
268    metric_type: &ComponentMetricType,
269    component_id: &str,
270    expected: u64,
271    expect_received_events: u64,
272) -> Result<Vec<String>, Vec<String>> {
273    let mut errs: Vec<String> = Vec::new();
274
275    let metrics =
276        filter_events_by_metric_and_component(telemetry_events, metric_type, component_id);
277
278    let actual = sum_counters(metric_type, &metrics)?;
279
280    info!("{metric_type}: expected {expected}, actual {actual}.");
281
282    if actual != expected &&
283        (metric_type != &ComponentMetricType::EventsReceivedBytes
289            || (actual != (expected + (expect_received_events * 2))))
290    {
291        errs.push(format!(
292            "{metric_type}: expected {expected}, actual {actual}",
293        ));
294    }
295
296    if !errs.is_empty() {
297        return Err(errs);
298    }
299
300    Ok(vec![format!("{}: {}", metric_type, actual)])
301}