Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

healthcheck: update healthy tablets correctly when a stream returns an error or times out #7654

Merged
merged 3 commits into from
Mar 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,20 +404,20 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
}
}

func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealthResponse, currentTarget *query.Target, trivialNonMasterUpdate bool, isMasterUpdate bool, isMasterChange bool) {
func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Target, trivialUpdate bool, isPrimaryUp bool) {
// hc.healthByAlias is authoritative, it should be updated
hc.mu.Lock()
defer hc.mu.Unlock()

tabletAlias := tabletAliasString(topoproto.TabletAliasString(shr.TabletAlias))

hcErrorCounters.Add([]string{shr.Target.Keyspace, shr.Target.Shard, topoproto.TabletTypeLString(shr.Target.TabletType)}, 0)
targetKey := hc.keyFromTarget(shr.Target)
targetChanged := currentTarget.TabletType != shr.Target.TabletType || currentTarget.Keyspace != shr.Target.Keyspace || currentTarget.Shard != shr.Target.Shard
tabletAlias := tabletAliasString(topoproto.TabletAliasString(th.Tablet.Alias))
targetKey := hc.keyFromTarget(th.Target)
targetChanged := prevTarget.TabletType != th.Target.TabletType || prevTarget.Keyspace != th.Target.Keyspace || prevTarget.Shard != th.Target.Shard
if targetChanged {
// Error counter has to be set here in case we get a new tablet type for the first time in a stream response
hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoproto.TabletTypeLString(th.Target.TabletType)}, 0)
// keyspace and shard are not expected to change, but just in case ...
// move this tabletHealthCheck to the correct map
oldTargetKey := hc.keyFromTarget(currentTarget)
oldTargetKey := hc.keyFromTarget(prevTarget)
delete(hc.healthData[oldTargetKey], tabletAlias)
_, ok := hc.healthData[targetKey]
if !ok {
Expand All @@ -427,44 +427,52 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
// add it to the map by target
hc.healthData[targetKey][tabletAlias] = th

if isMasterUpdate {
isPrimary := th.Target.TabletType == topodata.TabletType_MASTER
switch {
case isPrimary && isPrimaryUp:
if len(hc.healthy[targetKey]) == 0 {
hc.healthy[targetKey] = append(hc.healthy[targetKey], th)
} else {
// We already have one up server, see if we
// need to replace it.
if shr.TabletExternallyReparentedTimestamp < hc.healthy[targetKey][0].MasterTermStartTime {
if th.MasterTermStartTime < hc.healthy[targetKey][0].MasterTermStartTime {
log.Warningf("not marking healthy master %s as Up for %s because its MasterTermStartTime is smaller than the highest known timestamp from previous MASTERs %s: %d < %d ",
topoproto.TabletAliasString(shr.TabletAlias),
topoproto.KeyspaceShardString(shr.Target.Keyspace, shr.Target.Shard),
topoproto.TabletAliasString(th.Tablet.Alias),
topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard),
topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias),
shr.TabletExternallyReparentedTimestamp,
th.MasterTermStartTime,
hc.healthy[targetKey][0].MasterTermStartTime)
} else {
// Just replace it.
hc.healthy[targetKey][0] = th
}
}
case isPrimary && !isPrimaryUp:
// No healthy master tablet
hc.healthy[targetKey] = []*TabletHealth{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deepthi / @sougou I think this might not behave correctly in case of an unplanned failover with an external reparent, please let me know if my thinking is correct here #7906

Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense. This should be fixed.

}
if !trivialNonMasterUpdate {

if !trivialUpdate {
// We re-sort the healthy tablet list whenever we get a health update for tablets we can route to.
// Tablets from other cells for non-master targets should not trigger a re-sort;
// they should also be excluded from healthy list.
if shr.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(shr.Target.TabletType, shr.TabletAlias) {
if th.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) {
hc.recomputeHealthy(targetKey)
}
if targetChanged && currentTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(shr.Target.TabletType, shr.TabletAlias) { // also recompute old target's healthy list
oldTargetKey := hc.keyFromTarget(currentTarget)
if targetChanged && prevTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list
oldTargetKey := hc.keyFromTarget(prevTarget)
hc.recomputeHealthy(oldTargetKey)
}
}
if isMasterChange {
log.Errorf("Adding 1 to MasterPromoted counter for tablet: %v, shr.Tablet: %v, shr.TabletType: %v", currentTarget, topoproto.TabletAliasString(shr.TabletAlias), shr.Target.TabletType)
hcMasterPromotedCounters.Add([]string{shr.Target.Keyspace, shr.Target.Shard}, 1)

isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_MASTER
if isNewPrimary {
log.Errorf("Adding 1 to MasterPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
hcMasterPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1)
}

// broadcast to subscribers
hc.broadcast(th)

}

func (hc *HealthCheckImpl) recomputeHealthy(key keyspaceShardTabletType) {
Expand Down
73 changes: 72 additions & 1 deletion go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func init() {
}

func TestHealthCheck(t *testing.T) {
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
// close healthcheck
Expand Down Expand Up @@ -257,6 +259,69 @@ func TestHealthCheckStreamError(t *testing.T) {
result = <-resultChan
//TODO: figure out how to compare objects that contain errors using utils.MustMatch
assert.True(t, want.DeepEqual(result), "Wrong TabletHealth data\n Expected: %v\n Actual: %v", want, result)
// tablet should be removed from healthy list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
assert.Empty(t, a, "wrong result, expected empty list")
}

// TestHealthCheckErrorOnPrimary is the same as TestHealthCheckStreamError except for tablet type
func TestHealthCheckErrorOnPrimary(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
defer hc.Close()

tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse)
resultChan := hc.Subscribe()
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
hc.AddTablet(tablet)

// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
MasterTermStartTime: 0,
}
result := <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")

// one tablet after receiving a StreamHealthResponse
shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: true,
TabletExternallyReparentedTimestamp: 10,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}
want = &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
MasterTermStartTime: 10,
}
input <- shr
result = <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")

// Stream error
fc.errCh <- fmt.Errorf("some stream error")
want = &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: false,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
MasterTermStartTime: 10,
LastError: fmt.Errorf("some stream error"),
}
result = <-resultChan
//TODO: figure out how to compare objects that contain errors using utils.MustMatch
assert.True(t, want.DeepEqual(result), "Wrong TabletHealth data\n Expected: %v\n Actual: %v", want, result)
// tablet should be removed from healthy list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER})
assert.Empty(t, a, "wrong result, expected empty list")
}

