Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-19.0: Ensure all topo read calls consider --topo_read_concurrency #571

Open
wants to merge 12 commits into
base: slack-19.0
Choose a base branch
from
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets))
}

hc.topoWatchers = topoWatchers
Expand Down
29 changes: 12 additions & 17 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/topo/topoproto"

"vitess.io/vitess/go/vt/key"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
)

const (
Expand All @@ -56,7 +53,7 @@ var (
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
tablet *topodatapb.Tablet
}

// TopologyWatcher polls the topology periodically for changes to
Expand All @@ -70,7 +67,6 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
concurrency int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -92,15 +88,14 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
tabletFilter: f,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
concurrency: topoReadConcurrency,
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
Expand All @@ -112,7 +107,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}

// Start starts the topology watcher.
Expand Down Expand Up @@ -271,14 +266,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
IsIncluded(tablet *topodatapb.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
Expand All @@ -299,7 +294,7 @@ type FilterByShard struct {
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange
}

// NewFilterByShard creates a new FilterByShard for use by a
Expand Down Expand Up @@ -344,7 +339,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}

// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
Expand Down Expand Up @@ -384,7 +379,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}

// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}
Expand All @@ -403,7 +398,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool {
if fbtg.tags == nil {
return true
}
Expand Down
10 changes: 5 additions & 5 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) {
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true)

done := make(chan bool, 3)
result := make(chan bool, 1)
Expand Down Expand Up @@ -127,7 +127,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestFilterByKeyspace(t *testing.T) {
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
ts := memorytopo.NewServer(ctx, testCell)
defer ts.Close()
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true)

for _, test := range testFilterByKeyspace {
// Add a new tablet to the topology.
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -639,7 +639,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true)
defer tw.Stop()

// Force fallback to getting tablets individually.
Expand Down
28 changes: 5 additions & 23 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,26 @@ import (
"sort"
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/events"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/vterrors"
)

// This file contains keyspace utility functions.

// Default concurrency to use in order to avoid overhwelming the topo server.
var DefaultConcurrency = 32

// shardKeySuffix is the suffix of a shard key.
// The full key looks like this:
// /vitess/global/keyspaces/customer/shards/80-/Shard
const shardKeySuffix = "Shard"

func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.")
}

func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
// with a keyspace.
Expand Down Expand Up @@ -198,7 +180,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
type FindAllShardsInKeyspaceOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetShard.
// If <= 0, Concurrency is set to 1.
Concurrency int
Concurrency int64
}

// FindAllShardsInKeyspace reads and returns all the existing shards in a
Expand All @@ -212,7 +194,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
opt.Concurrency = DefaultConcurrency
opt.Concurrency = DefaultReadConcurrency
}

// Unescape the keyspace name as this can e.g. come from the VSchema where
Expand Down
13 changes: 10 additions & 3 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -175,6 +176,9 @@ var (

FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate",
"vtorc", "vtbackup"}

// Default read concurrency to use in order to avoid overhwelming the topo server.
DefaultReadConcurrency int64 = 32
)

func init() {
Expand All @@ -187,6 +191,7 @@ func registerTopoFlags(fs *pflag.FlagSet) {
fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use")
fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server")
fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server")
fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell.")
}

// RegisterFactory registers a Factory for an implementation for a Server.
Expand All @@ -202,19 +207,20 @@ func RegisterFactory(name string, factory Factory) {
// NewWithFactory creates a new Server based on the given Factory.
// It also opens the global cell connection.
func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) {
globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency)
conn, err := factory.Create(GlobalCell, serverAddress, root)
if err != nil {
return nil, err
}
conn = NewStatsConn(GlobalCell, conn)
conn = NewStatsConn(GlobalCell, conn, globalReadSem)

var connReadOnly Conn
if factory.HasGlobalReadOnlyCell(serverAddress, root) {
connReadOnly, err = factory.Create(GlobalReadOnlyCell, serverAddress, root)
if err != nil {
return nil, err
}
connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly)
connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem)
} else {
connReadOnly = conn
}
Expand Down Expand Up @@ -295,7 +301,8 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root)
switch {
case err == nil:
conn = NewStatsConn(cell, conn)
cellReadSem := semaphore.NewWeighted(DefaultReadConcurrency)
conn = NewStatsConn(cell, conn, cellReadSem)
ts.cellConns[cell] = cellConn{ci, conn}
return conn, nil
case IsErrType(err, NoNode):
Expand Down
9 changes: 0 additions & 9 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,16 +671,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
}
}

// Divide the concurrency limit by the number of cells. If there are more
// cells than the limit, default to concurrency of 1.
cellConcurrency := 1
if len(cells) < DefaultConcurrency {
cellConcurrency = DefaultConcurrency / len(cells)
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultConcurrency)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
Expand All @@ -691,7 +683,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
}
}
options := &GetTabletsByCellOptions{
Concurrency: cellConcurrency,
KeyspaceShard: kss,
}
for _, cell := range cells {
Expand Down
Loading
Loading