vector_config/schema/parser/
query.rs1use std::{fs::File, io::BufReader, path::Path, sync::OnceLock};
2
3use serde_json::Value;
4use snafu::Snafu;
5use vector_config_common::{
6 attributes::CustomAttribute,
7 constants,
8 schema::{InstanceType, RootSchema, Schema, SchemaObject, SingleOrVec},
9};
10
11#[derive(Debug, Snafu)]
12#[snafu(module, context(suffix(false)))]
13pub enum QueryError {
14 #[snafu(display("I/O error during opening schema: {source}"), context(false))]
15 Io { source: std::io::Error },
16
17 #[snafu(display("deserialization failed: {source}"), context(false))]
18 Deserialization { source: serde_json::Error },
19
20 #[snafu(display("no schemas matched the query"))]
21 NoMatches,
22
23 #[snafu(display("multiple schemas matched the query ({len})"))]
24 MultipleMatches { len: usize },
25
26 #[snafu(display("found matching attribute but was not a flag"))]
27 AttributeNotFlag,
28
29 #[snafu(display(
30 "found matching attribute but expected single value; multiple values present"
31 ))]
32 AttributeMultipleValues,
33}
34
35pub struct SchemaQuerier {
36 schema: RootSchema,
37}
38
39impl SchemaQuerier {
40 pub fn from_schema<P: AsRef<Path>>(schema_path: P) -> Result<Self, QueryError> {
50 let reader = File::open(schema_path).map(BufReader::new)?;
51 let schema = serde_json::from_reader(reader)?;
52
53 Ok(Self { schema })
54 }
55
56 pub fn query(&self) -> SchemaQueryBuilder<'_> {
57 SchemaQueryBuilder::from_schema(&self.schema)
58 }
59}
60
61pub struct SchemaQueryBuilder<'a> {
65 schema: &'a RootSchema,
66 attributes: Vec<CustomAttribute>,
67}
68
69impl<'a> SchemaQueryBuilder<'a> {
70 fn from_schema(schema: &'a RootSchema) -> Self {
71 Self {
72 schema,
73 attributes: Vec::new(),
74 }
75 }
76
77 pub fn with_custom_attribute_kv<K, V>(mut self, key: K, value: V) -> Self
86 where
87 K: Into<String>,
88 V: Into<Value>,
89 {
90 self.attributes.push(CustomAttribute::KeyValue {
91 key: key.into(),
92 value: value.into(),
93 });
94 self
95 }
96
97 pub fn run(self) -> Vec<SimpleSchema<'a>> {
99 let mut matches = Vec::new();
100
101 'schema: for schema_definition in self.schema.definitions.values() {
103 match schema_definition {
104 Schema::Bool(_) => continue,
106 Schema::Object(schema_object) => {
107 let has_attribute_matchers = !self.attributes.is_empty();
110 let schema_metadata = schema_object.extensions.get(constants::METADATA);
111 if has_attribute_matchers && schema_metadata.is_none() {
112 continue 'schema;
113 }
114
115 if let Some(Value::Object(schema_attributes)) = schema_metadata {
116 for self_attribute in &self.attributes {
117 let attr_matched = match self_attribute {
118 CustomAttribute::Flag(key) => schema_attributes
119 .get(key)
120 .is_some_and(|value| matches!(value, Value::Bool(true))),
121 CustomAttribute::KeyValue {
122 key,
123 value: attr_value,
124 } => {
125 schema_attributes.get(key).is_some_and(|value| match value {
126 Value::String(schema_attr_value) => {
128 schema_attr_value == attr_value
129 }
130 Value::Array(schema_attr_values) => {
133 schema_attr_values.iter().any(|value| {
134 value.as_str().is_some_and(|s| s == attr_value)
135 })
136 }
137 _ => false,
138 })
139 }
140 };
141
142 if !attr_matched {
143 continue 'schema;
144 }
145 }
146 }
147
148 matches.push(schema_object.into());
149 }
150 }
151 }
152
153 matches
154 }
155
156 pub fn run_single(self) -> Result<SimpleSchema<'a>, QueryError> {
163 let mut matches = self.run();
164 match matches.len() {
165 0 => Err(QueryError::NoMatches),
166 1 => Ok(matches.remove(0)),
167 len => Err(QueryError::MultipleMatches { len }),
168 }
169 }
170}
171
/// Holds either exactly one value or a list of values of the same type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OneOrMany<T> {
    /// A single value.
    One(T),
    /// Any number of values.
    Many(Vec<T>),
}
176
177pub enum SchemaType<'a> {
178 AllOf(Vec<SimpleSchema<'a>>),
184
185 OneOf(Vec<SimpleSchema<'a>>),
192
193 AnyOf(Vec<SimpleSchema<'a>>),
200
201 Constant(&'a Value),
208
209 Enum(&'a Vec<Value>),
216
217 Typed(OneOrMany<InstanceType>),
224}
225
226pub trait QueryableSchema {
227 fn schema_type(&self) -> SchemaType;
228 fn description(&self) -> Option<&str>;
229 fn title(&self) -> Option<&str>;
230 fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>>;
231 fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError>;
232 fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError>;
233}
234
235impl<T> QueryableSchema for &T
236where
237 T: QueryableSchema,
238{
239 fn schema_type(&self) -> SchemaType {
240 (*self).schema_type()
241 }
242
243 fn description(&self) -> Option<&str> {
244 (*self).description()
245 }
246
247 fn title(&self) -> Option<&str> {
248 (*self).title()
249 }
250
251 fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>> {
252 (*self).get_attributes(key)
253 }
254
255 fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError> {
256 (*self).get_attribute(key)
257 }
258
259 fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError> {
260 (*self).has_flag_attribute(key)
261 }
262}
263
264impl QueryableSchema for &SchemaObject {
265 fn schema_type(&self) -> SchemaType {
266 if let Some(subschemas) = self.subschemas.as_ref() {
276 if let Some(all_of) = subschemas.all_of.as_ref() {
279 return SchemaType::AllOf(all_of.iter().map(schema_to_simple_schema).collect());
280 } else if let Some(one_of) = subschemas.one_of.as_ref() {
281 return SchemaType::OneOf(one_of.iter().map(schema_to_simple_schema).collect());
282 } else if let Some(any_of) = subschemas.any_of.as_ref() {
283 return SchemaType::AnyOf(any_of.iter().map(schema_to_simple_schema).collect());
284 } else {
285 panic!("Encountered schema with subschema validation that wasn't one of the supported types: allOf, oneOf, anyOf.");
286 }
287 }
288
289 if let Some(instance_types) = self.instance_type.as_ref() {
290 return match instance_types {
291 SingleOrVec::Single(single) => SchemaType::Typed(OneOrMany::One(*single.clone())),
292 SingleOrVec::Vec(many) => SchemaType::Typed(OneOrMany::Many(many.clone())),
293 };
294 }
295
296 if let Some(const_value) = self.const_value.as_ref() {
297 return SchemaType::Constant(const_value);
298 }
299
300 if let Some(enum_values) = self.enum_values.as_ref() {
301 return SchemaType::Enum(enum_values);
302 }
303
304 panic!("Schema type was not able to be detected!");
305 }
306
307 fn description(&self) -> Option<&str> {
308 self.metadata
309 .as_ref()
310 .and_then(|metadata| metadata.description.as_deref())
311 }
312
313 fn title(&self) -> Option<&str> {
314 self.metadata
315 .as_ref()
316 .and_then(|metadata| metadata.title.as_deref())
317 }
318
319 fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>> {
320 self.extensions.get(constants::METADATA)
321 .map(|metadata| match metadata {
322 Value::Object(attributes) => attributes,
323 _ => panic!("Found metadata extension in schema that was not of type 'object'."),
324 })
325 .and_then(|attributes| attributes.get(key))
326 .map(|attribute| match attribute {
327 Value::Bool(b) => match b {
328 true => OneOrMany::One(CustomAttribute::flag(key)),
329 false => panic!("Custom attribute flags should never be false."),
330 },
331 Value::String(s) => OneOrMany::One(CustomAttribute::kv(key, s)),
332 Value::Array(values) => {
333 let mapped = values.iter()
334 .map(|value| if let Value::String(s) = value {
335 CustomAttribute::kv(key, s)
336 } else {
337 panic!("Custom attribute key/value pair had array of values with a non-string value.")
338 })
339 .collect();
340 OneOrMany::Many(mapped)
341 },
342 _ => panic!("Custom attribute had unexpected non-flag/non-KV value."),
343 })
344 }
345
346 fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError> {
347 self.get_attributes(key)
348 .map(|attrs| match attrs {
349 OneOrMany::One(attr) => Ok(attr),
350 OneOrMany::Many(_) => Err(QueryError::AttributeMultipleValues),
351 })
352 .transpose()
353 }
354
355 fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError> {
356 self.get_attribute(key)
357 .and_then(|maybe_attr| match maybe_attr {
358 None => Ok(false),
359 Some(attr) => {
360 if attr.is_flag() {
361 Ok(true)
362 } else {
363 Err(QueryError::AttributeNotFlag)
364 }
365 }
366 })
367 }
368}
369
370pub struct SimpleSchema<'a> {
371 schema: &'a SchemaObject,
372}
373
374impl<'a> SimpleSchema<'a> {
375 pub fn into_inner(self) -> &'a SchemaObject {
376 self.schema
377 }
378}
379
380impl<'a> From<&'a SchemaObject> for SimpleSchema<'a> {
381 fn from(schema: &'a SchemaObject) -> Self {
382 Self { schema }
383 }
384}
385
386impl QueryableSchema for SimpleSchema<'_> {
387 fn schema_type(&self) -> SchemaType {
388 self.schema.schema_type()
389 }
390
391 fn description(&self) -> Option<&str> {
392 self.schema.description()
393 }
394
395 fn title(&self) -> Option<&str> {
396 self.schema.title()
397 }
398
399 fn get_attributes(&self, key: &str) -> Option<OneOrMany<CustomAttribute>> {
400 self.schema.get_attributes(key)
401 }
402
403 fn get_attribute(&self, key: &str) -> Result<Option<CustomAttribute>, QueryError> {
404 self.schema.get_attribute(key)
405 }
406
407 fn has_flag_attribute(&self, key: &str) -> Result<bool, QueryError> {
408 self.schema.has_flag_attribute(key)
409 }
410}
411
412fn schema_to_simple_schema(schema: &Schema) -> SimpleSchema<'_> {
413 static TRUE_SCHEMA_OBJECT: OnceLock<SchemaObject> = OnceLock::new();
414 static FALSE_SCHEMA_OBJECT: OnceLock<SchemaObject> = OnceLock::new();
415
416 let schema_object = match schema {
417 Schema::Bool(bool) => {
418 if *bool {
419 TRUE_SCHEMA_OBJECT.get_or_init(|| Schema::Bool(true).into_object())
420 } else {
421 FALSE_SCHEMA_OBJECT.get_or_init(|| Schema::Bool(false).into_object())
422 }
423 }
424 Schema::Object(object) => object,
425 };
426
427 SimpleSchema {
428 schema: schema_object,
429 }
430}