1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
use std::collections::{BTreeMap, BTreeSet};

use lookup::OwnedTargetPath;
use vrl::value::Kind;

use crate::config::LogNamespace;

use super::Definition;

/// The input schema for a given component.
///
/// This schema defines the (semantic) fields a component expects to receive from its input
/// components. Each entry pairs a meaning identifier with the [`Kind`] expected for it and a
/// flag saying whether the meaning may be absent.
#[derive(Debug, Clone, PartialEq)]
pub struct Requirement {
    /// Semantic meanings configured for this requirement, keyed by meaning identifier.
    ///
    /// Because this is a `BTreeMap`, iteration (and therefore validation) visits identifiers in
    /// sorted order.
    meaning: BTreeMap<String, SemanticMeaning>,
}

/// The expectation attached to a single semantic meaning identifier.
#[derive(Debug, Clone, PartialEq)]
struct SemanticMeaning {
    /// The type required by this semantic meaning.
    kind: Kind,

    /// Whether the meaning is optional.
    ///
    /// If a meaning is optional, validation must not error when the meaning is not defined in
    /// the provided `Definition`, but it *must* error if it is defined and its type does not meet
    /// the requirement (the type check only runs when type validation is enabled).
    optional: bool,
}

impl Requirement {
    /// Build a requirement with no semantic meanings configured.
    ///
    /// This is the most permissive requirement: with nothing configured, validation has nothing
    /// to reject.
    pub fn empty() -> Self {
        Self {
            meaning: BTreeMap::new(),
        }
    }

    /// Report whether no semantic meanings (required or optional) have been configured.
    pub fn is_empty(&self) -> bool {
        self.meaning.is_empty()
    }

    /// Register a meaning that the validated [`Definition`] must provide.
    ///
    /// Registering an identifier that is already present replaces the earlier entry.
    #[must_use]
    pub fn required_meaning(mut self, meaning: impl Into<String>, kind: Kind) -> Self {
        self.insert_meaning(meaning, kind, false);
        self
    }

    /// Register a meaning that the validated [`Definition`] may omit.
    ///
    /// Unlike [`Requirement::required_meaning`], a missing meaning is accepted. A meaning that
    /// is present but whose [`Kind`] does not satisfy `kind` is still rejected.
    #[must_use]
    pub fn optional_meaning(mut self, meaning: impl Into<String>, kind: Kind) -> Self {
        self.insert_meaning(meaning, kind, true);
        self
    }

    /// Store (or overwrite) the expectation for `identifier`.
    fn insert_meaning(&mut self, identifier: impl Into<String>, kind: Kind, optional: bool) {
        self.meaning
            .insert(identifier.into(), SemanticMeaning { kind, optional });
    }

    /// Validate the provided [`Definition`] against the current requirement.
    ///
    /// Validation is skipped entirely, and `Ok(())` returned, unless the definition's log
    /// namespaces include [`LogNamespace::Vector`].
    ///
    /// Otherwise every configured meaning is checked, in identifier order:
    ///
    /// * an identifier the definition maps to multiple paths yields
    ///   [`ValidationError::MeaningDuplicate`];
    /// * an identifier the definition does not provide yields
    ///   [`ValidationError::MeaningMissing`], unless the meaning is optional;
    /// * when `validate_schema_type` is `true`, an identifier whose kind in the definition does
    ///   not satisfy the required kind yields [`ValidationError::MeaningKind`]. When it is
    ///   `false`, only presence is checked.
    ///
    /// # Errors
    ///
    /// Returns every problem found, collected into [`ValidationErrors`].
    pub fn validate(
        &self,
        definition: &Definition,
        validate_schema_type: bool,
    ) -> Result<(), ValidationErrors> {
        // Definitions are only validated when at least one connected component uses the
        // Vector namespace.
        if !definition.log_namespaces().contains(&LogNamespace::Vector) {
            return Ok(());
        }

        let mut failures = Vec::new();

        for (identifier, expectation) in &self.meaning {
            // An "invalid" meaning is one identifier pointing at several paths; report it and
            // skip the remaining checks for this identifier.
            if let Some(paths) = definition.invalid_meaning(identifier).cloned() {
                failures.push(ValidationError::MeaningDuplicate {
                    identifier: identifier.clone(),
                    paths,
                });
                continue;
            }

            let located = definition
                .meanings()
                .find_map(|(def_id, path)| (def_id == identifier).then_some(path));

            match located {
                None => {
                    if !expectation.optional {
                        failures.push(ValidationError::MeaningMissing {
                            identifier: identifier.clone(),
                        });
                    }
                }
                Some(target_path) => {
                    if validate_schema_type {
                        // Compare the kind found at the meaning's path with the required one.
                        let found_kind = definition.kind_at(target_path);

                        if expectation.kind.is_superset(&found_kind).is_err() {
                            failures.push(ValidationError::MeaningKind {
                                identifier: identifier.clone(),
                                want: expectation.kind.clone(),
                                got: found_kind,
                            });
                        }
                    }
                }
            }
        }

        if !failures.is_empty() {
            return Err(ValidationErrors(failures));
        }

        Ok(())
    }
}

