kvserver: avoid redundant liveness heartbeats under a thundering herd
When a node's liveness expires, either because its liveness record's
epoch is incremented or it is just slow to heartbeat its record, all of
its epoch-based leases immediately become invalid. As a result, we often
see a thundering herd of requests attempt to synchronously heartbeat the
node's liveness record, on the order of the number of ranges that lost
their lease.

We already limit the concurrency of these heartbeats to 1, so there is
not a huge concern that this will lead to overwhelming the liveness
range, but it does cause other issues. For one, it means that we end up
heartbeating the liveness record very rapidly, which causes large growth
in MVCC history. It also means that heartbeats at the end of the queue
have to wait for all of the heartbeats in front of them to complete. Even
if these heartbeats only take 5ms each, if there are 100 of them
waiting, then the last one in line will wait for 500ms and its range
will be unavailable during this time. This also has the potential to
starve the liveness heartbeat loop, which isn't a problem in and of
itself as long as other synchronous heartbeats are succeeding, but
leads to concerning log warnings. Finally, this was an instance where
we were adding additional load to a cluster once it was close to being
overloaded. That's generally a bad property for a system that wants
to stay stable, and this change helps avoid it.

The solution here is to detect redundant heartbeats and make them no-ops
where possible. This has a similar effect to if we were to explicitly
coalesce heartbeats, but it's easier to reason about and requires us to
maintain less state.

The commit is conservative about this, providing a fairly strong
guarantee that a heartbeat attempt, if successful, will ensure that the
liveness record's expiration will be at least the liveness threshold
above the time that the method was called. We may be able to relax this
and say that the heartbeat attempt will just ensure that the expiration
is now above that of the oldLiveness provided, but this weakened
guarantee seems harder to reason about as a consumer of this interface.
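
In rough outline, the new heartbeatInternal logic looks like the
following sketch (condensed from the pkg/kv/kvserver/node_liveness.go
changes below; semaphore acquisition, error paths, and the conditional
put on the liveness record are elided):

    // Take a clock reading before queuing on the heartbeat semaphore.
    // The expiration guarantee is made relative to this timestamp.
    beforeQueueTS := nl.clock.Now()
    minExpiration := hlc.LegacyTimestamp(
        beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))

    // ... queue on the per-node heartbeat semaphore ...

    // If heartbeats ahead of us in the queue already pushed the record's
    // expiration past minExpiration, this attempt is redundant and
    // becomes a no-op.
    if !incrementEpoch {
        if cur, err := nl.Self(); err == nil && minExpiration.Less(cur.Expiration) {
            return nil
        }
    }

    // ... otherwise, compute a new expiration from a fresh clock reading
    // and attempt the conditional put on the liveness record ...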

Release note (performance improvement): ranges recover moderately faster
when their leaseholder is briefly down before becoming live again.
nvanbenschoten committed Aug 10, 2020
1 parent 09804ff commit 1dc18df
Showing 8 changed files with 193 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_lease_test.go
@@ -284,7 +284,7 @@ func TestGossipNodeLivenessOnLeaseChange(t *testing.T) {
// Turn off liveness heartbeats on all nodes to ensure that updates to node
// liveness are not triggering gossiping.
for i := range mtc.nodeLivenesses {
mtc.nodeLivenesses[i].PauseHeartbeat(true)
mtc.nodeLivenesses[i].PauseHeartbeatLoopForTest()
}

nodeLivenessKey := gossip.MakeNodeLivenessKey(1)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
@@ -1450,7 +1450,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {

// Turn off liveness heartbeats on the second store, then advance the clock
// past the liveness expiration time. This expires all leases on all stores.
mtc.nodeLivenesses[1].PauseHeartbeat(true)
mtc.nodeLivenesses[1].PauseHeartbeatLoopForTest()
mtc.advanceClock(ctx)

// Manually heartbeat the liveness on the first store to ensure it's
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_raft_test.go
@@ -1339,7 +1339,7 @@ func TestRefreshPendingCommands(t *testing.T) {
// Disable node liveness heartbeats which can reacquire leases when we're
// trying to expire them. We pause liveness heartbeats here after node 0
// was restarted (which creates a new NodeLiveness).
pauseNodeLivenessHeartbeats(mtc, true)
pauseNodeLivenessHeartbeatLoops(mtc)

// Start draining stores 0 and 1 to prevent them from grabbing any new
// leases.
@@ -3900,7 +3900,7 @@ func TestRangeQuiescence(t *testing.T) {
defer mtc.Stop()
mtc.Start(t, 3)

pauseNodeLivenessHeartbeats(mtc, true)
pauseNodeLivenessHeartbeatLoops(mtc)

// Replica range 1 to all 3 nodes.
const rangeID = roachpb.RangeID(1)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/closed_timestamp_test.go
@@ -676,7 +676,7 @@ func forceLeaseTransferOnSubsumedRange(
}
return nil
})
restartHeartbeats := oldLeaseholderStore.NodeLiveness().DisableAllHeartbeatsForTest()
restartHeartbeats := oldLeaseholderStore.NodeLiveness().PauseAllHeartbeatsForTest()
defer restartHeartbeats()
log.Infof(ctx, "paused RHS rightLeaseholder's liveness heartbeats")
time.Sleep(oldLeaseholderStore.NodeLiveness().GetLivenessThreshold())
119 changes: 91 additions & 28 deletions pkg/kv/kvserver/node_liveness.go
@@ -80,6 +80,12 @@ var (
Measurement: "Nodes",
Unit: metric.Unit_COUNT,
}
metaHeartbeatsInFlight = metric.Metadata{
Name: "liveness.heartbeatsinflight",
Help: "Number of in-flight liveness heartbeats from this node",
Measurement: "Requests",
Unit: metric.Unit_COUNT,
}
metaHeartbeatSuccesses = metric.Metadata{
Name: "liveness.heartbeatsuccesses",
Help: "Number of successful node liveness heartbeats from this node",
@@ -109,6 +115,7 @@ var (
// LivenessMetrics holds metrics for use with node liveness activity.
type LivenessMetrics struct {
LiveNodes *metric.Gauge
HeartbeatsInFlight *metric.Gauge
HeartbeatSuccesses *metric.Counter
HeartbeatFailures *metric.Counter
EpochIncrements *metric.Counter
@@ -215,6 +222,7 @@ func NewNodeLiveness(
}
nl.metrics = LivenessMetrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
HeartbeatsInFlight: metric.NewGauge(metaHeartbeatsInFlight),
HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses),
HeartbeatFailures: metric.NewCounter(metaHeartbeatFailures),
EpochIncrements: metric.NewCounter(metaEpochIncrements),
@@ -612,27 +620,27 @@ of network connectivity problems. For help troubleshooting, visit:
`

// PauseHeartbeat stops or restarts the periodic heartbeat depending on the
// pause parameter. When pause is true, waits until it acquires the heartbeatToken
// (unless heartbeat was already paused); this ensures that no heartbeats happen
// after this is called. This function is only safe for use in tests.
func (nl *NodeLiveness) PauseHeartbeat(pause bool) {
if pause {
if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 0, 1); swapped {
<-nl.heartbeatToken
}
} else {
// PauseHeartbeatLoopForTest stops the periodic heartbeat. The function
// waits until it acquires the heartbeatToken (unless heartbeat was
// already paused); this ensures that no heartbeats happen after this is
// called. Returns a closure to call to re-enable the heartbeat loop.
// This function is only safe for use in tests.
func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() {
if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 0, 1); swapped {
<-nl.heartbeatToken
}
return func() {
if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 1, 0); swapped {
nl.heartbeatToken <- struct{}{}
}
}
}

// DisableAllHeartbeatsForTest disables all node liveness heartbeats, including
// those triggered from outside the normal StartHeartbeat loop. Returns a
// closure to call to re-enable heartbeats. Only safe for use in tests.
func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
nl.PauseHeartbeat(true)
// PauseSynchronousHeartbeatsForTest disables all node liveness
// heartbeats triggered from outside the normal StartHeartbeat loop.
// Returns a closure to call to re-enable synchronous heartbeats. Only
// safe for use in tests.
func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
nl.selfSem <- struct{}{}
nl.otherSem <- struct{}{}
return func() {
@@ -641,6 +649,19 @@ func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
}
}

// PauseAllHeartbeatsForTest disables all node liveness heartbeats,
// including those triggered from outside the normal StartHeartbeat
// loop. Returns a closure to call to re-enable heartbeats. Only safe
// for use in tests.
func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() {
enableLoop := nl.PauseHeartbeatLoopForTest()
enableSync := nl.PauseSynchronousHeartbeatsForTest()
return func() {
enableLoop()
enableSync()
}
}

var errNodeAlreadyLive = errors.New("node already live")

// Heartbeat is called to update a node's expiration timestamp. This
@@ -678,6 +699,24 @@ func (nl *NodeLiveness) heartbeatInternal(
}
}(timeutil.Now())

// Collect a clock reading from before we begin queuing on the heartbeat
// semaphore. This method (attempts to, see [*]) guarantees that, if
// successful, the liveness record's expiration will be at least the
// liveness threshold above the time that the method was called.
// Collecting this clock reading before queuing allows us to enforce
// this while avoiding redundant liveness heartbeats during thundering
// herds without needing to explicitly coalesce heartbeats.
//
// [*]: see TODO below about how errNodeAlreadyLive handling does not
// enforce this guarantee.
beforeQueueTS := nl.clock.Now()
minExpiration := hlc.LegacyTimestamp(
beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))

// Before queueing, record the heartbeat as in-flight.
nl.metrics.HeartbeatsInFlight.Inc(1)
defer nl.metrics.HeartbeatsInFlight.Dec(1)

// Allow only one heartbeat at a time.
nodeID := nl.gossip.NodeID.Get()
sem := nl.sem(nodeID)
@@ -690,6 +729,20 @@ func (nl *NodeLiveness) heartbeatInternal(
<-sem
}()

// If we are not intending to increment the node's liveness epoch, detect
// whether this heartbeat is needed anymore. It is possible that we queued
for long enough on the semaphore such that other heartbeat attempts ahead
// of us already incremented the expiration past what we wanted. Note that
// if we allowed the heartbeat to proceed in this case, we know that it
would hit a ConditionFailedError and return an errNodeAlreadyLive down
// below.
if !incrementEpoch {
curLiveness, err := nl.Self()
if err == nil && minExpiration.Less(curLiveness.Expiration) {
return nil
}
}

// Let's compute what our new liveness record should be.
var newLiveness kvserverpb.Liveness
if oldLiveness == (kvserverpb.Liveness{}) {
@@ -706,18 +759,17 @@ func (nl *NodeLiveness) heartbeatInternal(
}
}

// We need to add the maximum clock offset to the expiration because it's
// used when determining liveness for a node.
{
newLiveness.Expiration = hlc.LegacyTimestamp(
nl.clock.Now().Add((nl.livenessThreshold).Nanoseconds(), 0))
// This guards against the system clock moving backwards. As long
// as the cockroach process is running, checks inside hlc.Clock
// will ensure that the clock never moves backwards, but these
// checks don't work across process restarts.
if newLiveness.Expiration.Less(oldLiveness.Expiration) {
return errors.Errorf("proposed liveness update expires earlier than previous record")
}
// Grab a new clock reading to compute the new expiration time,
// since we may have queued on the semaphore for a while.
afterQueueTS := nl.clock.Now()
newLiveness.Expiration = hlc.LegacyTimestamp(
afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
// This guards against the system clock moving backwards. As long
// as the cockroach process is running, checks inside hlc.Clock
// will ensure that the clock never moves backwards, but these
// checks don't work across process restarts.
if newLiveness.Expiration.Less(oldLiveness.Expiration) {
return errors.Errorf("proposed liveness update expires earlier than previous record")
}

update := livenessUpdate{
Expand All @@ -740,6 +792,17 @@ func (nl *NodeLiveness) heartbeatInternal(
// expired while in flight, so maybe we don't have to care about
// that and only need to distinguish between same and different
// epochs in our return value.
//
// TODO(nvanbenschoten): Unlike the early return above, this doesn't
// guarantee that the resulting expiration is past minExpiration,
// only that it's different than our oldLiveness. Is that ok? It
// hasn't caused issues so far, but we might want to detect this
// case and retry, at least in the case of the liveness heartbeat
// loop. The downside of this is that a heartbeat that's intending
// to bump the expiration of a record out 9s into the future may
// return a success even if the expiration is only 5 seconds in the
// future. The next heartbeat will then start with only 0.5 seconds
// before expiration.
if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch {
return errNodeAlreadyLive
}
@@ -770,7 +833,7 @@ func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) {
if err != nil {
return kvserverpb.Liveness{}, err
}
return rec.Liveness, err
return rec.Liveness, nil
}

// SelfEx is like Self, but returns the raw, encoded value that the database has