Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve efficiency of vtorc topo calls #17071

Merged
merged 12 commits into from
Nov 6, 2024
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
Expand Down
86 changes: 77 additions & 9 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,27 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"path"
"slices"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/topo/topoproto"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -553,7 +552,6 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
span.Annotate("shard", shard)
span.Annotate("num_cells", len(cells))
defer span.Finish()
ctx = trace.NewContext(ctx, span)
var err error

// The caller intents to all cells
Expand Down Expand Up @@ -597,7 +595,7 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
case IsErrType(err, NoNode):
// There is no shard replication for this shard in this cell. NOOP
default:
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetShardReplication(%v, %v, %v) failed.", cell, keyspace, shard)))
rec.RecordError(vterrors.Wrapf(err, "GetShardReplication(%v, %v, %v) failed.", cell, keyspace, shard))
return
}
}(cell)
Expand All @@ -616,6 +614,76 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
return result, err
}

// GetTabletsByShard returns the tablets in the given shard using all cells.
// It can return ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) {
return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil)
}

// GetTabletsByShardCell returns the tablets in the given shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all the individual
// tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) {
span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell")
span.Annotate("keyspace", keyspace)
span.Annotate("shard", shard)
span.Annotate("num_cells", len(cells))
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
defer span.Finish()
var err error

if len(cells) == 0 {
cells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(cells) == 0 { // Nothing to do
return nil, nil
}
}

// Divide the concurrency limit by the number of cells. If there are more
// cells than the limit, default to concurrency of 1.
cellConcurrency := 1
if len(cells) < DefaultConcurrency {
cellConcurrency = DefaultConcurrency / len(cells)
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultConcurrency)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
if keyspace != "" {
kss = &KeyspaceShard{
Keyspace: keyspace,
Shard: shard,
}
}
options := &GetTabletsByCellOptions{
Concurrency: cellConcurrency,
KeyspaceShard: kss,
}
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, options)
if err != nil {
return vterrors.Wrapf(err, "GetTabletsByCell for %v failed.", cell)
}
mu.Lock()
mattlord marked this conversation as resolved.
Show resolved Hide resolved
defer mu.Unlock()
tablets = append(tablets, t...)
return nil
})
}
if err := eg.Wait(); err != nil {
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err)
return tablets, NewError(PartialResult, shard)
}
return tablets, nil
}

// GetTabletMapForShard returns the tablets for a shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the map is valid, but partial.
Expand Down
21 changes: 18 additions & 3 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t
type GetTabletsByCellOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetTablet.
Concurrency int
// KeyspaceShard is the optional keyspace/shard that tablets must match.
// An empty shard value will match all shards in the keyspace.
KeyspaceShard *KeyspaceShard
}

// GetTabletsByCell returns all the tablets in the cell.
Expand Down Expand Up @@ -263,15 +266,27 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
return nil, err
}

tablets := make([]*TabletInfo, len(listResults))
var capHint int
if opt != nil && opt.KeyspaceShard == nil {
capHint = len(listResults)
}

tablets := make([]*TabletInfo, 0, capHint)
for n := range listResults {
tablet := &topodatapb.Tablet{}
if err := tablet.UnmarshalVT(listResults[n].Value); err != nil {
return nil, err
}
tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version}
if opt != nil && opt.KeyspaceShard != nil && opt.KeyspaceShard.Keyspace != "" {
if opt.KeyspaceShard.Keyspace != tablet.Keyspace {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if opt.KeyspaceShard.Shard != "" && opt.KeyspaceShard.Shard != tablet.Shard {
continue
}
}
tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version})
}

return tablets, nil
}

Expand Down
Loading
Loading