/// The collection of [`ValidationError`]s returned by a failed [`Requirement::validate`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationErrors(Vec<ValidationError>);

impl ValidationErrors {
    pub fn is_meaning_missing(&self) -> bool {
        self.0.iter().any(ValidationError::is_meaning_missing)
    }

    pub fn is_meaning_kind(&self) -> bool {
        self.0.iter().any(ValidationError::is_meaning_kind)
    }

    pub fn errors(&self) -> &[ValidationError] {
        &self.0
    }
}

impl std::error::Error for ValidationErrors {
    fn source(&self) -> Option<&(dyn snafu::Error + 'static)> {
        Some(&self.0[0])
    }
}

impl std::fmt::Display for ValidationErrors {
    /// Writes each contained error's message in order, back to back, with no separator
    /// between consecutive messages. Stops at the first formatting failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0
            .iter()
            .try_for_each(|error| std::fmt::Display::fmt(error, f))
    }
}

/// A single reason a `Definition` failed to satisfy a [`Requirement`].
#[derive(Debug, Clone, PartialEq, Eq)]
// Every variant shares the `Meaning` prefix, which is what this lint flags.
#[allow(clippy::enum_variant_names)]
pub enum ValidationError {
    /// A required semantic meaning is missing.
    MeaningMissing { identifier: String },

    /// A semantic meaning has an invalid [`Kind`].
    MeaningKind {
        /// The meaning identifier whose kind was rejected.
        identifier: String,
        /// The kind the requirement asked for.
        want: Kind,
        /// The kind found in the definition at the meaning's path.
        got: Kind,
    },

    /// A semantic meaning is pointing to multiple paths.
    MeaningDuplicate {
        /// The meaning identifier that is ambiguous.
        identifier: String,
        /// Every path the identifier points to.
        paths: BTreeSet<OwnedTargetPath>,
    },
}

impl ValidationError {
    /// Returns `true` only for the [`ValidationError::MeaningMissing`] variant.
    pub fn is_meaning_missing(&self) -> bool {
        match self {
            Self::MeaningMissing { .. } => true,
            Self::MeaningKind { .. } | Self::MeaningDuplicate { .. } => false,
        }
    }

    /// Returns `true` only for the [`ValidationError::MeaningKind`] variant.
    pub fn is_meaning_kind(&self) -> bool {
        match self {
            Self::MeaningKind { .. } => true,
            Self::MeaningMissing { .. } | Self::MeaningDuplicate { .. } => false,
        }
    }

    /// Returns `true` only for the [`ValidationError::MeaningDuplicate`] variant.
    pub fn is_meaning_duplicate(&self) -> bool {
        match self {
            Self::MeaningDuplicate { .. } => true,
            Self::MeaningMissing { .. } | Self::MeaningKind { .. } => false,
        }
    }
}

impl std::fmt::Display for ValidationError {
    /// Renders a one-line, human-readable description of the failure.
    ///
    /// For duplicates, the offending paths are listed in the set's iteration order, separated
    /// by `", "`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MeaningMissing { identifier } => {
                write!(f, "missing semantic meaning: {identifier}")
            }
            Self::MeaningKind {
                identifier,
                want,
                got,
            } => {
                write!(
                    f,
                    "invalid semantic meaning: {identifier} (expected {want}, got {got})"
                )
            }
            Self::MeaningDuplicate { identifier, paths } => {
                // Build the comma-separated path list up front, then emit the whole message.
                let mut listed = String::new();
                for (index, path) in paths.iter().enumerate() {
                    if index > 0 {
                        listed.push_str(", ");
                    }
                    listed.push_str(&path.to_string());
                }

                write!(
                    f,
                    "semantic meaning {} pointing to multiple fields: {}",
                    identifier, listed
                )
            }
        }
    }
}

// Nothing is overridden here: the trait's default method implementations are used as-is.
impl std::error::Error for ValidationError {}

#[cfg(test)]
mod tests {
    use lookup::lookup_v2::parse_target_path;
    use lookup::owned_value_path;

    use super::*;

    /// A fresh definition whose only log namespace is `Vector`.
    fn vector_definition() -> Definition {
        Definition::default_for_namespace(&[LogNamespace::Vector].into())
    }

