// vector_config/schema/parser/query.rs

1use std::{fs::File, io::BufReader, path::Path, sync::OnceLock};
2
3use serde_json::Value;
4use snafu::Snafu;
5use vector_config_common::{
6    attributes::CustomAttribute,
7    constants,
8    schema::{InstanceType, RootSchema, Schema, SchemaObject, SingleOrVec},
9};
10
11#[derive(Debug, Snafu)]
12#[snafu(module, context(suffix(false)))]
13pub enum QueryError {
14    #[snafu(display("I/O error during opening schema: {source}"), context(false))]
15    Io { source: std::io::Error },
16
17    #[snafu(display("deserialization failed: {source}"), context(false))]
18    Deserialization { source: serde_json::Error },
19
20    #[snafu(display("no schemas matched the query"))]
21    NoMatches,
22
23    #[snafu(display("multiple schemas matched the query ({len})"))]
24    MultipleMatches { len: usize },
25
26    #[snafu(display("found matching attribute but was not a flag"))]
27    AttributeNotFlag,
28
29    #[snafu(display(
30        "found matching attribute but expected single value; multiple values present"
31    ))]
32    AttributeMultipleValues,
33}
34
35pub struct SchemaQuerier {
36    schema: RootSchema,
37}
38
39impl SchemaQuerier {
40    /// Creates a `SchemaQuerier` based on the schema file located at `schema_path`.
41    ///
42    /// # Errors
43    ///
44    /// If no file exists at the given schema path, or there is an I/O error during loading the file
45    /// (permissions, etc), then an error variant will be returned.
46    ///
47    /// If the file does not contain valid JSON, or cannot be deserialized as a schema, then an
48    /// error variant will be returned.
49    pub fn from_schema<P: AsRef<Path>>(schema_path: P) -> Result<Self, QueryError> {
50        let reader = File::open(schema_path).map(BufReader::new)?;
51        let schema = serde_json::from_reader(reader)?;
52
53        Ok(Self { schema })
54    }
55
56    pub fn query(&self) -> SchemaQueryBuilder<'_> {
57        SchemaQueryBuilder::from_schema(&self.schema)
58    }
59}
60
61/// A query builder for querying against a root schema.
62///
63/// All constraints are applied in a boolean AND fashion.
64pub struct SchemaQueryBuilder<'a> {
65    schema: &'a RootSchema,
66    attributes: Vec<CustomAttribute>,
67}
68
69impl<'a> SchemaQueryBuilder<'a> {
70    fn from_schema(schema: &'a RootSchema) -> Self {
71        Self {
72            schema,
73            attributes: Vec::new(),
74        }
75    }
76
77    /// Adds a constraint on the given custom attribute key/value.
78    ///
79    /// Can be used multiple times to match schemas against multiple attributes.
80    ///
81    /// Custom attributes are strongly matched: a flag attribute can only match a flag attribute,
82    /// not a key/value attribute, and vice versa. For key/value attributes where the attribute in
83    /// the schema itself has multiple values, the schema is considered a match so long as it
84    /// contains the value specified in the query.
85    pub fn with_custom_attribute_kv<K, V>(mut self, key: K, value: V) -> Self
86    where
87        K: Into<String>,
88        V: Into<Value>,
89    {
90        self.attributes.push(CustomAttribute::KeyValue {
91            key: key.into(),
92            value: value.into(),
93        });
94        self
95    }
96
97    /// Executes the query, returning all matching schemas.
98    pub fn run(self) -> Vec<SimpleSchema<'a>> {
99        let mut matches = Vec::new();
100
101        // Search through all defined schemas.
102        'schema: for schema_definition in self.schema.definitions.values() {
103            match schema_definition {
104                // We don't match against boolean schemas because there's nothing to match against.
105                Schema::Bool(_) => continue,
106                Schema::Object(schema_object) => {
107                    // If we have custom attribute matches defined, but the schema has no metadata,
108                    // it's not possible for it to match, so just bail out early.
109                    let has_attribute_matchers = !self.attributes.is_empty();
110                    let schema_metadata = schema_object.extensions.get(constants::METADATA);
111                    if has_attribute_matchers && schema_metadata.is_none() {
112                        continue 'schema;
113                    }
114
115                    if let Some(Value::Object(schema_attributes)) = schema_metadata {
116                        for self_attribute in &self.attributes {
117                            let attr_matched = match self_attribute {
118                                CustomAttribute::Flag(key) => schema_attributes
119                                    .get(key)
120                                    .is_some_and(|value| matches!(value, Value::Bool(true))),
121                                CustomAttribute::KeyValue {
122                                    key,
123                                    value: attr_value,
124                                } => {
125                                    schema_attributes.get(key).is_some_and(|value| match value {
126                                        // Check string values directly.
127                                        Value::String(schema_attr_value) => {
128                                            schema_attr_value == attr_value
129                                        }
130                                        // For arrays, try and convert each item to a string, and
131                                        // for the values that are strings, see if they match.
132                                        Value::Array(schema_attr_values) => {
133                                            schema_attr_values.iter().any(|value| {
134                                                value.as_str().is_some_and(|s| s == attr_value)
135                                            })
136                                        }
137                                        _ => false,
138                                    })
139                                }
140                            };
141
142                            if !attr_matched {
143                                continue 'schema;
144                            }
145                        }
146                    }
147
148                    matches.push(schema_object.into());
149                }
150            }
151        }
152
153        matches
154    }
155
156    /// Executes the query, returning a single matching schema.
157    ///
158    /// # Errors
159    ///
160    /// If no schemas match, or more than one schema matches, then an error variant will be
161    /// returned.
162    pub fn run_single(self) -> Result<SimpleSchema<'a>, QueryError> {
163        let mut matches = self.run();
164        match matches.len() {
165            0 => Err(QueryError::NoMatches),
166            1 => Ok(matches.remove(0)),
167            len => Err(QueryError::MultipleMatches { len }),
168        }
169    }
170}
171
/// Holds either exactly one value or a list of values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OneOrMany<T> {
    /// A single value.
    One(T),
    /// Multiple values.
    Many(Vec<T>),
}
176
177pub enum SchemaType<'a> {
178    /// A set of subschemas in which all must match.
179    ///
180    /// Referred to as an `allOf` schema in JSON Schema.
181    ///
182    /// For a given input, the input is only valid if it is valid against all specified subschemas.
183    AllOf(Vec<SimpleSchema<'a>>),
184
185    /// A set of subschemas in which only one must match.
186    ///
187    /// Referred to as a `oneOf` schema in JSON Schema.
188    ///
189    /// For a given input, the input is only valid if it is valid against exactly one of the
190    /// specified subschemas.
191    OneOf(Vec<SimpleSchema<'a>>),
192
193    /// A set of subschemas in which at least one must match.
194    ///
195    /// Referred to as a `anyOf` schema in JSON Schema.
196    ///
197    /// For a given input, the input is only valid if it is valid against at least one of the
198    /// specified subschemas.
199    AnyOf(Vec<SimpleSchema<'a>>),
200
201    /// A schema that matches a well-known, constant value.
202    ///
203    /// Referred to by the `const` field in JSON Schema.
204    ///
205    /// For a given input, the input is only valid if it matches the value specified by `const`
206    /// exactly. The value can be any valid JSON value.
207    Constant(&'a Value),
208
209    /// A schema that matches one of many well-known, constant values.
210    ///
211    /// Referred to by the `enum` field in JSON Schema.
212    ///
213    /// For a given input, the input is only valid if it matches one of the values specified by
214    /// `enum` exactly. The values can be any valid JSON value.
215    Enum(&'a Vec<Value>),
216
217    /// A typed schema that matches a JSON data type.
218    ///
219    /// Referred to by the `type` field in JSON Schema.
220    ///
221    /// For a given input, the input is only valid if it is the same type as one of the types
222    /// specified by `type`. A schema can allow multiple data types.
223    Typed(OneOrMany<InstanceType>),
224}
225
226pub trait QueryableSchema {
227    fn schema_type(&self) -> SchemaType;
228    fn description(&self) -> Option<&str>;
229    fn title(&self) -> Option<&str>;
230    fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>>;
231    fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError>;
232    fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError>;
233}
234
235impl<T> QueryableSchema for &T
236where
237    T: QueryableSchema,
238{
239    fn schema_type(&self) -> SchemaType {
240        (*self).schema_type()
241    }
242
243    fn description(&self) -> Option<&str> {
244        (*self).description()
245    }
246
247    fn title(&self) -> Option<&str> {
248        (*self).title()
249    }
250
251    fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>> {
252        (*self).get_attributes(key)
253    }
254
255    fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError> {
256        (*self).get_attribute(key)
257    }
258
259    fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError> {
260        (*self).has_flag_attribute(key)
261    }
262}
263
264impl QueryableSchema for &SchemaObject {
265    fn schema_type(&self) -> SchemaType {
266        // TODO: Technically speaking, it is allowed to use the "X of" schema types in conjunction
267        // with other schema types i.e. `allOf` in conjunction with specifying a `type`.
268        //
269        // Right now, the configuration schema codegen should not actually be emitting anything like
270        // this, so our logic below is written against what we generate, not against what is
271        // technically possible. This _may_ need to change in the future if we end up using any "X
272        // of" schema composition mechanisms for richer validation (i.e. sticking special validation
273        // logic in various subschemas under `allOf`, while defining the main data schema via
274        // `type`, etc.)
275        if let Some(subschemas) = self.subschemas.as_ref() {
276            // Of all the possible "subschema" validation mechanism, we only support `allOf` and
277            // `oneOf`, based on what the configuration schema codegen will spit out.
278            if let Some(all_of) = subschemas.all_of.as_ref() {
279                return SchemaType::AllOf(all_of.iter().map(schema_to_simple_schema).collect());
280            } else if let Some(one_of) = subschemas.one_of.as_ref() {
281                return SchemaType::OneOf(one_of.iter().map(schema_to_simple_schema).collect());
282            } else if let Some(any_of) = subschemas.any_of.as_ref() {
283                return SchemaType::AnyOf(any_of.iter().map(schema_to_simple_schema).collect());
284            } else {
285                panic!("Encountered schema with subschema validation that wasn't one of the supported types: allOf, oneOf, anyOf.");
286            }
287        }
288
289        if let Some(instance_types) = self.instance_type.as_ref() {
290            return match instance_types {
291                SingleOrVec::Single(single) => SchemaType::Typed(OneOrMany::One(*single.clone())),
292                SingleOrVec::Vec(many) => SchemaType::Typed(OneOrMany::Many(many.clone())),
293            };
294        }
295
296        if let Some(const_value) = self.const_value.as_ref() {
297            return SchemaType::Constant(const_value);
298        }
299
300        if let Some(enum_values) = self.enum_values.as_ref() {
301            return SchemaType::Enum(enum_values);
302        }
303
304        panic!("Schema type was not able to be detected!");
305    }
306
307    fn description(&self) -> Option<&str> {
308        self.metadata
309            .as_ref()
310            .and_then(|metadata| metadata.description.as_deref())
311    }
312
313    fn title(&self) -> Option<&str> {
314        self.metadata
315            .as_ref()
316            .and_then(|metadata| metadata.title.as_deref())
317    }
318
319    fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>> {
320        self.extensions.get(constants::METADATA)
321            .map(|metadata| match metadata {
322                Value::Object(attributes) => attributes,
323                _ => panic!("Found metadata extension in schema that was not of type 'object'."),
324            })
325            .and_then(|attributes| attributes.get(key))
326            .map(|attribute| match attribute {
327                Value::Bool(b) => match b {
328                    true => OneOrMany::One(CustomAttribute::flag(key)),
329                    false => panic!("Custom attribute flags should never be false."),
330                },
331                Value::String(s) => OneOrMany::One(CustomAttribute::kv(key, s)),
332                Value::Array(values) => {
333                    let mapped = values.iter()
334                        .map(|value| if let Value::String(s) = value {
335                            CustomAttribute::kv(key, s)
336                        } else {
337                            panic!("Custom attribute key/value pair had array of values with a non-string value.")
338                        })
339                        .collect();
340                    OneOrMany::Many(mapped)
341                },
342                _ => panic!("Custom attribute had unexpected non-flag/non-KV value."),
343            })
344    }
345
346    fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError> {
347        self.get_attributes(key)
348            .map(|attrs| match attrs {
349                OneOrMany::One(attr) => Ok(attr),
350                OneOrMany::Many(_) => Err(QueryError::AttributeMultipleValues),
351            })
352            .transpose()
353    }
354
355    fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError> {
356        self.get_attribute(key)
357            .and_then(|maybe_attr| match maybe_attr {
358                None => Ok(false),
359                Some(attr) => {
360                    if attr.is_flag() {
361                        Ok(true)
362                    } else {
363                        Err(QueryError::AttributeNotFlag)
364                    }
365                }
366            })
367    }
368}
369
370pub struct SimpleSchema<'a> {
371    schema: &'a SchemaObject,
372}
373
374impl<'a> SimpleSchema<'a> {
375    pub fn into_inner(self) -> &'a SchemaObject {
376        self.schema
377    }
378}
379
380impl<'a> From<&'a SchemaObject> for SimpleSchema<'a> {
381    fn from(schema: &'a SchemaObject) -> Self {
382        Self { schema }
383    }
384}
385
386impl QueryableSchema for SimpleSchema<'_> {
387    fn schema_type(&self) -> SchemaType {
388        self.schema.schema_type()
389    }
390
391    fn description(&self) -> Option<&str> {
392        self.schema.description()
393    }
394
395    fn title(&self) -> Option<&str> {
396        self.schema.title()
397    }
398
399    fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>> {
400        self.schema.get_attributes(key)
401    }
402
403    fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError> {
404        self.schema.get_attribute(key)
405    }
406
407    fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError> {
408        self.schema.has_flag_attribute(key)
409    }
410}
411
412fn schema_to_simple_schema(schema: &Schema) -> SimpleSchema<'_> {
413    static TRUE_SCHEMA_OBJECT: OnceLock<SchemaObject> = OnceLock::new();
414    static FALSE_SCHEMA_OBJECT: OnceLock<SchemaObject> = OnceLock::new();
415
416    let schema_object = match schema {
417        Schema::Bool(bool) => {
418            if *bool {
419                TRUE_SCHEMA_OBJECT.get_or_init(|| Schema::Bool(true).into_object())
420            } else {
421                FALSE_SCHEMA_OBJECT.get_or_init(|| Schema::Bool(false).into_object())
422            }
423        }
424        Schema::Object(object) => object,
425    };
426
427    SimpleSchema {
428        schema: schema_object,
429    }
430}