vector/config/
diff.rs

1use std::collections::HashSet;
2
3use indexmap::IndexMap;
4use vector_lib::config::OutputId;
5
6use super::{ComponentKey, Config, EnrichmentTableOuter};
7
8#[derive(Debug)]
9pub struct ConfigDiff {
10    pub sources: Difference,
11    pub transforms: Difference,
12    pub sinks: Difference,
13    /// This difference does not only contain the actual enrichment_tables keys, but also keys that
14    /// may be used for their source and sink components (if available).
15    pub enrichment_tables: Difference,
16    pub components_to_reload: HashSet<ComponentKey>,
17}
18
19impl ConfigDiff {
20    pub fn initial(initial: &Config) -> Self {
21        Self::new(&Config::default(), initial, HashSet::new())
22    }
23
24    pub fn new(old: &Config, new: &Config, components_to_reload: HashSet<ComponentKey>) -> Self {
25        ConfigDiff {
26            sources: Difference::new(&old.sources, &new.sources, &components_to_reload),
27            transforms: Difference::new(&old.transforms, &new.transforms, &components_to_reload),
28            sinks: Difference::new(&old.sinks, &new.sinks, &components_to_reload),
29            enrichment_tables: Difference::from_enrichment_tables(
30                &old.enrichment_tables,
31                &new.enrichment_tables,
32            ),
33            components_to_reload,
34        }
35    }
36
37    /// Swaps removed with added in Differences.
38    pub const fn flip(mut self) -> Self {
39        self.sources.flip();
40        self.transforms.flip();
41        self.sinks.flip();
42        self.enrichment_tables.flip();
43        self
44    }
45
46    /// Checks whether the given component is present at all.
47    pub fn contains(&self, key: &ComponentKey) -> bool {
48        self.sources.contains(key)
49            || self.transforms.contains(key)
50            || self.sinks.contains(key)
51            || self.enrichment_tables.contains(key)
52    }
53
54    /// Checks whether the given component is changed.
55    pub fn is_changed(&self, key: &ComponentKey) -> bool {
56        self.sources.is_changed(key)
57            || self.transforms.is_changed(key)
58            || self.sinks.is_changed(key)
59            || self.enrichment_tables.contains(key)
60    }
61
62    /// Checks whether the given component is removed.
63    pub fn is_removed(&self, key: &ComponentKey) -> bool {
64        self.sources.is_removed(key)
65            || self.transforms.is_removed(key)
66            || self.sinks.is_removed(key)
67            || self.enrichment_tables.contains(key)
68    }
69}
70
71#[derive(Debug)]
72pub struct Difference {
73    pub to_remove: HashSet<ComponentKey>,
74    pub to_change: HashSet<ComponentKey>,
75    pub to_add: HashSet<ComponentKey>,
76}
77
78impl Difference {
79    fn new<C>(
80        old: &IndexMap<ComponentKey, C>,
81        new: &IndexMap<ComponentKey, C>,
82        need_change: &HashSet<ComponentKey>,
83    ) -> Self
84    where
85        C: serde::Serialize + serde::Deserialize<'static>,
86    {
87        let old_names = old.keys().cloned().collect::<HashSet<_>>();
88        let new_names = new.keys().cloned().collect::<HashSet<_>>();
89
90        let to_change = old_names
91            .intersection(&new_names)
92            .filter(|&n| {
93                // This is a hack around the issue of comparing two
94                // trait objects. Json is used here over toml since
95                // toml does not support serializing `None`
96                // to_value is used specifically (instead of string)
97                // to avoid problems comparing serialized HashMaps,
98                // which can iterate in varied orders.
99                let old_value = serde_json::to_value(&old[n]).unwrap();
100                let new_value = serde_json::to_value(&new[n]).unwrap();
101                old_value != new_value || need_change.contains(n)
102            })
103            .cloned()
104            .collect::<HashSet<_>>();
105
106        let to_remove = &old_names - &new_names;
107        let to_add = &new_names - &old_names;
108
109        Self {
110            to_remove,
111            to_change,
112            to_add,
113        }
114    }
115
116    fn from_enrichment_tables(
117        old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
118        new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
119    ) -> Self {
120        let old_table_keys = extract_table_component_keys(old);
121        let new_table_keys = extract_table_component_keys(new);
122
123        let to_change = old_table_keys
124            .intersection(&new_table_keys)
125            .filter(|(table_key, _derived_component_key)| {
126                // This is a hack around the issue of comparing two
127                // trait objects. Json is used here over toml since
128                // toml does not support serializing `None`
129                // to_value is used specifically (instead of string)
130                // to avoid problems comparing serialized HashMaps,
131                // which can iterate in varied orders.
132                let old_value = serde_json::to_value(&old[*table_key]).unwrap();
133                let new_value = serde_json::to_value(&new[*table_key]).unwrap();
134                old_value != new_value
135            })
136            .cloned()
137            .map(|(_table_key, derived_component_key)| derived_component_key)
138            .collect::<HashSet<_>>();
139
140        // Extract only the derived component keys for the final difference calculation
141        let old_component_keys = old_table_keys
142            .into_iter()
143            .map(|(_table_key, component_key)| component_key)
144            .collect::<HashSet<_>>();
145        let new_component_keys = new_table_keys
146            .into_iter()
147            .map(|(_table_key, component_key)| component_key)
148            .collect::<HashSet<_>>();
149
150        let to_remove = &old_component_keys - &new_component_keys;
151        let to_add = &new_component_keys - &old_component_keys;
152
153        Self {
154            to_remove,
155            to_change,
156            to_add,
157        }
158    }
159
160    /// Checks whether or not any components are being changed or added.
161    pub fn any_changed_or_added(&self) -> bool {
162        !(self.to_change.is_empty() && self.to_add.is_empty())
163    }
164
165    /// Checks whether or not any components are being changed or removed.
166    pub fn any_changed_or_removed(&self) -> bool {
167        !(self.to_change.is_empty() && self.to_remove.is_empty())
168    }
169
170    /// Checks whether the given component is present at all.
171    pub fn contains(&self, id: &ComponentKey) -> bool {
172        self.to_add.contains(id) || self.to_change.contains(id) || self.to_remove.contains(id)
173    }
174
175    /// Checks whether the given component is present as a change or addition.
176    pub fn contains_new(&self, id: &ComponentKey) -> bool {
177        self.to_add.contains(id) || self.to_change.contains(id)
178    }
179
180    /// Checks whether or not the given component is changed.
181    pub fn is_changed(&self, key: &ComponentKey) -> bool {
182        self.to_change.contains(key)
183    }
184
185    /// Checks whether the given component is present as an addition.
186    pub fn is_added(&self, id: &ComponentKey) -> bool {
187        self.to_add.contains(id)
188    }
189
190    /// Checks whether or not the given component is removed.
191    pub fn is_removed(&self, key: &ComponentKey) -> bool {
192        self.to_remove.contains(key)
193    }
194
195    const fn flip(&mut self) {
196        std::mem::swap(&mut self.to_remove, &mut self.to_add);
197    }
198
199    pub fn changed_and_added(&self) -> impl Iterator<Item = &ComponentKey> {
200        self.to_change.iter().chain(self.to_add.iter())
201    }
202
203    pub fn removed_and_changed(&self) -> impl Iterator<Item = &ComponentKey> {
204        self.to_change.iter().chain(self.to_remove.iter())
205    }
206}
207
208/// Helper function to extract component keys from enrichment tables.
209fn extract_table_component_keys(
210    tables: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
211) -> HashSet<(&ComponentKey, ComponentKey)> {
212    tables
213        .iter()
214        .flat_map(|(table_key, table)| {
215            vec![
216                table
217                    .as_source(table_key)
218                    .map(|(component_key, _)| (table_key, component_key)),
219                table
220                    .as_sink(table_key)
221                    .map(|(component_key, _)| (table_key, component_key)),
222            ]
223        })
224        .flatten()
225        .collect()
226}
227
#[cfg(test)]
mod tests {
    use crate::config::ConfigBuilder;
    use indoc::indoc;

