diff --git a/policy-controller/k8s/status/src/index.rs b/policy-controller/k8s/status/src/index.rs index 2213c3e3a036f..ce21926e78679 100644 --- a/policy-controller/k8s/status/src/index.rs +++ b/policy-controller/k8s/status/src/index.rs @@ -371,30 +371,48 @@ impl Index { /// Controller applies patches or the write leaseholder changes, all /// routes have an up-to-date status. pub async fn run(index: Arc>, reconciliation_period: Duration) { - // Clone the claims watch out of the index. This will immediately - // drop the read lock on the index so that it is not held for the - // lifetime of this function. - let mut claims = index.read().claims.clone(); + // Extract what we need from the index so we don't need to lock it for + // housekeeping. + let (instance, mut claims) = { + let idx = index.read(); + (idx.name.clone(), idx.claims.clone()) + }; + + // The timer is reset when this instance becomes the leader and it is + // polled as long as it is the leader. The timer ensures that + // reconciliation happens at consistent intervals after leadership is + // acquired. + let mut timer = time::interval(reconciliation_period); + timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + let mut was_leader = false; loop { + // Refresh the state of the lease on each iteration to ensure we're + // checking expiration. + let is_leader = claims.borrow_and_update().is_current_for(&instance); + if is_leader && !was_leader { + tracing::debug!("Became leader; resetting timer"); + timer.reset_immediately(); + } + was_leader = is_leader; + tokio::select! { + // Eagerly process claim updates to track leadership changes. If + // the claim changes, refresh the leadership status. 
+ biased; res = claims.changed() => { res.expect("Claims watch must not be dropped"); - tracing::trace!("Lease updated"); + if tracing::enabled!(tracing::Level::TRACE) { + let c = claims.borrow(); + tracing::trace!(claim=?*c, "Changed"); + } } - _ = time::sleep(reconciliation_period) => {} - } - - // The claimant has changed, or we should attempt to reconcile all - //routes to account for any errors. In either case, we should - // only proceed if we are the current leader. - let claims = claims.borrow_and_update(); - let index = index.read(); - if !claims.is_current_for(&index.name) { - continue; + // Only wait for the timer if this instance is the leader. + _ = timer.tick(), if is_leader => { + index.read().reconcile_if_leader(); + } } - index.reconcile(); } } @@ -784,16 +802,16 @@ impl Index { make_patch(id, status) } + /// If this instance is the leader, reconcile the statuses for all resources + /// for which we control the status. fn reconcile_if_leader(&self) { let lease = self.claims.borrow(); if !lease.is_current_for(&self.name) { - tracing::trace!(%lease.holder, "Reconcilation skipped"); + tracing::trace!(%lease.holder, ?lease.expiry, "Reconcilation skipped"); return; } - self.reconcile(); - } + drop(lease); - fn reconcile(&self) { tracing::trace!( egressnetworks = self.egress_networks.len(), routes = self.route_refs.len(),