Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Slack sync upstream 2018 12 17.r1 #121

Merged
merged 6 commits into from
Dec 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions go/vt/discovery/tablet_stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type tabletStatsCacheEntry struct {
healthy []*TabletStats
// aggregates has the per-cell aggregates.
aggregates map[string]*querypb.AggregateStats
// aggregatesPerRegion has the per-region aggregates.
aggregatesPerRegion map[string]*querypb.AggregateStats
}

func (e *tabletStatsCacheEntry) updateHealthyMapForMaster(ts *TabletStats) {
Expand Down Expand Up @@ -266,18 +268,21 @@ func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) {
tc.updateAggregateMap(ts.Target.Keyspace, ts.Target.Shard, ts.Target.TabletType, e, allArray)
}

// MakeAggregateMap takes a list of TabletStats and builds a per-cell
// makeAggregateMap takes a list of TabletStats and builds a per-cell
// AggregateStats map.
func MakeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats {
func (tc *TabletStatsCache) makeAggregateMap(stats []*TabletStats, buildForRegion bool) map[string]*querypb.AggregateStats {
result := make(map[string]*querypb.AggregateStats)
for _, ts := range stats {
cell := ts.Tablet.Alias.Cell
agg, ok := result[cell]
cellOrRegion := ts.Tablet.Alias.Cell
if buildForRegion {
cellOrRegion = tc.getRegionByCell(cellOrRegion)
}
agg, ok := result[cellOrRegion]
if !ok {
agg = &querypb.AggregateStats{
SecondsBehindMasterMin: math.MaxUint32,
}
result[cell] = agg
result[cellOrRegion] = agg
}

if ts.Serving && ts.LastError == nil {
Expand All @@ -295,9 +300,9 @@ func MakeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats {
return result
}

// MakeAggregateMapDiff computes the entries that need to be broadcast
// makeAggregateMapDiff computes the entries that need to be broadcast
// when the map goes from oldMap to newMap.
func MakeAggregateMapDiff(keyspace, shard string, tabletType topodatapb.TabletType, ter int64, oldMap map[string]*querypb.AggregateStats, newMap map[string]*querypb.AggregateStats) []*srvtopo.TargetStatsEntry {
func makeAggregateMapDiff(keyspace, shard string, tabletType topodatapb.TabletType, ter int64, oldMap map[string]*querypb.AggregateStats, newMap map[string]*querypb.AggregateStats) []*srvtopo.TargetStatsEntry {
var result []*srvtopo.TargetStatsEntry
for cell, oldValue := range oldMap {
newValue, ok := newMap[cell]
Expand Down Expand Up @@ -360,8 +365,9 @@ func MakeAggregateMapDiff(keyspace, shard string, tabletType topodatapb.TabletTy
func (tc *TabletStatsCache) updateAggregateMap(keyspace, shard string, tabletType topodatapb.TabletType, e *tabletStatsCacheEntry, stats []*TabletStats) {
// Save the new value
oldAgg := e.aggregates
newAgg := MakeAggregateMap(stats)
newAgg := tc.makeAggregateMap(stats /* buildForRegion */, false)
e.aggregates = newAgg
e.aggregatesPerRegion = tc.makeAggregateMap(stats /* buildForRegion */, true)

// And broadcast the change in the background, if we need to.
tc.mu.RLock()
Expand All @@ -376,7 +382,7 @@ func (tc *TabletStatsCache) updateAggregateMap(keyspace, shard string, tabletTyp
if len(stats) > 0 {
ter = stats[0].TabletExternallyReparentedTimestamp
}
diffs := MakeAggregateMapDiff(keyspace, shard, tabletType, ter, oldAgg, newAgg)
diffs := makeAggregateMapDiff(keyspace, shard, tabletType, ter, oldAgg, newAgg)
tc.aggregatesChan <- diffs
}

Expand Down Expand Up @@ -498,7 +504,8 @@ func (tc *TabletStatsCache) GetAggregateStats(target *querypb.Target) (*querypb.
return agg, nil
}
}
agg, ok := e.aggregates[target.Cell]
targetRegion := tc.getRegionByCell(target.Cell)
agg, ok := e.aggregatesPerRegion[targetRegion]
if !ok {
return nil, topo.NewError(topo.NoNode, topotools.TargetIdent(target))
}
Expand Down
22 changes: 17 additions & 5 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ var commands = []commandGroup{
"[-ping-tablets]",
"Validates that all nodes reachable from the global replication graph and that all tablets in all discoverable cells are consistent."},
{"ListAllTablets", commandListAllTablets,
"<cell name>",
"<cell name1>, <cell name2>, ...",
"Lists all tablets in an awk-friendly way."},
{"ListTablets", commandListTablets,
"<tablet alias> ...",
Expand Down Expand Up @@ -1799,12 +1799,24 @@ func commandListAllTablets(ctx context.Context, wr *wrangler.Wrangler, subFlags
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the <cell name> argument is required for the ListAllTablets command")
var cells []string
var err error
if subFlags.NArg() == 1 {
cells = strings.Split(subFlags.Arg(0), ",")
} else {
cells, err = wr.TopoServer().GetKnownCells(ctx)
if err != nil {
return err
}
}

cell := subFlags.Arg(0)
return dumpAllTablets(ctx, wr, cell)
for _, cell := range cells {
err := dumpAllTablets(ctx, wr, cell)
if err != nil {
return err
}
}
return nil
}

func commandListTablets(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
145 changes: 145 additions & 0 deletions go/vt/vtgate/gateway/discoverygateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,107 @@ func TestShuffleTablets(t *testing.T) {
}
}

func TestDiscoveryGatewayGetAggregateStats(t *testing.T) {
keyspace := "ks"
shard := "0"
hc := discovery.NewFakeHealthCheck()
dg := createDiscoveryGateway(hc, nil, "cell1", 2).(*discoveryGateway)

// replica should only use local ones
hc.Reset()
dg.tsc.ResetForTesting()
hc.AddTestTablet("cell1", "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
hc.AddTestTablet("cell1", "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
target := &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_REPLICA,
Cell: "cell1",
}
tsl, err := dg.tsc.GetAggregateStats(target)
if err != nil {
t.Error(err)
}
if tsl.HealthyTabletCount != 2 {
t.Errorf("Expected 2 healthy replica tablets, got: %v", tsl.HealthyTabletCount)
}
}

func TestDiscoveryGatewayGetAggregateStatsRegion(t *testing.T) {
keyspace := "ks"
shard := "0"
hc := discovery.NewFakeHealthCheck()
dg := createDiscoveryGateway(hc, nil, "local-east", 2).(*discoveryGateway)

topo.UpdateCellsToRegionsForTests(map[string]string{
"local-west": "local",
"local-east": "local",
"remote": "remote",
})

hc.Reset()
dg.tsc.ResetForTesting()
hc.AddTestTablet("remote", "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
hc.AddTestTablet("local-west", "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
hc.AddTestTablet("local-east", "3.3.3.3", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)

// Non master targets in the same region as the gateway should be discoverable
target := &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_REPLICA,
Cell: "local-west",
}
tsl, err := dg.tsc.GetAggregateStats(target)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if tsl.HealthyTabletCount != 2 {
t.Errorf("Expected 2 healthy replica tablets, got: %v", tsl.HealthyTabletCount)
}
}

func TestDiscoveryGatewayGetAggregateStatsMaster(t *testing.T) {
keyspace := "ks"
shard := "0"
hc := discovery.NewFakeHealthCheck()
dg := createDiscoveryGateway(hc, nil, "cell1", 2).(*discoveryGateway)

// replica should only use local ones
hc.Reset()
dg.tsc.ResetForTesting()
hc.AddTestTablet("cell1", "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 10, nil)
target := &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_MASTER,
Cell: "cell1",
}
tsl, err := dg.tsc.GetAggregateStats(target)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if tsl.HealthyTabletCount != 1 {
t.Errorf("Expected one healthy master, got: %v", tsl.HealthyTabletCount)
}

// You can get aggregate regardless of the cell when requesting a master
target = &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_MASTER,
Cell: "cell2",
}

tsl, err = dg.tsc.GetAggregateStats(target)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if tsl.HealthyTabletCount != 1 {
t.Errorf("Expected one healthy master, got: %v", tsl.HealthyTabletCount)
}
}

func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) {
keyspace := "ks"
shard := "0"
Expand All @@ -230,6 +331,50 @@ func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) {
}
}

func BenchmarkOneCellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(1, b) }

func BenchmarkTenCellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(10, b) }

func Benchmark100CellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(100, b) }

func Benchmark1000CellGetAggregateStats(b *testing.B) { benchmarkCellsGetAggregateStats(1000, b) }

func benchmarkCellsGetAggregateStats(i int, b *testing.B) {
keyspace := "ks"
shard := "0"
hc := discovery.NewFakeHealthCheck()
dg := createDiscoveryGateway(hc, nil, "cell0", 2).(*discoveryGateway)
cellsToregions := make(map[string]string)
for j := 0; j < i; j++ {
cell := fmt.Sprintf("cell%v", j)
cellsToregions[cell] = "local"
}

topo.UpdateCellsToRegionsForTests(cellsToregions)
hc.Reset()
dg.tsc.ResetForTesting()

for j := 0; j < i; j++ {
cell := fmt.Sprintf("cell%v", j)
ip := fmt.Sprintf("%v.%v.%v,%v", j, j, j, j)
hc.AddTestTablet(cell, ip, 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
}

target := &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_REPLICA,
Cell: "cell0",
}

for n := 0; n < b.N; n++ {
_, err := dg.tsc.GetAggregateStats(target)
if err != nil {
b.Fatalf("Expected no error, got %v", err)
}
}
}

func testDiscoveryGatewayGeneric(t *testing.T, streaming bool, f func(dg Gateway, target *querypb.Target) error) {
keyspace := "ks"
shard := "0"
Expand Down