Skip to content

Commit

Permalink
reload keyspace shards if not specified
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Dec 13, 2024
1 parent a1c4167 commit 0df0848
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 52 deletions.
82 changes: 44 additions & 38 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
shardsToWatch = make(map[string]bool, 0)
shardsToWatch map[string][]string
shardsToWatchMu sync.Mutex

// ErrNoPrimaryTablet is a fixed error message.
Expand All @@ -64,27 +64,26 @@ func RegisterFlags(fs *pflag.FlagSet) {
// updateShardsToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards.
func updateShardsToWatch() {
if ts == nil {
if len(clustersToWatch) == 0 {
return
}
shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()

newShardsToWatch := make(map[string][]string, 0)
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") {
// Validate keyspace/shard parses.
if _, _, err := topoproto.ParseKeyspaceShard(ks); err != nil {
k, s, err := topoproto.ParseKeyspaceShard(ks)
if err != nil {
log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err)
continue
}
shardsToWatch[ks] = true
newShardsToWatch[k] = append(newShardsToWatch[k], s)
} else {
// Remove trailing slash, if exists.
ks = strings.TrimSuffix(ks, "/")

// Assume this is a keyspace and find all shards in keyspace.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
// Assume this is a keyspace and find all shards in keyspace.
// Remove trailing slash if exists.
ks = strings.TrimSuffix(ks, "/")
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the err and continue.
Expand All @@ -95,11 +94,38 @@ func updateShardsToWatch() {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
}
for _, s := range shards {
shardsToWatch[topoproto.KeyspaceShardString(ks, s)] = true
}
newShardsToWatch[ks] = shards
}
}
if len(newShardsToWatch) == 0 {
log.Error("No keyspace/shards to watch")
return
}

shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
shardsToWatch = newShardsToWatch
}

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
ts = topo.Open()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
updateShardsToWatch()
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}
return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// getAllTablets gets all tablets from all cells using a goroutine per cell.
Expand All @@ -126,27 +152,6 @@ func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo {
return tablets
}

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
ts = topo.Open()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
updateShardsToWatch()
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}
return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets(ctx context.Context) error {
return refreshTabletsUsing(ctx, func(tabletAlias string) {
Expand Down Expand Up @@ -179,9 +184,11 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
for _, t := range tablets {
shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard)
if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] {
continue // filter
if len(shardsToWatch) > 0 {
_, ok := shardsToWatch[t.Tablet.Keyspace]
if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) {
continue // filter
}
}
matchedTablets = append(matchedTablets, t)
}
Expand All @@ -190,7 +197,6 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
// Refresh the filtered tablets.
query := "select alias from vitess_tablet"
refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil)

return nil
}

Expand Down
27 changes: 13 additions & 14 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,50 +121,49 @@ func TestUpdateShardsToWatch(t *testing.T) {

testCases := []struct {
in []string
expected map[string]bool
expected map[string][]string
}{
{
in: []string{},
expected: map[string]bool{},
expected: nil,
},
{
in: []string{""},
expected: map[string]bool{},
expected: map[string][]string{},
},
{
in: []string{"test/-"},
expected: map[string]bool{
"test/-": true,
expected: map[string][]string{
"test": []string{"-"},
},
},
{
in: []string{"test/-", "test2/-80", "test2/80-"},
expected: map[string]bool{
"test/-": true,
"test2/-80": true,
"test2/80-": true,
expected: map[string][]string{
"test": []string{"-"},
"test2": []string{"-80", "80-"},
},
},
{
// confirm shards fetch from topo
in: []string{keyspace},
expected: map[string]bool{
topoproto.KeyspaceShardString(keyspace, shard): true,
expected: map[string][]string{
keyspace: []string{shard},
},
},
{
// confirm shards fetch from topo when keyspace has trailing-slash
in: []string{keyspace + "/"},
expected: map[string]bool{
topoproto.KeyspaceShardString(keyspace, shard): true,
expected: map[string][]string{
keyspace: []string{shard},
},
},
}

for _, testCase := range testCases {
t.Run(strings.Join(testCase.in, ","), func(t *testing.T) {
defer func() {
shardsToWatch = make(map[string]bool, 0)
shardsToWatch = make(map[string][]string, 0)
}()
clustersToWatch = testCase.in
updateShardsToWatch()
Expand Down

0 comments on commit 0df0848

Please sign in to comment.