    use super::*;

    /// Parses a YAML document into a `ConfigBuilder` and builds it, panicking on any failure.
    fn config_from_yaml(yaml: &str) -> Config {
        let builder = serde_yaml::from_str::<ConfigBuilder>(yaml).unwrap();
        builder.build().unwrap()
    }

    /// Turns a list of names into the set of component keys expected in a difference.
    fn key_set(names: &[&str]) -> HashSet<ComponentKey> {
        names.iter().map(|name| (*name).into()).collect()
    }

    /// The difference must be expressed in derived component keys: a changed table reports both
    /// its own key and its configured source key, while an untouched table reports nothing.
    #[test]
    fn diff_enrichment_tables_uses_correct_keys() {
        let old_config = config_from_yaml(indoc! {r#"
            enrichment_tables:
              memory_table:
                type: "memory"
                ttl: 10
                inputs: []
                source_config:
                  source_key: "memory_table_source"
                  export_expired_items: true
                  export_interval: 50

              memory_table_unchanged:
                type: "memory"
                ttl: 10
                inputs: []

              memory_table_old:
                type: "memory"
                ttl: 10
                inputs: []

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let new_config = config_from_yaml(indoc! {r#"
            enrichment_tables:
              memory_table:
                type: "memory"
                ttl: 20
                inputs: []
                source_config:
                  source_key: "memory_table_source"
                  export_expired_items: true
                  export_interval: 50

              memory_table_unchanged:
                type: "memory"
                ttl: 10
                inputs: []

              memory_table_new:
                type: "memory"
                ttl: 1000
                inputs: []

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let Difference {
            to_remove,
            to_change,
            to_add,
        } = Difference::from_enrichment_tables(
            &old_config.enrichment_tables,
            &new_config.enrichment_tables,
        );

        assert_eq!(to_add, key_set(&["memory_table_new"]));
        assert_eq!(to_remove, key_set(&["memory_table_old"]));
        assert_eq!(
            to_change,
            key_set(&["memory_table", "memory_table_source"])
        );
    }
}