    /// A `Vector`-namespace definition with an integer event field `foo` carrying the meaning
    /// `foo`.
    fn integer_foo_definition() -> Definition {
        vector_definition().with_event_field(
            &owned_value_path!("foo"),
            Kind::integer(),
            Some("foo"),
        )
    }

    /// With type validation disabled, a kind mismatch must not be reported.
    #[test]
    fn test_doesnt_validate_types() {
        let requirement = Requirement::empty().required_meaning("foo", Kind::boolean());

        let outcome = requirement.validate(&integer_foo_definition(), false);

        assert_eq!(outcome, Ok(()));
    }

    /// Validation only applies when the `Vector` namespace is among the definition's
    /// namespaces.
    #[test]
    fn test_doesnt_validate_legacy_namespace() {
        let requirement = Requirement::empty().required_meaning("foo", Kind::boolean());

        // We get an error if we have a connected component with the Vector namespace.
        let mixed =
            Definition::default_for_namespace(&[LogNamespace::Vector, LogNamespace::Legacy].into())
                .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert!(requirement.validate(&mixed, true).is_err());

        // We don't get an error if we have a connected component with just the Legacy namespace.
        let legacy_only = Definition::default_for_namespace(&[LogNamespace::Legacy].into())
            .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert!(requirement.validate(&legacy_only, true).is_ok());
    }

    /// Table-driven check of `validate` with type validation enabled.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn test_validate() {
        struct Case {
            title: &'static str,
            requirement: Requirement,
            definition: Definition,
            expected: Vec<ValidationError>,
        }

        let cases = [
            Case {
                title: "empty",
                requirement: Requirement::empty(),
                definition: vector_definition(),
                expected: vec![],
            },
            Case {
                title: "missing required meaning",
                requirement: Requirement::empty().required_meaning("foo", Kind::any()),
                definition: vector_definition(),
                expected: vec![ValidationError::MeaningMissing {
                    identifier: "foo".into(),
                }],
            },
            Case {
                title: "missing required meanings",
                requirement: Requirement::empty()
                    .required_meaning("foo", Kind::any())
                    .required_meaning("bar", Kind::any()),
                definition: vector_definition(),
                // Errors come out in identifier order, so `bar` precedes `foo`.
                expected: vec![
                    ValidationError::MeaningMissing {
                        identifier: "bar".into(),
                    },
                    ValidationError::MeaningMissing {
                        identifier: "foo".into(),
                    },
                ],
            },
            Case {
                title: "missing optional meaning",
                requirement: Requirement::empty().optional_meaning("foo", Kind::any()),
                definition: vector_definition(),
                expected: vec![],
            },
            Case {
                title: "missing mixed meanings",
                requirement: Requirement::empty()
                    .optional_meaning("foo", Kind::any())
                    .required_meaning("bar", Kind::any()),
                definition: vector_definition(),
                expected: vec![ValidationError::MeaningMissing {
                    identifier: "bar".into(),
                }],
            },
            Case {
                title: "invalid required meaning kind",
                requirement: Requirement::empty().required_meaning("foo", Kind::boolean()),
                definition: integer_foo_definition(),
                expected: vec![ValidationError::MeaningKind {
                    identifier: "foo".into(),
                    want: Kind::boolean(),
                    got: Kind::integer(),
                }],
            },
            Case {
                title: "invalid optional meaning kind",
                requirement: Requirement::empty().optional_meaning("foo", Kind::boolean()),
                definition: integer_foo_definition(),
                expected: vec![ValidationError::MeaningKind {
                    identifier: "foo".into(),
                    want: Kind::boolean(),
                    got: Kind::integer(),
                }],
            },
            Case {
                title: "duplicate meaning pointers",
                requirement: Requirement::empty().optional_meaning("foo", Kind::boolean()),
                definition: integer_foo_definition().merge(vector_definition().with_event_field(
                    &owned_value_path!("bar"),
                    Kind::boolean(),
                    Some("foo"),
                )),
                expected: vec![ValidationError::MeaningDuplicate {
                    identifier: "foo".into(),
                    paths: BTreeSet::from([
                        parse_target_path("foo").unwrap(),
                        parse_target_path("bar").unwrap(),
                    ]),
                }],
            },
        ];

        for Case {
            title,
            requirement,
            definition,
            expected,
        } in cases
        {
            let want = if expected.is_empty() {
                Ok(())
            } else {
                Err(ValidationErrors(expected))
            };
            let got = requirement.validate(&definition, true);

            assert_eq!(got, want, "{title}");
        }
    }
}