Skip to content

Commit

Permalink
PoC Predicates for kube-rs/kube#52
Browse files Browse the repository at this point in the history
Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed May 13, 2022
1 parent 37e962e commit 7e2a152
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
50 changes: 48 additions & 2 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct DocumentSpec {
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct DocumentStatus {
hidden: bool,
//last_updated: Option<DateTime<Utc>>,
last_updated: Option<DateTime<Utc>>,
}

impl Document {
Expand All @@ -60,10 +60,55 @@ struct Data {
state: Arc<RwLock<State>>,
/// Various prometheus metrics
metrics: Metrics,
/// Cache for predicate
predicate_cache: Arc<RwLock<HashMap<ObjectRef<Document>, i64>>>,
}


use kube::runtime::reflector::ObjectRef;
pub trait Predicate<K: Resource, V> {
fn cmp_update(&self, store: &mut HashMap<ObjectRef<K>, V>, obj: &K) -> bool;
}

// A simple getter from a &K to some optional value V will be able to compare against
// a value in a HashMap<ObjectRef<K>, V>
impl<K: Resource, V, F: (Fn(&K) -> Option<V>)> Predicate<K, V> for F
where
K::DynamicType: Default + Eq + std::hash::Hash,
V: PartialEq,
{
fn cmp_update(&self, cache: &mut HashMap<ObjectRef<K>, V>, obj: &K) -> bool {
if let Some(val) = (self)(obj) {
let key = ObjectRef::from_obj(obj);
let changed = if let Some(old) = cache.get(&key) {
*old != val // changed if key different
} else {
true // always changed if not in map
};
if let Some(old) = cache.get_mut(&key) {
*old = val;
} else {
cache.insert(key, val);
}
changed
} else {
true
}
}
}

// impl Predicate
fn generation(x: &Document) -> Option<i64> {
x.meta().generation
}

#[instrument(skip(ctx, doc), fields(trace_id))]
async fn reconcile(doc: Arc<Document>, ctx: Arc<Data>) -> Result<Action, Error> {
if !generation.cmp_update(&mut *ctx.predicate_cache.write().await, &doc) {
info!("ignoring generationally equivalent reconcile for {}", doc.name());
return Ok(Action::requeue(Duration::from_secs(30 * 60)));
}

let trace_id = telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));
let start = Instant::now();
Expand Down Expand Up @@ -97,7 +142,7 @@ async fn reconcile(doc: Arc<Document>, ctx: Arc<Data>) -> Result<Action, Error>
"kind": "Document",
"status": DocumentStatus {
hidden: should_hide,
//last_updated: Some(Utc::now()),
last_updated: Some(Utc::now()),
}
}));
let ps = PatchParams::apply("cntrlr").force();
Expand Down Expand Up @@ -193,6 +238,7 @@ impl Manager {
client: client.clone(),
metrics: metrics.clone(),
state: state.clone(),
predicate_cache: Arc::new(RwLock::new(HashMap::new())),
});

let docs = Api::<Document>::all(client);
Expand Down
4 changes: 4 additions & 0 deletions yaml/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ spec:
properties:
hidden:
type: boolean
last_updated:
format: date-time
nullable: true
type: string
required:
- hidden
type: object
Expand Down

0 comments on commit 7e2a152

Please sign in to comment.