vector/kubernetes/
reflector.rs

//! Intercept [`watcher::Event`]s.
2
3use std::{hash::Hash, sync::Arc, time::Duration};
4
5use futures::StreamExt;
6use futures_util::Stream;
7use kube::{
8    runtime::{reflector::store, watcher},
9    Resource,
10};
11use tokio::pin;
12use tokio_util::time::DelayQueue;
13
14use super::meta_cache::{MetaCache, MetaDescribe};
15
16/// Handles events from a [`kube::runtime::watcher()`] to delay the application of Deletion events.
17pub async fn custom_reflector<K, W>(
18    mut store: store::Writer<K>,
19    mut meta_cache: MetaCache,
20    stream: W,
21    delay_deletion: Duration,
22) where
23    K: Resource + Clone + std::fmt::Debug,
24    K::DynamicType: Eq + Hash + Clone,
25    W: Stream<Item = watcher::Result<watcher::Event<K>>>,
26{
27    pin!(stream);
28    let mut delay_queue = DelayQueue::default();
29    let mut init_buffer_meta = Vec::new();
30    loop {
31        tokio::select! {
32            result = stream.next() => {
33                match result {
34                    Some(Ok(event)) => {
35                        match event {
36                            // Immediately reconcile `Apply` event
37                            watcher::Event::Apply(ref obj) => {
38                                trace!(message = "Processing Apply event.", event_type = std::any::type_name::<K>(), event = ?event);
39                                store.apply_watcher_event(&event);
40                                let meta_descr = MetaDescribe::from_meta(obj.meta());
41                                meta_cache.store(meta_descr);
42                            }
43                            // Delay reconciling any `Delete` events
44                            watcher::Event::Delete(ref obj) => {
45                                trace!(message = "Delaying processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
46                                delay_queue.insert(event.to_owned(), delay_deletion);
47                                let meta_descr = MetaDescribe::from_meta(obj.meta());
48                                meta_cache.delete(&meta_descr);
49                            }
50                            // Clear all delayed events on `Init` event
51                            watcher::Event::Init => {
52                                trace!(message = "Processing Init event.", event_type = std::any::type_name::<K>(), event = ?event);
53                                delay_queue.clear();
54                                store.apply_watcher_event(&event);
55                                meta_cache.clear();
56                                init_buffer_meta.clear();
57                            }
58                            // Immediately reconcile `InitApply` event (but buffer the obj ref so we can handle implied deletions on InitDone)
59                            watcher::Event::InitApply(ref obj) => {
60                                trace!(message = "Processing InitApply event.", event_type = std::any::type_name::<K>(), event = ?event);
61                                store.apply_watcher_event(&event);
62                                let meta_descr = MetaDescribe::from_meta(obj.meta());
63                                meta_cache.store(meta_descr.clone());
64                                init_buffer_meta.push(meta_descr.clone());
65                            }
66                            // Reconcile `InitApply` events and implied deletions
67                            watcher::Event::InitDone => {
68                                trace!(message = "Processing InitDone event.", event_type = std::any::type_name::<K>(), event = ?event);
69                                store.apply_watcher_event(&event);
70
71
72                                store.as_reader().state().into_iter()
73                                // delay deleting objs that were added before but not during InitApply
74                                .for_each(|obj| {
75                                    if let Some(inner) = Arc::into_inner(obj) {
76                                        let meta_descr = MetaDescribe::from_meta(inner.meta());
77                                        if !init_buffer_meta.contains(&meta_descr) {
78                                            let implied_deletion_event = watcher::Event::Delete(inner);
79                                            trace!(message = "Delaying processing implied deletion.", event_type = std::any::type_name::<K>(), event = ?implied_deletion_event);
80                                            delay_queue.insert(implied_deletion_event, delay_deletion);
81                                            meta_cache.delete(&meta_descr);
82                                        }
83                                    }
84                                });
85
86                                init_buffer_meta.clear();
87                            }
88                        }
89                    },
90                    Some(Err(error)) => {
91                        warn!(message = "Watcher Stream received an error. Retrying.", ?error);
92                    },
93                    // The watcher stream should never yield `None`
94                    // https://docs.rs/kube-runtime/0.71.0/src/kube_runtime/watcher.rs.html#234-237
95                    None => {
96                        unreachable!("a watcher Stream never ends");
97                    },
98                }
99            }
100            result = delay_queue.next(), if !delay_queue.is_empty() => {
101                match result {
102                    Some(event) => {
103                        let event = event.into_inner();
104                        match event {
105                            watcher::Event::Delete(ref obj) => {
106                                let meta_desc = MetaDescribe::from_meta(obj.meta());
107                                if !meta_cache.contains(&meta_desc) {
108                                    trace!(message = "Processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
109                                    store.apply_watcher_event(&event);
110                                }
111                            },
112                            _ => store.apply_watcher_event(&event),
113                        }
114                    },
115                    // DelayQueue returns None if the queue is exhausted,
116                    // however we disable the DelayQueue branch if there are
117                    // no items in the queue.
118                    None => {
119                        unreachable!("an empty DelayQueue is never polled");
120                    },
121                }
122            }
123        }
124    }
125}
126
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::channel::mpsc;
    use futures_util::SinkExt;
    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
    use kube::runtime::{
        reflector::{store, ObjectRef},
        watcher,
    };

    use super::custom_reflector;
    use super::MetaCache;

    /// Item type carried by the fake watcher stream used in these tests.
    type WatchItem = watcher::Result<watcher::Event<ConfigMap>>;

    /// Builds an otherwise-default `ConfigMap` with the given name and optional namespace.
    fn config_map(name: &str, namespace: Option<&str>) -> ConfigMap {
        let metadata = ObjectMeta {
            name: Some(name.to_string()),
            namespace: namespace.map(str::to_string),
            ..ObjectMeta::default()
        };
        ConfigMap {
            metadata,
            ..ConfigMap::default()
        }
    }

    /// Queues `events` on a channel, spawns `custom_reflector` on the receiving end and returns
    /// the store reader together with the sender.
    ///
    /// The sender must be kept alive by the caller for the duration of the test: dropping it
    /// ends the stream, which the reflector treats as unreachable.
    async fn spawn_reflector(
        events: Vec<watcher::Event<ConfigMap>>,
        delay_deletion: Duration,
    ) -> (store::Store<ConfigMap>, mpsc::Sender<WatchItem>) {
        let writer = store::Writer::<ConfigMap>::default();
        let reader = writer.as_reader();
        let (mut tx, rx) = mpsc::channel::<WatchItem>(5);
        for event in events {
            tx.send(Ok(event)).await.unwrap();
        }
        tokio::spawn(custom_reflector(
            writer,
            MetaCache::new(),
            rx,
            delay_deletion,
        ));
        (reader, tx)
    }

    #[tokio::test]
    async fn applied_should_add_object() {
        let cm = config_map("a", None);
        let (store, _tx) = spawn_reflector(
            vec![watcher::Event::Apply(cm.clone())],
            Duration::from_secs(1),
        )
        .await;

        tokio::time::sleep(Duration::from_secs(1)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[tokio::test]
    async fn deleted_should_remove_object_after_delay() {
        let cm = config_map("a", None);
        let (store, _tx) = spawn_reflector(
            vec![
                watcher::Event::Apply(cm.clone()),
                watcher::Event::Delete(cm.clone()),
            ],
            Duration::from_secs(2),
        )
        .await;

        // Ensure the Resource is still available after deletion
        tokio::time::sleep(Duration::from_secs(1)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
        // Ensure the Resource is removed once the `delay_deletion` has elapsed
        tokio::time::sleep(Duration::from_secs(5)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
    }

    #[tokio::test]
    async fn deleted_should_not_remove_object_still_in_use() {
        let cm = config_map("name", Some("namespace"));
        let (store, _tx) = spawn_reflector(
            vec![
                watcher::Event::Apply(cm.clone()),
                watcher::Event::Delete(cm.clone()),
                watcher::Event::Apply(cm.clone()),
            ],
            Duration::from_secs(2),
        )
        .await;

        tokio::time::sleep(Duration::from_secs(1)).await;
        // Ensure the Resource is still available after deletion
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
        tokio::time::sleep(Duration::from_secs(5)).await;
        // Ensure the Resource is still available after Applied event
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }
}