1use std::collections::HashSet;
2
3use indexmap::IndexMap;
4use vector_lib::config::OutputId;
5
6use super::{ComponentKey, Config, EnrichmentTableOuter};
7
8#[derive(Debug)]
9pub struct ConfigDiff {
10 pub sources: Difference,
11 pub transforms: Difference,
12 pub sinks: Difference,
13 pub enrichment_tables: Difference,
16 pub components_to_reload: HashSet<ComponentKey>,
17}
18
19impl ConfigDiff {
20 pub fn initial(initial: &Config) -> Self {
21 Self::new(&Config::default(), initial, HashSet::new())
22 }
23
24 pub fn new(old: &Config, new: &Config, components_to_reload: HashSet<ComponentKey>) -> Self {
25 ConfigDiff {
26 sources: Difference::new(&old.sources, &new.sources, &components_to_reload),
27 transforms: Difference::new(&old.transforms, &new.transforms, &components_to_reload),
28 sinks: Difference::new(&old.sinks, &new.sinks, &components_to_reload),
29 enrichment_tables: Difference::from_enrichment_tables(
30 &old.enrichment_tables,
31 &new.enrichment_tables,
32 ),
33 components_to_reload,
34 }
35 }
36
37 pub const fn flip(mut self) -> Self {
39 self.sources.flip();
40 self.transforms.flip();
41 self.sinks.flip();
42 self.enrichment_tables.flip();
43 self
44 }
45
46 pub fn contains(&self, key: &ComponentKey) -> bool {
48 self.sources.contains(key)
49 || self.transforms.contains(key)
50 || self.sinks.contains(key)
51 || self.enrichment_tables.contains(key)
52 }
53
54 pub fn is_changed(&self, key: &ComponentKey) -> bool {
56 self.sources.is_changed(key)
57 || self.transforms.is_changed(key)
58 || self.sinks.is_changed(key)
59 || self.enrichment_tables.contains(key)
60 }
61
62 pub fn is_removed(&self, key: &ComponentKey) -> bool {
64 self.sources.is_removed(key)
65 || self.transforms.is_removed(key)
66 || self.sinks.is_removed(key)
67 || self.enrichment_tables.contains(key)
68 }
69}
70
71#[derive(Debug)]
72pub struct Difference {
73 pub to_remove: HashSet<ComponentKey>,
74 pub to_change: HashSet<ComponentKey>,
75 pub to_add: HashSet<ComponentKey>,
76}
77
78impl Difference {
79 fn new<C>(
80 old: &IndexMap<ComponentKey, C>,
81 new: &IndexMap<ComponentKey, C>,
82 need_change: &HashSet<ComponentKey>,
83 ) -> Self
84 where
85 C: serde::Serialize + serde::Deserialize<'static>,
86 {
87 let old_names = old.keys().cloned().collect::<HashSet<_>>();
88 let new_names = new.keys().cloned().collect::<HashSet<_>>();
89
90 let to_change = old_names
91 .intersection(&new_names)
92 .filter(|&n| {
93 let old_value = serde_json::to_value(&old[n]).unwrap();
100 let new_value = serde_json::to_value(&new[n]).unwrap();
101 old_value != new_value || need_change.contains(n)
102 })
103 .cloned()
104 .collect::<HashSet<_>>();
105
106 let to_remove = &old_names - &new_names;
107 let to_add = &new_names - &old_names;
108
109 Self {
110 to_remove,
111 to_change,
112 to_add,
113 }
114 }
115
116 fn from_enrichment_tables(
117 old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
118 new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
119 ) -> Self {
120 let old_table_keys = extract_table_component_keys(old);
121 let new_table_keys = extract_table_component_keys(new);
122
123 let to_change = old_table_keys
124 .intersection(&new_table_keys)
125 .filter(|(table_key, _derived_component_key)| {
126 let old_value = serde_json::to_value(&old[*table_key]).unwrap();
133 let new_value = serde_json::to_value(&new[*table_key]).unwrap();
134 old_value != new_value
135 })
136 .cloned()
137 .map(|(_table_key, derived_component_key)| derived_component_key)
138 .collect::<HashSet<_>>();
139
140 let old_component_keys = old_table_keys
142 .into_iter()
143 .map(|(_table_key, component_key)| component_key)
144 .collect::<HashSet<_>>();
145 let new_component_keys = new_table_keys
146 .into_iter()
147 .map(|(_table_key, component_key)| component_key)
148 .collect::<HashSet<_>>();
149
150 let to_remove = &old_component_keys - &new_component_keys;
151 let to_add = &new_component_keys - &old_component_keys;
152
153 Self {
154 to_remove,
155 to_change,
156 to_add,
157 }
158 }
159
160 pub fn any_changed_or_added(&self) -> bool {
162 !(self.to_change.is_empty() && self.to_add.is_empty())
163 }
164
165 pub fn any_changed_or_removed(&self) -> bool {
167 !(self.to_change.is_empty() && self.to_remove.is_empty())
168 }
169
170 pub fn contains(&self, id: &ComponentKey) -> bool {
172 self.to_add.contains(id) || self.to_change.contains(id) || self.to_remove.contains(id)
173 }
174
175 pub fn contains_new(&self, id: &ComponentKey) -> bool {
177 self.to_add.contains(id) || self.to_change.contains(id)
178 }
179
180 pub fn is_changed(&self, key: &ComponentKey) -> bool {
182 self.to_change.contains(key)
183 }
184
185 pub fn is_added(&self, id: &ComponentKey) -> bool {
187 self.to_add.contains(id)
188 }
189
190 pub fn is_removed(&self, key: &ComponentKey) -> bool {
192 self.to_remove.contains(key)
193 }
194
195 const fn flip(&mut self) {
196 std::mem::swap(&mut self.to_remove, &mut self.to_add);
197 }
198
199 pub fn changed_and_added(&self) -> impl Iterator<Item = &ComponentKey> {
200 self.to_change.iter().chain(self.to_add.iter())
201 }
202
203 pub fn removed_and_changed(&self) -> impl Iterator<Item = &ComponentKey> {
204 self.to_change.iter().chain(self.to_remove.iter())
205 }
206}
207
208fn extract_table_component_keys(
210 tables: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
211) -> HashSet<(&ComponentKey, ComponentKey)> {
212 tables
213 .iter()
214 .flat_map(|(table_key, table)| {
215 vec![
216 table
217 .as_source(table_key)
218 .map(|(component_key, _)| (table_key, component_key)),
219 table
220 .as_sink(table_key)
221 .map(|(component_key, _)| (table_key, component_key)),
222 ]
223 })
224 .flatten()
225 .collect()
226}
227
228#[cfg(test)]
229mod tests {
230 use crate::config::ConfigBuilder;
231 use indoc::indoc;
232
233 use super::*;
234
235 #[test]
236 fn diff_enrichment_tables_uses_correct_keys() {
237 let old_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
238 enrichment_tables:
239 memory_table:
240 type: "memory"
241 ttl: 10
242 inputs: []
243 source_config:
244 source_key: "memory_table_source"
245 export_expired_items: true
246 export_interval: 50
247
248 memory_table_unchanged:
249 type: "memory"
250 ttl: 10
251 inputs: []
252
253 memory_table_old:
254 type: "memory"
255 ttl: 10
256 inputs: []
257
258 sources:
259 test:
260 type: "test_basic"
261
262 sinks:
263 test_sink:
264 type: "test_basic"
265 inputs: ["test"]
266 "#})
267 .unwrap()
268 .build()
269 .unwrap();
270
271 let new_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
272 enrichment_tables:
273 memory_table:
274 type: "memory"
275 ttl: 20
276 inputs: []
277 source_config:
278 source_key: "memory_table_source"
279 export_expired_items: true
280 export_interval: 50
281
282 memory_table_unchanged:
283 type: "memory"
284 ttl: 10
285 inputs: []
286
287 memory_table_new:
288 type: "memory"
289 ttl: 1000
290 inputs: []
291
292 sources:
293 test:
294 type: "test_basic"
295
296 sinks:
297 test_sink:
298 type: "test_basic"
299 inputs: ["test"]
300 "#})
301 .unwrap()
302 .build()
303 .unwrap();
304
305 let diff = Difference::from_enrichment_tables(
306 &old_config.enrichment_tables,
307 &new_config.enrichment_tables,
308 );
309
310 assert_eq!(diff.to_add, HashSet::from_iter(["memory_table_new".into()]));
311 assert_eq!(
312 diff.to_remove,
313 HashSet::from_iter(["memory_table_old".into()])
314 );
315 assert_eq!(
316 diff.to_change,
317 HashSet::from_iter(["memory_table".into(), "memory_table_source".into()])
318 );
319 }
320}