vector/kubernetes/
reflector.rs

1//! Intercept [`watcher::Event`]'s.
2
3use std::{hash::Hash, sync::Arc, time::Duration};
4
5use futures::StreamExt;
6use futures_util::Stream;
7use kube::{
8    Resource,
9    runtime::{reflector::store, watcher},
10};
11use tokio::pin;
12use tokio_util::time::DelayQueue;
13
14use super::meta_cache::{MetaCache, MetaDescribe};
15
16/// Handles events from a [`kube::runtime::watcher()`] to delay the application of Deletion events.
17pub async fn custom_reflector<K, W>(
18    mut store: store::Writer<K>,
19    mut meta_cache: MetaCache,
20    stream: W,
21    delay_deletion: Duration,
22) where
23    K: Resource + Clone + std::fmt::Debug,
24    K::DynamicType: Eq + Hash + Clone,
25    W: Stream<Item = watcher::Result<watcher::Event<K>>>,
26{
27    pin!(stream);
28    let mut delay_queue = DelayQueue::default();
29    let mut init_buffer_meta = Vec::new();
30    loop {
31        tokio::select! {
32            result = stream.next() => {
33                match result {
34                    Some(Ok(event)) => {
35                        match event {
36                            // Immediately reconcile `Apply` event
37                            watcher::Event::Apply(ref obj) => {
38                                trace!(message = "Processing Apply event.", event_type = std::any::type_name::<K>(), event = ?event);
39                                store.apply_watcher_event(&event);
40                                let meta_descr = MetaDescribe::from_meta(obj.meta());
41                                meta_cache.store(meta_descr);
42                            }
43                            // Delay reconciling any `Delete` events
44                            watcher::Event::Delete(ref obj) => {
45                                trace!(message = "Delaying processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
46                                delay_queue.insert(event.to_owned(), delay_deletion);
47                                let meta_descr = MetaDescribe::from_meta(obj.meta());
48                                meta_cache.delete(&meta_descr);
49                            }
50                            // Clear all delayed events on `Init` event
51                            watcher::Event::Init => {
52                                trace!(message = "Processing Init event.", event_type = std::any::type_name::<K>(), event = ?event);
53                                delay_queue.clear();
54                                store.apply_watcher_event(&event);
55                                meta_cache.clear();
56                                init_buffer_meta.clear();
57                            }
58                            // Immediately reconcile `InitApply` event (but buffer the obj ref so we can handle implied deletions on InitDone)
59                            watcher::Event::InitApply(ref obj) => {
60                                trace!(message = "Processing InitApply event.", event_type = std::any::type_name::<K>(), event = ?event);
61                                store.apply_watcher_event(&event);
62                                let meta_descr = MetaDescribe::from_meta(obj.meta());
63                                meta_cache.store(meta_descr.clone());
64                                init_buffer_meta.push(meta_descr.clone());
65                            }
66                            // Reconcile `InitApply` events and implied deletions
67                            watcher::Event::InitDone => {
68                                trace!(message = "Processing InitDone event.", event_type = std::any::type_name::<K>(), event = ?event);
69                                store.apply_watcher_event(&event);
70
71
72                                store.as_reader().state().into_iter()
73                                // delay deleting objs that were added before but not during InitApply
74                                .for_each(|obj| {
75                                    if let Some(inner) = Arc::into_inner(obj) {
76                                        let meta_descr = MetaDescribe::from_meta(inner.meta());
77                                        if !init_buffer_meta.contains(&meta_descr) {
78                                            let implied_deletion_event = watcher::Event::Delete(inner);
79                                            trace!(message = "Delaying processing implied deletion.", event_type = std::any::type_name::<K>(), event = ?implied_deletion_event);
80                                            delay_queue.insert(implied_deletion_event, delay_deletion);
81                                            meta_cache.delete(&meta_descr);
82                                        }
83                                    }
84                                });
85
86                                init_buffer_meta.clear();
87                            }
88                        }
89                    },
90                    Some(Err(error)) => {
91                        warn!(message = "Watcher Stream received an error. Retrying.", ?error);
92                    },
93                    // The watcher stream should never yield `None`
94                    // https://docs.rs/kube-runtime/0.71.0/src/kube_runtime/watcher.rs.html#234-237
95                    None => {
96                        unreachable!("a watcher Stream never ends");
97                    },
98                }
99            }
100            result = delay_queue.next(), if !delay_queue.is_empty() => {
101                match result {
102                    Some(event) => {
103                        let event = event.into_inner();
104                        match event {
105                            watcher::Event::Delete(ref obj) => {
106                                let meta_desc = MetaDescribe::from_meta(obj.meta());
107                                if !meta_cache.contains(&meta_desc) {
108                                    trace!(message = "Processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
109                                    store.apply_watcher_event(&event);
110                                }
111                            },
112                            _ => store.apply_watcher_event(&event),
113                        }
114                    },
115                    // DelayQueue returns None if the queue is exhausted,
116                    // however we disable the DelayQueue branch if there are
117                    // no items in the queue.
118                    None => {
119                        unreachable!("an empty DelayQueue is never polled");
120                    },
121                }
122            }
123        }
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use std::time::Duration;
130
131    use futures::channel::mpsc;
132    use futures_util::SinkExt;
133    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
134    use kube::runtime::{
135        reflector::{ObjectRef, store},
136        watcher,
137    };
138
139    use super::{MetaCache, custom_reflector};
140
141    #[tokio::test]
142    async fn applied_should_add_object() {
143        let store_w = store::Writer::default();
144        let store = store_w.as_reader();
145        let cm = ConfigMap {
146            metadata: ObjectMeta {
147                name: Some("a".to_string()),
148                ..ObjectMeta::default()
149            },
150            ..ConfigMap::default()
151        };
152        let (mut tx, rx) = mpsc::channel::<_>(5);
153        tx.send(Ok(watcher::Event::Apply(cm.clone())))
154            .await
155            .unwrap();
156        let meta_cache = MetaCache::new();
157        tokio::spawn(custom_reflector(
158            store_w,
159            meta_cache,
160            rx,
161            Duration::from_secs(1),
162        ));
163        tokio::time::sleep(Duration::from_secs(1)).await;
164        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
165    }
166
167    #[tokio::test]
168    async fn deleted_should_remove_object_after_delay() {
169        let store_w = store::Writer::default();
170        let store = store_w.as_reader();
171        let cm = ConfigMap {
172            metadata: ObjectMeta {
173                name: Some("a".to_string()),
174                ..ObjectMeta::default()
175            },
176            ..ConfigMap::default()
177        };
178        let (mut tx, rx) = mpsc::channel::<_>(5);
179        tx.send(Ok(watcher::Event::Apply(cm.clone())))
180            .await
181            .unwrap();
182        tx.send(Ok(watcher::Event::Delete(cm.clone())))
183            .await
184            .unwrap();
185        let meta_cache = MetaCache::new();
186        tokio::spawn(custom_reflector(
187            store_w,
188            meta_cache,
189            rx,
190            Duration::from_secs(2),
191        ));
192        // Ensure the Resource is still available after deletion
193        tokio::time::sleep(Duration::from_secs(1)).await;
194        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
195        // Ensure the Resource is removed once the `delay_deletion` has elapsed
196        tokio::time::sleep(Duration::from_secs(5)).await;
197        assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
198    }
199
200    #[tokio::test]
201    async fn deleted_should_not_remove_object_still_in_use() {
202        let store_w = store::Writer::default();
203        let store = store_w.as_reader();
204        let cm = ConfigMap {
205            metadata: ObjectMeta {
206                name: Some("name".to_string()),
207                namespace: Some("namespace".to_string()),
208                ..ObjectMeta::default()
209            },
210            ..ConfigMap::default()
211        };
212        let (mut tx, rx) = mpsc::channel::<_>(5);
213        tx.send(Ok(watcher::Event::Apply(cm.clone())))
214            .await
215            .unwrap();
216        tx.send(Ok(watcher::Event::Delete(cm.clone())))
217            .await
218            .unwrap();
219        tx.send(Ok(watcher::Event::Apply(cm.clone())))
220            .await
221            .unwrap();
222        let meta_cache = MetaCache::new();
223        tokio::spawn(custom_reflector(
224            store_w,
225            meta_cache,
226            rx,
227            Duration::from_secs(2),
228        ));
229        tokio::time::sleep(Duration::from_secs(1)).await;
230        // Ensure the Resource is still available after deletion
231        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
232        tokio::time::sleep(Duration::from_secs(5)).await;
233        // Ensure the Resource is still available after Applied event
234        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
235    }
236}