// vector/components/validation/validators/component_spec/mod.rs

1use crate::components::validation::{
2    component_names::*, ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent,
3};
4use vector_lib::event::{Event, Metric, MetricKind};
5
6use super::{ComponentMetricType, Validator};
7
/// Validator that enforces the [Component Specification][component_spec].
///
/// The specification defines which events and metrics a component of a given type must emit.
/// Checking against it confirms two things: that a component's telemetry is accurate, and that
/// operators can rely on a common baseline across components of the same kind. For example,
/// every source emits the same set of source-level metrics.
///
/// [component_spec]: https://github.com/vectordotdev/vector/blob/master/docs/specs/component.md
#[derive(Default)]
pub struct ComponentSpecValidator;
18
19impl Validator for ComponentSpecValidator {
20    fn name(&self) -> &'static str {
21        "component_spec"
22    }
23
24    fn check_validation(
25        &self,
26        component_type: ComponentType,
27        expectation: TestCaseExpectation,
28        inputs: &[TestEvent],
29        outputs: &[Event],
30        telemetry_events: &[Event],
31        runner_metrics: &RunnerMetrics,
32    ) -> Result<Vec<String>, Vec<String>> {
33        let expect_received_events = inputs
34            .iter()
35            .filter(|te| !te.should_fail() || te.should_reject())
36            .count() as u64;
37
38        for input in inputs {
39            info!("Validator observed input event: {:?}", input);
40        }
41
42        for output in outputs {
43            info!("Validator observed output event: {:?}", output);
44        }
45
46        // Validate that the number of inputs/outputs matched the test case expectation.
47        //
48        // NOTE: This logic currently assumes that one input event leads to, at most, one output
49        // event. It also assumes that tests that are marked as expecting to be partially successful
50        // should never emit the same number of output events as there are input events.
51        match expectation {
52            TestCaseExpectation::Success => {
53                if inputs.len() != outputs.len() {
54                    return Err(vec![format!(
55                        "Sent {} inputs but received {} outputs.",
56                        inputs.len(),
57                        outputs.len()
58                    )]);
59                }
60            }
61            TestCaseExpectation::Failure => {
62                if !outputs.is_empty() {
63                    return Err(vec![format!(
64                        "Received {} outputs but none were expected.",
65                        outputs.len()
66                    )]);
67                }
68            }
69            TestCaseExpectation::PartialSuccess => {
70                if inputs.len() == outputs.len() {
71                    return Err(vec![
72                        "Received an output event for every input, when only some outputs were expected.".to_string()
73                    ]);
74                }
75            }
76        }
77
78        let mut run_out = vec![
79            format!(
80                "sent {} inputs and received {} outputs",
81                inputs.len(),
82                outputs.len()
83            ),
84            format!("received {} telemetry events", telemetry_events.len()),
85        ];
86
87        let out = validate_telemetry(
88            component_type,
89            telemetry_events,
90            runner_metrics,
91            expect_received_events,
92        )?;
93        run_out.extend(out);
94
95        Ok(run_out)
96    }
97}
98
99fn validate_telemetry(
100    component_type: ComponentType,
101    telemetry_events: &[Event],
102    runner_metrics: &RunnerMetrics,
103    expect_received_events: u64,
104) -> Result<Vec<String>, Vec<String>> {
105    let mut out: Vec<String> = Vec::new();
106    let mut errs: Vec<String> = Vec::new();
107
108    let metric_types = [
109        ComponentMetricType::EventsReceived,
110        ComponentMetricType::EventsReceivedBytes,
111        ComponentMetricType::ReceivedBytesTotal,
112        ComponentMetricType::SentEventsTotal,
113        ComponentMetricType::SentEventBytesTotal,
114        ComponentMetricType::SentBytesTotal,
115        ComponentMetricType::ErrorsTotal,
116        ComponentMetricType::DiscardedEventsTotal,
117    ];
118
119    metric_types.iter().for_each(|metric_type| {
120        match validate_metric(
121            telemetry_events,
122            runner_metrics,
123            metric_type,
124            component_type,
125            expect_received_events,
126        ) {
127            Err(e) => errs.extend(e),
128            Ok(m) => out.extend(m),
129        }
130    });
131
132    if errs.is_empty() {
133        Ok(out)
134    } else {
135        Err(errs)
136    }
137}
138
139fn validate_metric(
140    telemetry_events: &[Event],
141    runner_metrics: &RunnerMetrics,
142    metric_type: &ComponentMetricType,
143    component_type: ComponentType,
144    expect_received_events: u64,
145) -> Result<Vec<String>, Vec<String>> {
146    let component_id = match component_type {
147        ComponentType::Source => TEST_SOURCE_NAME,
148        ComponentType::Transform => TEST_TRANSFORM_NAME,
149        ComponentType::Sink => TEST_SINK_NAME,
150    };
151
152    let expected = match metric_type {
153        ComponentMetricType::EventsReceived => {
154            // The reciprocal metric for events received is events sent,
155            // so the expected value is what the input runner sent.
156            runner_metrics.sent_events_total
157        }
158        ComponentMetricType::EventsReceivedBytes => {
159            // The reciprocal metric for received_event_bytes is sent_event_bytes,
160            // so the expected value is what the input runner sent.
161            runner_metrics.sent_event_bytes_total
162        }
163        ComponentMetricType::ReceivedBytesTotal => {
164            // The reciprocal metric for received_bytes is sent_bytes,
165            // so the expected value is what the input runner sent.
166            if component_type == ComponentType::Sink {
167                0 // sinks should not emit this metric
168            } else {
169                runner_metrics.sent_bytes_total
170            }
171        }
172        ComponentMetricType::SentEventsTotal => {
173            // The reciprocal metric for events sent is events received,
174            // so the expected value is what the output runner received.
175            runner_metrics.received_events_total
176        }
177        ComponentMetricType::SentBytesTotal => {
178            // The reciprocal metric for sent_bytes is received_bytes,
179            // so the expected value is what the output runner received.
180            if component_type == ComponentType::Source {
181                0 // sources should not emit this metric
182            } else {
183                runner_metrics.received_bytes_total
184            }
185        }
186        ComponentMetricType::SentEventBytesTotal => {
187            // The reciprocal metric for sent_event_bytes is received_event_bytes,
188            // so the expected value is what the output runner received.
189            runner_metrics.received_event_bytes_total
190        }
191        ComponentMetricType::ErrorsTotal => runner_metrics.errors_total,
192        ComponentMetricType::DiscardedEventsTotal => runner_metrics.discarded_events_total,
193    };
194
195    compare_actual_to_expected(
196        telemetry_events,
197        metric_type,
198        component_id,
199        expected,
200        expect_received_events,
201    )
202}
203
204fn filter_events_by_metric_and_component<'a>(
205    telemetry_events: &'a [Event],
206    metric: &ComponentMetricType,
207    component_id: &'a str,
208) -> Vec<&'a Metric> {
209    info!(
210        "Filter looking for metric {} {}",
211        metric.to_string(),
212        component_id
213    );
214
215    let metrics: Vec<&Metric> = telemetry_events
216        .iter()
217        .flat_map(|e| {
218            if let vector_lib::event::Event::Metric(m) = e {
219                Some(m)
220            } else {
221                None
222            }
223        })
224        .filter(|&m| {
225            if m.name() == metric.to_string() {
226                debug!("{}", m);
227                if let Some(tags) = m.tags() {
228                    if tags.get("component_id").unwrap_or("") == component_id {
229                        return true;
230                    }
231                }
232            }
233
234            false
235        })
236        .collect();
237
238    info!("{}: {} metrics found.", metric.to_string(), metrics.len());
239
240    metrics
241}
242
243fn sum_counters(
244    metric_name: &ComponentMetricType,
245    metrics: &[&Metric],
246) -> Result<u64, Vec<String>> {
247    let mut sum: f64 = 0.0;
248    let mut errs = Vec::new();
249
250    for m in metrics {
251        match m.value() {
252            vector_lib::event::MetricValue::Counter { value } => {
253                if let MetricKind::Absolute = m.data().kind {
254                    sum = *value;
255                } else {
256                    sum += *value;
257                }
258            }
259            _ => errs.push(format!("{metric_name}: metric value is not a counter",)),
260        }
261    }
262
263    if errs.is_empty() {
264        Ok(sum as u64)
265    } else {
266        Err(errs)
267    }
268}
269
270fn compare_actual_to_expected(
271    telemetry_events: &[Event],
272    metric_type: &ComponentMetricType,
273    component_id: &str,
274    expected: u64,
275    expect_received_events: u64,
276) -> Result<Vec<String>, Vec<String>> {
277    let mut errs: Vec<String> = Vec::new();
278
279    let metrics =
280        filter_events_by_metric_and_component(telemetry_events, metric_type, component_id);
281
282    let actual = sum_counters(metric_type, &metrics)?;
283
284    info!("{metric_type}: expected {expected}, actual {actual}.");
285
286    if actual != expected &&
287        // This is a bit messy. The issue is that EstimatedJsonSizeOf can be called by a component
288        // on an event array, or on a single event. And we have no way of knowing which that is.
289        // By default the input driver for the framework is not assuming it is an array, so we
290        // check here if it matches what the array scenario would be, which is to add the size of
291        // the brackets, for each event.
292        (metric_type != &ComponentMetricType::EventsReceivedBytes
293            || (actual != (expected + (expect_received_events * 2))))
294    {
295        errs.push(format!(
296            "{metric_type}: expected {expected}, actual {actual}",
297        ));
298    }
299
300    if !errs.is_empty() {
301        return Err(errs);
302    }
303
304    Ok(vec![format!("{}: {}", metric_type, actual)])
305}