func TestHealthCheckVerifiesTabletAlias(t *testing.T) {
Expand Down Expand Up @@ -363,6 +428,8 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
}

func TestHealthCheckTimeout(t *testing.T) {
// reset counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
hc.healthCheckTimeout = 500 * time.Millisecond
Expand Down Expand Up @@ -410,6 +477,10 @@ func TestHealthCheckTimeout(t *testing.T) {
assert.Nil(t, checkErrorCounter("k", "s", topodatapb.TabletType_REPLICA, 1))
assert.True(t, fc.isCanceled(), "StreamHealth should be canceled after timeout, but is not")

// tablet should be removed from healthy list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
assert.Empty(t, a, "wrong result, expected empty list")

// repeat the wait. It will timeout one more time trying to get the connection.
fc.resetCanceledFlag()
time.Sleep(hc.healthCheckTimeout)
Expand Down Expand Up @@ -798,7 +869,7 @@ func TestMasterInOtherCell(t *testing.T) {

// check that MASTER tablet from other cell IS in healthy tablet list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER})
assert.Len(t, a, 1, "")
require.Len(t, a, 1, "")
mustMatch(t, want, a[0], "Expecting healthy master")
}

Expand Down
23 changes: 14 additions & 9 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,10 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("health stats mismatch, tablet %+v alias does not match response alias %v", thc.Tablet, shr.TabletAlias))
}

currentTarget := thc.Target
prevTarget := thc.Target
// check whether this is a trivial update so as to update healthy map
trivialNonMasterUpdate := thc.LastError == nil && thc.Serving && shr.RealtimeStats.HealthError == "" && shr.Serving &&
currentTarget.TabletType != topodata.TabletType_MASTER && currentTarget.TabletType == shr.Target.TabletType && thc.isTrivialReplagChange(shr.RealtimeStats)
isMasterUpdate := shr.Target.TabletType == topodata.TabletType_MASTER
isMasterChange := thc.Target.TabletType != topodata.TabletType_MASTER && shr.Target.TabletType == topodata.TabletType_MASTER
trivialUpdate := thc.LastError == nil && thc.Serving && shr.RealtimeStats.HealthError == "" && shr.Serving &&
prevTarget.TabletType != topodata.TabletType_MASTER && prevTarget.TabletType == shr.Target.TabletType && thc.isTrivialReplagChange(shr.RealtimeStats)
thc.lastResponseTimestamp = time.Now()
thc.Target = shr.Target
thc.MasterTermStartTime = shr.TabletExternallyReparentedTimestamp
Expand All @@ -202,7 +200,7 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
thc.setServingState(serving, reason)

// notify downstream for master change
hc.updateHealth(thc.SimpleCopy(), shr, currentTarget, trivialNonMasterUpdate, isMasterUpdate, isMasterChange)
hc.updateHealth(thc.SimpleCopy(), prevTarget, trivialUpdate, true)
return nil
}

Expand Down Expand Up @@ -241,6 +239,9 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
hc.connsWG.Done()
}()

// Initialize error counter
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 0)

retryDelay := hc.retryDelay
for {
streamCtx, streamCancel := context.WithCancel(thc.ctx)
Expand Down Expand Up @@ -287,12 +288,14 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
streamCancel()

if err != nil {
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1)
if strings.Contains(err.Error(), "health stats mismatch") {
hc.deleteTablet(thc.Tablet)
return
}
res := thc.SimpleCopy()
hc.broadcast(res)
// trivialUpdate = false because this is an error
// isPrimaryUp = false because we did not get a healthy response
hc.updateHealth(thc.SimpleCopy(), thc.Target, false, false)
}
// If there was a timeout send an error. We do this after stream has returned.
// This will ensure that this update prevails over any previous message that
Expand All @@ -301,7 +304,9 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
thc.LastError = fmt.Errorf("healthcheck timed out (latest %v)", thc.lastResponseTimestamp)
thc.setServingState(false, thc.LastError.Error())
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1)
hc.broadcast(thc.SimpleCopy())
// trivialUpdate = false because this is an error
// isPrimaryUp = false because we did not get a healthy response within the timeout
hc.updateHealth(thc.SimpleCopy(), thc.Target, false, false)
}

// Streaming RPC failed e.g. because vttablet was restarted or took too long.
Expand Down