feat(runtime): Add series implementation for event recorder #1655
Conversation
Force-pushed from 0971ca4 to adeabe4

Signed-off-by: Alexander Gil <[email protected]>

Force-pushed from adeabe4 to e09e462
hey, thanks for this! looks largely sensible to me, gave it a quick once-over with some questions and minor nits, lmk if my thinking makes sense.
```rust
let default_namespace = "kube-system".to_owned(); // default does not work on k8s < 1.22
let events = Api::namespaced(client, reference.namespace.as_ref().unwrap_or(&default_namespace));

pub fn new(client: Client, reporter: Reporter) -> Self {
    let events_cache = Arc::new(RwLock::new(HashMap::new()));
```
think this is equivalent to

```diff
-let events_cache = Arc::new(RwLock::new(HashMap::new()));
+let events_cache = Arc::default();
```
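For what it's worth, the equivalence is easy to check with the std types (a minimal sketch using `std::sync::RwLock`; the PR itself uses tokio's):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

fn main() {
    // Default for Arc<RwLock<HashMap<_, _>>> composes the inner Defaults,
    // so both forms produce an empty map behind the same nesting.
    let explicit: Arc<RwLock<HashMap<String, u32>>> = Arc::new(RwLock::new(HashMap::new()));
    let via_default: Arc<RwLock<HashMap<String, u32>>> = Arc::default();
    assert!(explicit.read().unwrap().is_empty());
    assert!(via_default.read().unwrap().is_empty());
}
```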
```rust
name: Some(format!(
    "{}.{}",
    reference.name.as_ref().unwrap_or(&self.reporter.controller),
    now.timestamp()
)),
```
afaict the most significant thing here (while factoring out the event creation) is the move from a generated name to a deterministic name that gets patched over. however, the name is the name of the object reference; is that sufficiently unique? is this aiming to have one event per object per event type?
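For illustration, one way to make the deterministic name unique per (object, reason) pair rather than per object alone is to fold the distinguishing fields into the name; `event_name` and its parameters here are hypothetical, not the PR's code:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical sketch: build the event object's name from everything that
// makes two events "the same", so repeated publishes patch one object
// per (object, reason) pair. DefaultHasher is stable within a process,
// which is all the dedup needs during a controller's lifetime.
fn event_name(fallback: &str, object_name: Option<&str>, reason: &str) -> String {
    let mut h = DefaultHasher::new();
    (object_name, reason).hash(&mut h);
    format!("{}.{:x}", object_name.unwrap_or(fallback), h.finish())
}

fn main() {
    let a = event_name("ctrl", Some("pod-1"), "Scheduled");
    let b = event_name("ctrl", Some("pod-1"), "Scheduled");
    assert_eq!(a, b); // deterministic: same inputs, same name
    let c = event_name("ctrl", Some("pod-1"), "Killing");
    assert_ne!(a, c); // different reason -> a different event object
}
```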
```rust
    related: ev.secondary,
})
.await?;

pub async fn publish(&self, ev: Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
```
my gut feeling is that maybe it's a bit scary to make a breaking change here rather than making a new opt-in publishing alternative, but i'm not sure. the original behaviour was not the best with the mass creation of new objects, so maybe it's worth making this soft break in behavior for less IO. i need to have a quick test of this later.
```rust
let series = match &e.series {
    Some(series) => EventSeries {
        count: series.count + 1,
        last_observed_time: MicroTime(now),
    },
    None => EventSeries {
        count: 2,
        last_observed_time: MicroTime(now),
    },
};
```
you can lift the match into the `count` line and create the struct once rather than in both arms:

```rust
let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
let series = EventSeries { count, last_observed_time: MicroTime(now) };
```
```rust
reporter: Reporter,
reference: ObjectReference,
events_cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
```
```diff
-events_cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
+cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
```

minor preference nit: only one cache here
```rust
    Client,
};
use tokio::sync::RwLock;

const EVENT_FINISH_TIME: Duration = Duration::minutes(6);
```
`_TIME` is perhaps misleading for a `Duration` unit.

```diff
-const EVENT_FINISH_TIME: Duration = Duration::minutes(6);
+const CACHE_TTL: Duration = Duration::minutes(6);
```

or perhaps something else.
```rust
cache.retain(|_, v| {
    if let Some(series) = v.series.as_ref() {
        series.last_observed_time.0 + EVENT_FINISH_TIME > now
    } else if let Some(event_time) = v.event_time.as_ref() {
        event_time.0 + EVENT_FINISH_TIME > now
    } else {
        true
    }
```
comment suggestion for why there's a retain at the end of a publisher:

```diff
+// gc past events older than now + CACHE_TTL
 cache.retain(|_, v| {
     if let Some(series) = v.series.as_ref() {
-        series.last_observed_time.0 + EVENT_FINISH_TIME > now
+        series.last_observed_time.0 + CACHE_TTL > now
     } else if let Some(event_time) = v.event_time.as_ref() {
-        event_time.0 + EVENT_FINISH_TIME > now
+        event_time.0 + CACHE_TTL > now
     } else {
         true
     }
```
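The GC pattern itself is easy to exercise in isolation. A simplified, self-contained sketch (plain `u64` second timestamps and `String` keys standing in for the PR's `MicroTime`/`EventKey`/`K8sEvent` types):

```rust
use std::collections::HashMap;

const CACHE_TTL_SECS: u64 = 6 * 60;

// Keep only entries observed within the TTL window; everything older
// is dropped so the cache cannot grow without bound.
fn gc(cache: &mut HashMap<String, u64>, now: u64) {
    cache.retain(|_, last_observed| *last_observed + CACHE_TTL_SECS > now);
}

fn main() {
    let now = 10_000;
    let mut cache = HashMap::new();
    cache.insert("fresh".to_string(), now - 30); // seen 30s ago
    cache.insert("stale".to_string(), now - 900); // seen 15min ago
    gc(&mut cache, now);
    assert!(cache.contains_key("fresh"));
    assert!(!cache.contains_key("stale"));
}
```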
```rust
        secondary: None,
    },
    &s.object_ref(&()),
)
```
if we are changing .publish, we should make the integration test also publish twice to the same name and verify we get an EventSeries
```rust
/// Isomorphic key for caching similar events
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventKey {
```
does this need to be pub?
Motivation

The current implementation of the event recorder only creates new events, which is not sufficient for my event-handling needs.

Solution

I added a recorder implementation closer to the one in the client-go library. This implementation caches events locally and groups isomorphic events, incrementing the series count for similar events.
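The series logic described above can be sketched in isolation (simplified types; `u64` stands in for `MicroTime`, and `bump_series` is an illustrative name, not the PR's API):

```rust
#[derive(Clone, Debug)]
struct EventSeries {
    count: i32,
    last_observed_time: u64, // stand-in for MicroTime
}

// First duplicate of a cached event starts a series at count = 2
// (original + this one); later duplicates bump the count and refresh
// the last-observed timestamp.
fn bump_series(existing: Option<&EventSeries>, now: u64) -> EventSeries {
    let count = existing.map_or(2, |s| s.count + 1);
    EventSeries { count, last_observed_time: now }
}

fn main() {
    let first = bump_series(None, 100);
    assert_eq!(first.count, 2);
    let second = bump_series(Some(&first), 200);
    assert_eq!(second.count, 3);
    assert_eq!(second.last_observed_time, 200);
}
```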