From 7e2a15274d0ef99a3b7c841446bc104535ed2118 Mon Sep 17 00:00:00 2001 From: clux Date: Fri, 13 May 2022 20:07:10 +0100 Subject: [PATCH] PoC Predicates for kube-rs/kube-rs#52 Signed-off-by: clux --- src/manager.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++-- yaml/crd.yaml | 4 ++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/src/manager.rs b/src/manager.rs index f799301..00a6697 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -38,7 +38,7 @@ pub struct DocumentSpec { #[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] pub struct DocumentStatus { hidden: bool, - //last_updated: Option>, + last_updated: Option>, } impl Document { @@ -60,10 +60,55 @@ struct Data { state: Arc>, /// Various prometheus metrics metrics: Metrics, + /// Cache for predicate + predicate_cache: Arc, i64>>>, +} + + +use kube::runtime::reflector::ObjectRef; +pub trait Predicate { + fn cmp_update(&self, store: &mut HashMap, V>, obj: &K) -> bool; +} + +// A simple getter from a &K to some optional value V will be able to compare against +// a value in a HashMap, V> +impl Option)> Predicate for F +where + K::DynamicType: Default + Eq + std::hash::Hash, + V: PartialEq, +{ + fn cmp_update(&self, cache: &mut HashMap, V>, obj: &K) -> bool { + if let Some(val) = (self)(obj) { + let key = ObjectRef::from_obj(obj); + let changed = if let Some(old) = cache.get(&key) { + *old != val // changed if key different + } else { + true // always changed if not in map + }; + if let Some(old) = cache.get_mut(&key) { + *old = val; + } else { + cache.insert(key, val); + } + changed + } else { + true + } + } +} + +// impl Predicate +fn generation(x: &Document) -> Option { + x.meta().generation } #[instrument(skip(ctx, doc), fields(trace_id))] async fn reconcile(doc: Arc, ctx: Arc) -> Result { + if !generation.cmp_update(&mut *ctx.predicate_cache.write().await, &doc) { + info!("ignoring generationally equivalent reconcile for {}", doc.name()); + return Ok(Action::requeue(Duration::from_secs(30 * 60))); + } + let trace_id = telemetry::get_trace_id(); Span::current().record("trace_id", &field::display(&trace_id)); let start = Instant::now(); @@ -97,7 +142,7 @@ async fn reconcile(doc: Arc, ctx: Arc) -> Result "kind": "Document", "status": DocumentStatus { hidden: should_hide, - //last_updated: Some(Utc::now()), + last_updated: Some(Utc::now()), } })); let ps = PatchParams::apply("cntrlr").force(); @@ -193,6 +238,7 @@ impl Manager { client: client.clone(), metrics: metrics.clone(), state: state.clone(), + predicate_cache: Arc::new(RwLock::new(HashMap::new())), }); let docs = Api::::all(client); diff --git a/yaml/crd.yaml b/yaml/crd.yaml index 9d52ad9..259e03a 100644 --- a/yaml/crd.yaml +++ b/yaml/crd.yaml @@ -39,6 +39,10 @@ spec: properties: hidden: type: boolean + last_updated: + format: date-time + nullable: true + type: string required: - hidden type: object