Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] healthcheck: should receive healthcheck updates from all tablets in cells_to_watch #6858

Merged
merged 1 commit into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
// It does not block on making connection.
// name is an optional tag for the tablet, e.g. an alternative address.
func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
// check whether we should really add this tablet
if !hc.isIncluded(tablet) {
return
}
// check whether grpc port is present on tablet, if not return
if tablet.PortMap["grpc"] == 0 {
return
Expand Down Expand Up @@ -368,9 +364,6 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
// RemoveTablet removes the tablet, and stops the health check.
// It does not block.
func (hc *HealthCheckImpl) RemoveTablet(tablet *topodata.Tablet) {
if !hc.isIncluded(tablet) {
return
}
hc.deleteTablet(tablet)
}

Expand Down Expand Up @@ -453,7 +446,10 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
all := hc.healthData[targetKey]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
allArray = append(allArray, s)
// only tablets in same cell / cellAlias are included in healthy list
if hc.isIncluded(shr) {
allArray = append(allArray, s)
}
}
hc.healthy[targetKey] = FilterStatsByReplicationLag(allArray)
}
Expand All @@ -462,7 +458,10 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
all := hc.healthData[oldTargetKey]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
allArray = append(allArray, s)
// only tablets in same cell / cellAlias are included in healthy list
if hc.isIncluded(shr) {
allArray = append(allArray, s)
}
}
hc.healthy[oldTargetKey] = FilterStatsByReplicationLag(allArray)
}
Expand Down Expand Up @@ -703,10 +702,8 @@ func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardT
return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type)))
}

// getAliasByCell should only be called while holding hc.mu
func (hc *HealthCheckImpl) getAliasByCell(cell string) string {
hc.mu.Lock()
defer hc.mu.Unlock()

if alias, ok := hc.cellAliases[cell]; ok {
return alias
}
Expand All @@ -719,14 +716,14 @@ func (hc *HealthCheckImpl) getAliasByCell(cell string) string {
return alias
}

func (hc *HealthCheckImpl) isIncluded(tablet *topodata.Tablet) bool {
if tablet.Type == topodata.TabletType_MASTER {
func (hc *HealthCheckImpl) isIncluded(shr *query.StreamHealthResponse) bool {
if shr.Target.TabletType == topodata.TabletType_MASTER {
return true
}
if tablet.Alias.Cell == hc.cell {
if shr.TabletAlias.Cell == hc.cell {
return true
}
if hc.getAliasByCell(tablet.Alias.Cell) == hc.getAliasByCell(hc.cell) {
if hc.getAliasByCell(shr.TabletAlias.Cell) == hc.getAliasByCell(hc.cell) {
return true
}
return false
Expand Down
149 changes: 119 additions & 30 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,31 +701,77 @@ func TestGetHealthyTablets(t *testing.T) {
mustMatch(t, want, a, "unexpected result")
}

func TestAliases(t *testing.T) {
ts := memorytopo.NewServer("cell", "cell1", "cell2")
hc := createTestHc(ts)
func TestMasterInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Cells: []string{"cell", "cell1"},
// add a tablet as master in different cell
tablet := createTestTablet(1, "cell2", "host1")
tablet.Type = topodatapb.TabletType_MASTER
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// should get a result, but this will hang if multi-cell logic is broken
// so wait and timeout
ticker := time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case <-resultChan:
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}
assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region1", cellsAlias), "failed to create cell alias")
defer ts.DeleteCellsAlias(context.Background(), "region1")
cellsAlias = &topodatapb.CellsAlias{
Cells: []string{"cell2"},

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: true,
TabletExternallyReparentedTimestamp: 20,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 0, CpuUsage: 0.2},
}
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 0, CpuUsage: 0.2},
MasterTermStartTime: 20,
}
assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region2", cellsAlias), "failed to create cell alias")
defer ts.DeleteCellsAlias(context.Background(), "region2")

// add a tablet as replica in diff cell, same region
tablet := createTestTablet(1, "cell1", "host3")
input <- shr
ticker = time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case got := <-resultChan:
// check that we DO receive health check update for MASTER in other cell
mustMatch(t, want, got, "Wrong TabletHealth data")
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

// check that MASTER tablet from other cell IS in healthy tablet list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER})
assert.Len(t, a, 1, "")
mustMatch(t, want, a[0], "Expecting healthy master")
}

func TestReplicaInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
defer hc.Close()

// add a tablet as replica in different cell
tablet := createTestTablet(1, "cell2", "host1")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// should get a result, but this will hang if cell alias logic is broken
// should get a result, but this will hang if multi-cell logic is broken
// so wait and timeout
ticker := time.NewTicker(1 * time.Second)
select {
Expand All @@ -743,46 +789,89 @@ func TestAliases(t *testing.T) {
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
}
want := []*TabletHealth{{
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
MasterTermStartTime: 0,
}}
}

input <- shr
ticker = time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case <-resultChan:
case got := <-resultChan:
// check that we DO receive health check update for REPLICA in other cell
mustMatch(t, want, got, "Wrong TabletHealth data")
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

// check it's there
// check that REPLICA tablet from other cell is NOT in healthy tablet list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
mustMatch(t, want, a, "Wrong TabletHealth data")
assert.Empty(t, a)

// add another tablet in a diff cell, diff region
tablet2 := createTestTablet(2, "cell2", "host4")
tablet2.Type = topodatapb.TabletType_REPLICA
input2 := make(chan *querypb.StreamHealthResponse)
fc = createFakeConn(tablet2, input2)
hc.AddTablet(tablet2)
// we should NOT get a result because this tablet is not of interest to us
}

func TestCellAliases(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Cells: []string{"cell1", "cell2"},
}
assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region1", cellsAlias), "failed to create cell alias")
defer ts.DeleteCellsAlias(context.Background(), "region1")

// add a tablet as replica in diff cell, same region
tablet := createTestTablet(1, "cell2", "host2")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// should get a result, but this will hang if cell alias logic is broken
// so wait and timeout
ticker := time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case <-resultChan:
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
}
want := []*TabletHealth{{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
MasterTermStartTime: 0,
}}

input <- shr
ticker = time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case result := <-resultChan:
require.Fail(t, "Unexpected result: %v", result)
case <-resultChan:
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

// check that we still have only tablet in healthy list
a = hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
// check it's there
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
mustMatch(t, want, a, "Wrong TabletHealth data")
}

Expand Down