Skip to content

Commit

Permalink
fix(pool): Reset endpoints gauge on drop (#2928)
Browse files Browse the repository at this point in the history
The p2c pool tracks an endpoints gauge that is incremented and decremented as
endpoints are added and removed from the balancer. When a balancer is dropped,
the proxy does not reset this gauge to 0. If the balancer is recreated later,
this causes endpoints to appear to be double-counted.

This commit fixes this in two ways:

1. When the balancer is dropped, the endpoints gauge is reset to 0.
2. When the pool is reset (i.e., during initialization), we always reset the
   endpoints value rather than use individual inc/dec calls.
  • Loading branch information
olix0r authored Apr 26, 2024
1 parent 1ba97a7 commit 6a845fd
Showing 1 changed file with 52 additions and 2 deletions.
54 changes: 52 additions & 2 deletions linkerd/pool/p2c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ where
} else {
if t.is_none() {
tracing::info!(?addr, "Adding endpoint");
self.metrics.endpoints.inc();
} else {
tracing::info!(?addr, "Updating endpoint");
}
Expand All @@ -164,11 +163,11 @@ where
for (addr, _) in remaining.drain() {
tracing::info!(?addr, "Removing endpoint");
self.pool.evict(&addr);
self.metrics.endpoints.dec();
changed = true;
}

if changed {
self.metrics.endpoints.set(self.endpoints.len() as i64);
self.metrics.updates_reset.inc();
self.next_idx = None;
}
Expand Down Expand Up @@ -275,6 +274,12 @@ where
}
}

impl<T, N, Req, S> Drop for P2cPool<T, N, Req, S> {
fn drop(&mut self) {
self.metrics.endpoints.set(0);
}
}

// === impl P2cMetricFamilies ===

impl<L> Default for P2cMetricFamilies<L>
Expand Down Expand Up @@ -460,6 +465,51 @@ mod tests {
assert_eq!(metrics.updates_rm.get(), 1);
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn update_pool_handle_drop() {
let _trace = linkerd_tracing::test::with_default_filter("trace");

let addr0 = "192.168.10.10:80".parse().unwrap();
let addr1 = "192.168.10.11:80".parse().unwrap();
let addr2 = "192.168.10.12:80".parse().unwrap();

let seen = Arc::new(Mutex::new(HashSet::<(SocketAddr, usize)>::default()));
let metrics = P2cMetrics::default();
let mut pool = P2cPool::new(metrics.clone(), |(addr, n): (SocketAddr, usize)| {
assert!(seen.lock().insert((addr, n)));
PeakEwma::new(
linkerd_stack::service_fn(|()| {
std::future::ready(Ok::<_, std::convert::Infallible>(()))
}),
time::Duration::from_secs(1),
1.0 * 1000.0 * 1000.0,
CompleteOnResponse::default(),
)
});

pool.reset_pool(vec![(addr0, 0), (addr1, 0), (addr2, 0)]);
assert_eq!(pool.endpoints.len(), 3);
assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64);

drop(pool);
assert_eq!(metrics.endpoints.get(), 0);

let mut pool = P2cPool::new(metrics.clone(), |(addr, n): (SocketAddr, usize)| {
assert!(seen.lock().insert((addr, n)));
PeakEwma::new(
linkerd_stack::service_fn(|()| {
std::future::ready(Ok::<_, std::convert::Infallible>(()))
}),
time::Duration::from_secs(1),
1.0 * 1000.0 * 1000.0,
CompleteOnResponse::default(),
)
});
pool.reset_pool(vec![(addr0, 1), (addr1, 1), (addr2, 1)]);
assert_eq!(pool.endpoints.len(), 3);
assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64);
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn p2c_ready_index() {
let _trace = linkerd_tracing::test::with_default_filter("trace");
Expand Down

0 comments on commit 6a845fd

Please sign in to comment.