vector/kubernetes/
reflector.rs1use std::{hash::Hash, sync::Arc, time::Duration};
4
5use futures::StreamExt;
6use futures_util::Stream;
7use kube::{
8 runtime::{reflector::store, watcher},
9 Resource,
10};
11use tokio::pin;
12use tokio_util::time::DelayQueue;
13
14use super::meta_cache::{MetaCache, MetaDescribe};
15
16pub async fn custom_reflector<K, W>(
18 mut store: store::Writer<K>,
19 mut meta_cache: MetaCache,
20 stream: W,
21 delay_deletion: Duration,
22) where
23 K: Resource + Clone + std::fmt::Debug,
24 K::DynamicType: Eq + Hash + Clone,
25 W: Stream<Item = watcher::Result<watcher::Event<K>>>,
26{
27 pin!(stream);
28 let mut delay_queue = DelayQueue::default();
29 let mut init_buffer_meta = Vec::new();
30 loop {
31 tokio::select! {
32 result = stream.next() => {
33 match result {
34 Some(Ok(event)) => {
35 match event {
36 watcher::Event::Apply(ref obj) => {
38 trace!(message = "Processing Apply event.", event_type = std::any::type_name::<K>(), event = ?event);
39 store.apply_watcher_event(&event);
40 let meta_descr = MetaDescribe::from_meta(obj.meta());
41 meta_cache.store(meta_descr);
42 }
43 watcher::Event::Delete(ref obj) => {
45 trace!(message = "Delaying processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
46 delay_queue.insert(event.to_owned(), delay_deletion);
47 let meta_descr = MetaDescribe::from_meta(obj.meta());
48 meta_cache.delete(&meta_descr);
49 }
50 watcher::Event::Init => {
52 trace!(message = "Processing Init event.", event_type = std::any::type_name::<K>(), event = ?event);
53 delay_queue.clear();
54 store.apply_watcher_event(&event);
55 meta_cache.clear();
56 init_buffer_meta.clear();
57 }
58 watcher::Event::InitApply(ref obj) => {
60 trace!(message = "Processing InitApply event.", event_type = std::any::type_name::<K>(), event = ?event);
61 store.apply_watcher_event(&event);
62 let meta_descr = MetaDescribe::from_meta(obj.meta());
63 meta_cache.store(meta_descr.clone());
64 init_buffer_meta.push(meta_descr.clone());
65 }
66 watcher::Event::InitDone => {
68 trace!(message = "Processing InitDone event.", event_type = std::any::type_name::<K>(), event = ?event);
69 store.apply_watcher_event(&event);
70
71
72 store.as_reader().state().into_iter()
73 .for_each(|obj| {
75 if let Some(inner) = Arc::into_inner(obj) {
76 let meta_descr = MetaDescribe::from_meta(inner.meta());
77 if !init_buffer_meta.contains(&meta_descr) {
78 let implied_deletion_event = watcher::Event::Delete(inner);
79 trace!(message = "Delaying processing implied deletion.", event_type = std::any::type_name::<K>(), event = ?implied_deletion_event);
80 delay_queue.insert(implied_deletion_event, delay_deletion);
81 meta_cache.delete(&meta_descr);
82 }
83 }
84 });
85
86 init_buffer_meta.clear();
87 }
88 }
89 },
90 Some(Err(error)) => {
91 warn!(message = "Watcher Stream received an error. Retrying.", ?error);
92 },
93 None => {
96 unreachable!("a watcher Stream never ends");
97 },
98 }
99 }
100 result = delay_queue.next(), if !delay_queue.is_empty() => {
101 match result {
102 Some(event) => {
103 let event = event.into_inner();
104 match event {
105 watcher::Event::Delete(ref obj) => {
106 let meta_desc = MetaDescribe::from_meta(obj.meta());
107 if !meta_cache.contains(&meta_desc) {
108 trace!(message = "Processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
109 store.apply_watcher_event(&event);
110 }
111 },
112 _ => store.apply_watcher_event(&event),
113 }
114 },
115 None => {
119 unreachable!("an empty DelayQueue is never polled");
120 },
121 }
122 }
123 }
124 }
125}
126
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::channel::mpsc;
    use futures_util::SinkExt;
    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
    use kube::runtime::{
        reflector::{store, ObjectRef},
        watcher,
    };

    use super::custom_reflector;
    use super::MetaCache;

    /// Sending half of the channel that plays the role of the watcher stream.
    type EventSender = mpsc::Sender<watcher::Result<watcher::Event<ConfigMap>>>;

    /// Builds an otherwise-default `ConfigMap` with the given name and optional namespace.
    fn config_map(name: &str, namespace: Option<&str>) -> ConfigMap {
        ConfigMap {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                namespace: namespace.map(str::to_string),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        }
    }

    /// Queues `events` on a channel, spawns `custom_reflector` over it and returns the store
    /// reader together with the sender.
    ///
    /// The caller must keep the sender alive for the duration of the test: dropping it ends
    /// the stream, which the reflector treats as unreachable.
    async fn start_reflector(
        events: Vec<watcher::Event<ConfigMap>>,
        delay_deletion: Duration,
    ) -> (store::Store<ConfigMap>, EventSender) {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let (mut tx, rx) = mpsc::channel(5);
        for event in events {
            tx.send(Ok(event)).await.unwrap();
        }
        tokio::spawn(custom_reflector(
            writer,
            MetaCache::new(),
            rx,
            delay_deletion,
        ));
        (reader, tx)
    }

    #[tokio::test]
    async fn applied_should_add_object() {
        let cm = config_map("a", None);
        let (store, _tx) = start_reflector(
            vec![watcher::Event::Apply(cm.clone())],
            Duration::from_secs(1),
        )
        .await;

        tokio::time::sleep(Duration::from_secs(1)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[tokio::test]
    async fn deleted_should_remove_object_after_delay() {
        let cm = config_map("a", None);
        let (store, _tx) = start_reflector(
            vec![
                watcher::Event::Apply(cm.clone()),
                watcher::Event::Delete(cm.clone()),
            ],
            Duration::from_secs(2),
        )
        .await;

        // Before the delay elapses the object must still be present.
        tokio::time::sleep(Duration::from_secs(1)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));

        // Once the delay has elapsed the deletion must have been applied.
        tokio::time::sleep(Duration::from_secs(5)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
    }

    #[tokio::test]
    async fn deleted_should_not_remove_object_still_in_use() {
        let cm = config_map("name", Some("namespace"));
        let (store, _tx) = start_reflector(
            vec![
                watcher::Event::Apply(cm.clone()),
                watcher::Event::Delete(cm.clone()),
                watcher::Event::Apply(cm.clone()),
            ],
            Duration::from_secs(2),
        )
        .await;

        tokio::time::sleep(Duration::from_secs(1)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));

        // The object was applied again after the Delete, so the delayed deletion is skipped.
        tokio::time::sleep(Duration::from_secs(5)).await;
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }
}