vector/kubernetes/
reflector.rs1use std::{hash::Hash, sync::Arc, time::Duration};
4
5use futures::StreamExt;
6use futures_util::Stream;
7use kube::{
8 Resource,
9 runtime::{reflector::store, watcher},
10};
11use tokio::pin;
12use tokio_util::time::DelayQueue;
13
14use super::meta_cache::{MetaCache, MetaDescribe};
15
16pub async fn custom_reflector<K, W>(
18 mut store: store::Writer<K>,
19 mut meta_cache: MetaCache,
20 stream: W,
21 delay_deletion: Duration,
22) where
23 K: Resource + Clone + std::fmt::Debug,
24 K::DynamicType: Eq + Hash + Clone,
25 W: Stream<Item = watcher::Result<watcher::Event<K>>>,
26{
27 pin!(stream);
28 let mut delay_queue = DelayQueue::default();
29 let mut init_buffer_meta = Vec::new();
30 loop {
31 tokio::select! {
32 result = stream.next() => {
33 match result {
34 Some(Ok(event)) => {
35 match event {
36 watcher::Event::Apply(ref obj) => {
38 trace!(message = "Processing Apply event.", event_type = std::any::type_name::<K>(), event = ?event);
39 store.apply_watcher_event(&event);
40 let meta_descr = MetaDescribe::from_meta(obj.meta());
41 meta_cache.store(meta_descr);
42 }
43 watcher::Event::Delete(ref obj) => {
45 trace!(message = "Delaying processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
46 delay_queue.insert(event.to_owned(), delay_deletion);
47 let meta_descr = MetaDescribe::from_meta(obj.meta());
48 meta_cache.delete(&meta_descr);
49 }
50 watcher::Event::Init => {
52 trace!(message = "Processing Init event.", event_type = std::any::type_name::<K>(), event = ?event);
53 delay_queue.clear();
54 store.apply_watcher_event(&event);
55 meta_cache.clear();
56 init_buffer_meta.clear();
57 }
58 watcher::Event::InitApply(ref obj) => {
60 trace!(message = "Processing InitApply event.", event_type = std::any::type_name::<K>(), event = ?event);
61 store.apply_watcher_event(&event);
62 let meta_descr = MetaDescribe::from_meta(obj.meta());
63 meta_cache.store(meta_descr.clone());
64 init_buffer_meta.push(meta_descr.clone());
65 }
66 watcher::Event::InitDone => {
68 trace!(message = "Processing InitDone event.", event_type = std::any::type_name::<K>(), event = ?event);
69 store.apply_watcher_event(&event);
70
71
72 store.as_reader().state().into_iter()
73 .for_each(|obj| {
75 if let Some(inner) = Arc::into_inner(obj) {
76 let meta_descr = MetaDescribe::from_meta(inner.meta());
77 if !init_buffer_meta.contains(&meta_descr) {
78 let implied_deletion_event = watcher::Event::Delete(inner);
79 trace!(message = "Delaying processing implied deletion.", event_type = std::any::type_name::<K>(), event = ?implied_deletion_event);
80 delay_queue.insert(implied_deletion_event, delay_deletion);
81 meta_cache.delete(&meta_descr);
82 }
83 }
84 });
85
86 init_buffer_meta.clear();
87 }
88 }
89 },
90 Some(Err(error)) => {
91 warn!(message = "Watcher Stream received an error. Retrying.", ?error);
92 },
93 None => {
96 unreachable!("a watcher Stream never ends");
97 },
98 }
99 }
100 result = delay_queue.next(), if !delay_queue.is_empty() => {
101 match result {
102 Some(event) => {
103 let event = event.into_inner();
104 match event {
105 watcher::Event::Delete(ref obj) => {
106 let meta_desc = MetaDescribe::from_meta(obj.meta());
107 if !meta_cache.contains(&meta_desc) {
108 trace!(message = "Processing Delete event.", event_type = std::any::type_name::<K>(), event = ?event);
109 store.apply_watcher_event(&event);
110 }
111 },
112 _ => store.apply_watcher_event(&event),
113 }
114 },
115 None => {
119 unreachable!("an empty DelayQueue is never polled");
120 },
121 }
122 }
123 }
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use std::time::Duration;
130
131 use futures::channel::mpsc;
132 use futures_util::SinkExt;
133 use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
134 use kube::runtime::{
135 reflector::{ObjectRef, store},
136 watcher,
137 };
138
139 use super::{MetaCache, custom_reflector};
140
141 #[tokio::test]
142 async fn applied_should_add_object() {
143 let store_w = store::Writer::default();
144 let store = store_w.as_reader();
145 let cm = ConfigMap {
146 metadata: ObjectMeta {
147 name: Some("a".to_string()),
148 ..ObjectMeta::default()
149 },
150 ..ConfigMap::default()
151 };
152 let (mut tx, rx) = mpsc::channel::<_>(5);
153 tx.send(Ok(watcher::Event::Apply(cm.clone())))
154 .await
155 .unwrap();
156 let meta_cache = MetaCache::new();
157 tokio::spawn(custom_reflector(
158 store_w,
159 meta_cache,
160 rx,
161 Duration::from_secs(1),
162 ));
163 tokio::time::sleep(Duration::from_secs(1)).await;
164 assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
165 }
166
167 #[tokio::test]
168 async fn deleted_should_remove_object_after_delay() {
169 let store_w = store::Writer::default();
170 let store = store_w.as_reader();
171 let cm = ConfigMap {
172 metadata: ObjectMeta {
173 name: Some("a".to_string()),
174 ..ObjectMeta::default()
175 },
176 ..ConfigMap::default()
177 };
178 let (mut tx, rx) = mpsc::channel::<_>(5);
179 tx.send(Ok(watcher::Event::Apply(cm.clone())))
180 .await
181 .unwrap();
182 tx.send(Ok(watcher::Event::Delete(cm.clone())))
183 .await
184 .unwrap();
185 let meta_cache = MetaCache::new();
186 tokio::spawn(custom_reflector(
187 store_w,
188 meta_cache,
189 rx,
190 Duration::from_secs(2),
191 ));
192 tokio::time::sleep(Duration::from_secs(1)).await;
194 assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
195 tokio::time::sleep(Duration::from_secs(5)).await;
197 assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
198 }
199
200 #[tokio::test]
201 async fn deleted_should_not_remove_object_still_in_use() {
202 let store_w = store::Writer::default();
203 let store = store_w.as_reader();
204 let cm = ConfigMap {
205 metadata: ObjectMeta {
206 name: Some("name".to_string()),
207 namespace: Some("namespace".to_string()),
208 ..ObjectMeta::default()
209 },
210 ..ConfigMap::default()
211 };
212 let (mut tx, rx) = mpsc::channel::<_>(5);
213 tx.send(Ok(watcher::Event::Apply(cm.clone())))
214 .await
215 .unwrap();
216 tx.send(Ok(watcher::Event::Delete(cm.clone())))
217 .await
218 .unwrap();
219 tx.send(Ok(watcher::Event::Apply(cm.clone())))
220 .await
221 .unwrap();
222 let meta_cache = MetaCache::new();
223 tokio::spawn(custom_reflector(
224 store_w,
225 meta_cache,
226 rx,
227 Duration::from_secs(2),
228 ));
229 tokio::time::sleep(Duration::from_secs(1)).await;
230 assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
232 tokio::time::sleep(Duration::from_secs(5)).await;
233 assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
235 }
236}