use std::{hash::Hash, time::Duration};
use futures::StreamExt;
use futures_util::Stream;
use kube::{
runtime::{reflector::store, watcher},
Resource,
};
use tokio::pin;
use tokio_util::time::DelayQueue;
use super::meta_cache::{MetaCache, MetaDescribe};
pub async fn custom_reflector<K, W>(
mut store: store::Writer<K>,
mut meta_cache: MetaCache,
stream: W,
delay_deletion: Duration,
) where
K: Resource + Clone + std::fmt::Debug,
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
pin!(stream);
let mut delay_queue = DelayQueue::default();
loop {
tokio::select! {
result = stream.next() => {
match result {
Some(Ok(event)) => {
match event {
watcher::Event::Applied(ref obj) => {
trace!(message = "Processing Applied event.", ?event);
store.apply_watcher_event(&event);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.store(meta_descr);
}
watcher::Event::Deleted(ref obj) => {
delay_queue.insert(event.to_owned(), delay_deletion);
let meta_descr = MetaDescribe::from_meta(obj.meta());
meta_cache.delete(&meta_descr);
}
watcher::Event::Restarted(_) => {
trace!(message = "Processing Restarted event.", ?event);
delay_queue.clear();
store.apply_watcher_event(&event);
meta_cache.clear();
}
}
},
Some(Err(error)) => {
warn!(message = "Watcher Stream received an error. Retrying.", ?error);
},
None => {
unreachable!("a watcher Stream never ends");
},
}
}
result = delay_queue.next(), if !delay_queue.is_empty() => {
match result {
Some(event) => {
let event = event.into_inner();
match event {
watcher::Event::Deleted(ref obj) => {
let meta_desc = MetaDescribe::from_meta(obj.meta());
if !meta_cache.contains(&meta_desc) {
trace!(message = "Processing Deleted event.", ?event);
store.apply_watcher_event(&event);
}
},
_ => store.apply_watcher_event(&event),
}
},
None => {
unreachable!("an empty DelayQueue is never polled");
},
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use futures::channel::mpsc;
use futures_util::SinkExt;
use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
use kube::runtime::{
reflector::{store, ObjectRef},
watcher,
};
use super::custom_reflector;
use super::MetaCache;
#[tokio::test]
async fn applied_should_add_object() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
tokio::spawn(custom_reflector(
store_w,
meta_cache,
rx,
Duration::from_secs(1),
));
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
#[tokio::test]
async fn deleted_should_remove_object_after_delay() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Deleted(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
tokio::spawn(custom_reflector(
store_w,
meta_cache,
rx,
Duration::from_secs(2),
));
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
}
#[tokio::test]
async fn deleted_should_not_remove_object_still_in_use() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("name".to_string()),
namespace: Some("namespace".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let (mut tx, rx) = mpsc::channel::<_>(5);
tx.send(Ok(watcher::Event::Applied(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Deleted(cm.clone())))
.await
.unwrap();
tx.send(Ok(watcher::Event::Applied(cm.clone())))
.await
.unwrap();
let meta_cache = MetaCache::new();
tokio::spawn(custom_reflector(
store_w,
meta_cache,
rx,
Duration::from_secs(2),
));
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
}