Skip to content

Commit

Permalink
Merge pull request #6857 from planetscale/ds-hc-cross-cell-70
Browse files Browse the repository at this point in the history
healthcheck: should receive healthcheck updates from all tablets in cells_to_watch
  • Loading branch information
askdba authored Oct 13, 2020
2 parents ab9f507 + 2713aef commit 91a595c
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 46 deletions.
29 changes: 13 additions & 16 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
// It does not block on making connection.
// name is an optional tag for the tablet, e.g. an alternative address.
func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
// check whether we should really add this tablet
if !hc.isIncluded(tablet) {
return
}
// check whether grpc port is present on tablet, if not return
if tablet.PortMap["grpc"] == 0 {
return
Expand Down Expand Up @@ -368,9 +364,6 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
// RemoveTablet removes the tablet, and stops the health check.
// It does not block.
func (hc *HealthCheckImpl) RemoveTablet(tablet *topodata.Tablet) {
if !hc.isIncluded(tablet) {
return
}
hc.deleteTablet(tablet)
}

Expand Down Expand Up @@ -453,7 +446,10 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
all := hc.healthData[targetKey]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
allArray = append(allArray, s)
// only tablets in same cell / cellAlias are included in healthy list
if hc.isIncluded(shr) {
allArray = append(allArray, s)
}
}
hc.healthy[targetKey] = FilterStatsByReplicationLag(allArray)
}
Expand All @@ -462,7 +458,10 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
all := hc.healthData[oldTargetKey]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
allArray = append(allArray, s)
// only tablets in same cell / cellAlias are included in healthy list
if hc.isIncluded(shr) {
allArray = append(allArray, s)
}
}
hc.healthy[oldTargetKey] = FilterStatsByReplicationLag(allArray)
}
Expand Down Expand Up @@ -673,10 +672,8 @@ func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardT
return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type)))
}

// getAliasByCell should only be called while holding hc.mu
func (hc *HealthCheckImpl) getAliasByCell(cell string) string {
hc.mu.Lock()
defer hc.mu.Unlock()

if alias, ok := hc.cellAliases[cell]; ok {
return alias
}
Expand All @@ -689,14 +686,14 @@ func (hc *HealthCheckImpl) getAliasByCell(cell string) string {
return alias
}

func (hc *HealthCheckImpl) isIncluded(tablet *topodata.Tablet) bool {
if tablet.Type == topodata.TabletType_MASTER {
func (hc *HealthCheckImpl) isIncluded(shr *query.StreamHealthResponse) bool {
if shr.Target.TabletType == topodata.TabletType_MASTER {
return true
}
if tablet.Alias.Cell == hc.cell {
if shr.TabletAlias.Cell == hc.cell {
return true
}
if hc.getAliasByCell(tablet.Alias.Cell) == hc.getAliasByCell(hc.cell) {
if hc.getAliasByCell(shr.TabletAlias.Cell) == hc.getAliasByCell(hc.cell) {
return true
}
return false
Expand Down
149 changes: 119 additions & 30 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,31 +612,77 @@ func TestGetHealthyTablets(t *testing.T) {
mustMatch(t, want, a, "unexpected result")
}

func TestAliases(t *testing.T) {
ts := memorytopo.NewServer("cell", "cell1", "cell2")
hc := createTestHc(ts)
func TestMasterInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Cells: []string{"cell", "cell1"},
// add a tablet as master in different cell
tablet := createTestTablet(1, "cell2", "host1")
tablet.Type = topodatapb.TabletType_MASTER
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// should get a result, but this will hang if multi-cell logic is broken
// so wait and timeout
ticker := time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case <-resultChan:
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}
assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region1", cellsAlias), "failed to create cell alias")
defer ts.DeleteCellsAlias(context.Background(), "region1")
cellsAlias = &topodatapb.CellsAlias{
Cells: []string{"cell2"},

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: true,
TabletExternallyReparentedTimestamp: 20,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 0, CpuUsage: 0.2},
}
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 0, CpuUsage: 0.2},
MasterTermStartTime: 20,
}
assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region2", cellsAlias), "failed to create cell alias")
defer ts.DeleteCellsAlias(context.Background(), "region2")

// add a tablet as replica in diff cell, same region
tablet := createTestTablet(1, "cell1", "host3")
input <- shr
ticker = time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case got := <-resultChan:
// check that we DO receive health check update for MASTER in other cell
mustMatch(t, want, got, "Wrong TabletHealth data")
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

// check that MASTER tablet from other cell IS in healthy tablet list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER})
assert.Len(t, a, 1, "")
mustMatch(t, want, a[0], "Expecting healthy master")
}

func TestReplicaInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
defer hc.Close()

// add a tablet as replica in different cell
tablet := createTestTablet(1, "cell2", "host1")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// should get a result, but this will hang if cell alias logic is broken
// should get a result, but this will hang if multi-cell logic is broken
// so wait and timeout
ticker := time.NewTicker(1 * time.Second)
select {
Expand All @@ -654,46 +700,89 @@ func TestAliases(t *testing.T) {
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
}
want := []*TabletHealth{{
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
MasterTermStartTime: 0,
}}
}

input <- shr
ticker = time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case <-resultChan:
case got := <-resultChan:
// check that we DO receive health check update for REPLICA in other cell
mustMatch(t, want, got, "Wrong TabletHealth data")
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

// check it's there
// check that REPLICA tablet from other cell is NOT in healthy tablet list
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
mustMatch(t, want, a, "Wrong TabletHealth data")
assert.Empty(t, a)

// add another tablet in a diff cell, diff region
tablet2 := createTestTablet(2, "cell2", "host4")
tablet2.Type = topodatapb.TabletType_REPLICA
input2 := make(chan *querypb.StreamHealthResponse)
fc = createFakeConn(tablet2, input2)
hc.AddTablet(tablet2)
// we should NOT get a result because this tablet is not of interest to us
}

func TestCellAliases(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Cells: []string{"cell1", "cell2"},
}
assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region1", cellsAlias), "failed to create cell alias")
defer ts.DeleteCellsAlias(context.Background(), "region1")

// add a tablet as replica in diff cell, same region
tablet := createTestTablet(1, "cell2", "host2")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// should get a result, but this will hang if cell alias logic is broken
// so wait and timeout
ticker := time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case <-resultChan:
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
}
want := []*TabletHealth{{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
MasterTermStartTime: 0,
}}

input <- shr
ticker = time.NewTicker(1 * time.Second)
select {
case err := <-fc.cbErrCh:
require.Fail(t, "Unexpected error: %v", err)
case result := <-resultChan:
require.Fail(t, "Unexpected result: %v", result)
case <-resultChan:
case <-ticker.C:
require.Fail(t, "Timed out waiting for HealthCheck update")
}

// check that we still have only tablet in healthy list
a = hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
// check it's there
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
mustMatch(t, want, a, "Wrong TabletHealth data")
}

Expand Down

0 comments on commit 91a595c

Please sign in to comment.