From 81bc0d921a0a3c839fbcb9882dca355f3c578975 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 4 Dec 2024 14:11:09 +0100 Subject: [PATCH 01/10] Ensure all topo read calls consider `--topo_read_concurrency` Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtbackup.txt | 1 + go/flags/endtoend/vtcombo.txt | 2 +- go/flags/endtoend/vtctld.txt | 2 +- go/flags/endtoend/vtgate.txt | 2 +- go/flags/endtoend/vtorc.txt | 2 +- go/flags/endtoend/vttablet.txt | 1 + go/vt/discovery/healthcheck.go | 2 +- go/vt/discovery/topology_watcher.go | 29 +++++++++++------------- go/vt/topo/keyspace.go | 28 +++++------------------ go/vt/topo/server.go | 17 +++++++++++--- go/vt/topo/shard.go | 9 -------- go/vt/topo/stats_conn.go | 26 +++++++++++++++++++++- go/vt/topo/stats_conn_test.go | 22 ++++++++++-------- go/vt/topo/tablet.go | 32 ++++----------------------- go/vt/topo/tablet_test.go | 5 ----- go/vt/vtorc/logic/tablet_discovery.go | 2 +- 16 files changed, 82 insertions(+), 100 deletions(-) diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index 25730ab892f..49878247f7d 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -235,6 +235,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 126b073c6b5..45e46880b2e 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -379,7 +379,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 325c4ae30cd..4e9f2e5322b 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -169,7 +169,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 07353a0beaa..b5068dc702e 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -227,7 +227,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index e36c35924b1..516e797e284 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -107,7 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 502b81f1f6a..8278aefd4f9 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -381,6 +381,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 47146988de2..82e45a8b707 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -382,7 +382,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultReadConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 64346d524ad..6581149f520 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -26,16 +26,13 @@ import ( "sync" "time" - "vitess.io/vitess/go/vt/topo/topoproto" - - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/stats" "vitess.io/vitess/go/trace" - + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/topodata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" ) const ( @@ -56,7 +53,7 @@ var ( // tabletInfo is used internally by the TopologyWatcher struct. type tabletInfo struct { alias string - tablet *topodata.Tablet + tablet *topodatapb.Tablet } // TopologyWatcher polls the topology periodically for changes to @@ -70,7 +67,7 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - concurrency int + concurrency int64 ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -92,7 +89,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, @@ -112,7 +109,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC } func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) { - return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency}) + return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil) } // Start starts the topology watcher. @@ -271,14 +268,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 { // to be applied as an additional filter on the list of tablets returned by its getTablets function. type TabletFilter interface { // IsIncluded returns whether tablet is included in this filter - IsIncluded(tablet *topodata.Tablet) bool + IsIncluded(tablet *topodatapb.Tablet) bool } // TabletFilters contains filters for tablets. type TabletFilters []TabletFilter // IsIncluded returns true if a tablet passes all filters. -func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { +func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool { for _, filter := range tf { if !filter.IsIncluded(tablet) { return false @@ -299,7 +296,7 @@ type FilterByShard struct { type filterShard struct { keyspace string shard string - keyRange *topodata.KeyRange // only set if shard is also a KeyRange + keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange } // NewFilterByShard creates a new FilterByShard for use by a @@ -344,7 +341,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) { } // IsIncluded returns true iff the tablet's keyspace and shard match what we have. -func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { +func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool { canonical, kr, err := topo.ValidateShardName(tablet.Shard) if err != nil { log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err) @@ -384,7 +381,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { } // IsIncluded returns true if the tablet's keyspace matches what we have. -func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { +func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist } @@ -403,7 +400,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { } // IsIncluded returns true if the tablet's tags match what we expect. -func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { +func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool { if fbtg.tags == nil { return true } diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 08b1c59f4cc..4e1d19ec79a 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -22,44 +22,26 @@ import ( "sort" "sync" - "github.com/spf13/pflag" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/event" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/event" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo/events" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo/events" + "vitess.io/vitess/go/vt/vterrors" ) // This file contains keyspace utility functions. -// Default concurrency to use in order to avoid overhwelming the topo server. -var DefaultConcurrency = 32 - // shardKeySuffix is the suffix of a shard key. // The full key looks like this: // /vitess/global/keyspaces/customer/shards/80-/Shard const shardKeySuffix = "Shard" -func registerFlags(fs *pflag.FlagSet) { - fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.") -} - -func init() { - servenv.OnParseFor("vtcombo", registerFlags) - servenv.OnParseFor("vtctld", registerFlags) - servenv.OnParseFor("vtgate", registerFlags) - servenv.OnParseFor("vtorc", registerFlags) -} - // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact // with a keyspace. @@ -198,7 +180,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { type FindAllShardsInKeyspaceOptions struct { // Concurrency controls the maximum number of concurrent calls to GetShard. // If <= 0, Concurrency is set to 1. - Concurrency int + Concurrency int64 } // FindAllShardsInKeyspace reads and returns all the existing shards in a @@ -212,7 +194,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt = &FindAllShardsInKeyspaceOptions{} } if opt.Concurrency <= 0 { - opt.Concurrency = DefaultConcurrency + opt.Concurrency = DefaultReadConcurrency } // Unescape the keyspace name as this can e.g. come from the VSchema where diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 1995e8b6ec4..49d2e3bc12f 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -48,6 +48,7 @@ import ( "sync" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/topodata" @@ -141,6 +142,10 @@ type Server struct { // will read the list of addresses for that cell from the // global cluster and create clients as needed. cellConns map[string]cellConn + + // cellReadSem is a semaphore limiting the number of concurrent read + // operations to all cell-based topos. + cellReadSem *semaphore.Weighted } type cellConn struct { @@ -175,6 +180,9 @@ var ( FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate", "vtorc", "vtbackup"} + + // Default read concurrency to use in order to avoid overhwelming the topo server. + DefaultReadConcurrency int64 = 32 ) func init() { @@ -187,6 +195,7 @@ func registerTopoFlags(fs *pflag.FlagSet) { fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use") fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server") fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server") + fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads.") } // RegisterFactory registers a Factory for an implementation for a Server. @@ -202,11 +211,12 @@ func RegisterFactory(name string, factory Factory) { // NewWithFactory creates a new Server based on the given Factory. // It also opens the global cell connection. func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) { + globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency) conn, err := factory.Create(GlobalCell, serverAddress, root) if err != nil { return nil, err } - conn = NewStatsConn(GlobalCell, conn) + conn = NewStatsConn(GlobalCell, conn, globalReadSem) var connReadOnly Conn if factory.HasGlobalReadOnlyCell(serverAddress, root) { @@ -214,7 +224,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error if err != nil { return nil, err } - connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly) + connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem) } else { connReadOnly = conn } @@ -224,6 +234,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error globalReadOnlyCell: connReadOnly, factory: factory, cellConns: make(map[string]cellConn), + cellReadSem: semaphore.NewWeighted(DefaultReadConcurrency), }, nil } @@ -295,7 +306,7 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root) switch { case err == nil: - conn = NewStatsConn(cell, conn) + conn = NewStatsConn(cell, conn, ts.cellReadSem) ts.cellConns[cell] = cellConn{ci, conn} return conn, nil case IsErrType(err, NoNode): diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 220510e71f5..613a9f265ba 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -671,16 +671,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str } } - // Divide the concurrency limit by the number of cells. If there are more - // cells than the limit, default to concurrency of 1. - cellConcurrency := 1 - if len(cells) < DefaultConcurrency { - cellConcurrency = DefaultConcurrency / len(cells) - } - mu := sync.Mutex{} eg, ctx := errgroup.WithContext(ctx) - eg.SetLimit(DefaultConcurrency) tablets := make([]*TabletInfo, 0, len(cells)) var kss *KeyspaceShard @@ -691,7 +683,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str } } options := &GetTabletsByCellOptions{ - Concurrency: cellConcurrency, KeyspaceShard: kss, } for _, cell := range cells { diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 08f44c0f75e..25a54fdfa18 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -20,6 +20,8 @@ import ( "context" "time" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -33,6 +35,11 @@ var ( "TopologyConnOperations timings", []string{"Operation", "Cell"}) + topoStatsConnWaitTimings = stats.NewMultiTimings( + "TopologyConnWaits", + "TopologyConnWait timings", + []string{"Operation", "Cell"}) + topoStatsConnErrors = stats.NewCountersWithMultiLabels( "TopologyConnErrors", "TopologyConnErrors errors per operation", @@ -46,14 +53,16 @@ type StatsConn struct { cell string conn Conn readOnly bool + readSem *semaphore.Weighted } // NewStatsConn returns a StatsConn -func NewStatsConn(cell string, conn Conn) *StatsConn { +func NewStatsConn(cell string, conn Conn, readSem *semaphore.Weighted) *StatsConn { return &StatsConn{ cell: cell, conn: conn, readOnly: false, + readSem: readSem, } } @@ -61,6 +70,11 @@ func NewStatsConn(cell string, conn Conn) *StatsConn { func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) { startTime := time.Now() statsKey := []string{"ListDir", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer st.readSem.Release(1) + topoStatsConnWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) res, err := st.conn.ListDir(ctx, dirPath, full) if err != nil { @@ -106,6 +120,11 @@ func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byt func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) { startTime := time.Now() statsKey := []string{"Get", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, nil, err + } + defer st.readSem.Release(1) + topoStatsConnWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) bytes, version, err := st.conn.Get(ctx, filePath) if err != nil { @@ -119,6 +138,11 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) { startTime := time.Now() statsKey := []string{"List", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer st.readSem.Release(1) + topoStatsConnWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) bytes, err := st.conn.List(ctx, filePathPrefix) if err != nil { diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index e26e8c97f31..baefab365be 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -21,10 +21,14 @@ import ( "fmt" "testing" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" ) +var testStatsConnReadSem = semaphore.NewWeighted(1) + // The fakeConn is a wrapper for a Conn that emits stats for every operation type fakeConn struct { v Version @@ -153,7 +157,7 @@ func (st *fakeConn) IsReadOnly() bool { // TestStatsConnTopoListDir emits stats on ListDir func TestStatsConnTopoListDir(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.ListDir(ctx, "", true) @@ -180,7 +184,7 @@ func TestStatsConnTopoListDir(t *testing.T) { // TestStatsConnTopoCreate emits stats on Create func TestStatsConnTopoCreate(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Create(ctx, "", []byte{}) @@ -207,7 +211,7 @@ func TestStatsConnTopoCreate(t *testing.T) { // TestStatsConnTopoUpdate emits stats on Update func TestStatsConnTopoUpdate(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Update(ctx, "", []byte{}, conn.v) @@ -234,7 +238,7 @@ func TestStatsConnTopoUpdate(t *testing.T) { // TestStatsConnTopoGet emits stats on Get func TestStatsConnTopoGet(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Get(ctx, "") @@ -261,7 +265,7 @@ func TestStatsConnTopoGet(t *testing.T) { // TestStatsConnTopoDelete emits stats on Delete func TestStatsConnTopoDelete(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Delete(ctx, "", conn.v) @@ -288,7 +292,7 @@ func TestStatsConnTopoDelete(t *testing.T) { // TestStatsConnTopoLock emits stats on Lock func TestStatsConnTopoLock(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Lock(ctx, "", "") @@ -315,7 +319,7 @@ func TestStatsConnTopoLock(t *testing.T) { // TestStatsConnTopoWatch emits stats on Watch func TestStatsConnTopoWatch(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Watch(ctx, "") @@ -329,7 +333,7 @@ func TestStatsConnTopoWatch(t *testing.T) { // TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) _, _ = statsConn.NewLeaderParticipation("", "") timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"] @@ -355,7 +359,7 @@ func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { // TestStatsConnTopoClose emits stats on Close func TestStatsConnTopoClose(t *testing.T) { conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) statsConn.Close() timingCounts := topoStatsConnTimings.Counts()["Close.global"] diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 7dccf3bf183..ffc21153a7a 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -24,21 +24,17 @@ import ( "sync" "time" - "golang.org/x/sync/semaphore" - - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/event" "vitess.io/vitess/go/netutil" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" ) // IsTrivialTypeChange returns if this db type be trivially reassigned @@ -287,8 +283,6 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t // GetTabletsByCellOptions controls the behavior of // Server.FindAllShardsInKeyspace. type GetTabletsByCellOptions struct { - // Concurrency controls the maximum number of concurrent calls to GetTablet. - Concurrency int // KeyspaceShard is the optional keyspace/shard that tablets must match. // An empty shard value will match all shards in the keyspace. KeyspaceShard *KeyspaceShard @@ -550,29 +544,11 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb. returnErr error ) - concurrency := DefaultConcurrency - if opt != nil && opt.Concurrency > 0 { - concurrency = opt.Concurrency - } - var sem = semaphore.NewWeighted(int64(concurrency)) - for _, tabletAlias := range tabletAliases { wg.Add(1) go func(tabletAlias *topodatapb.TabletAlias) { defer wg.Done() - if err := sem.Acquire(ctx, 1); err != nil { - // Only happens if context is cancelled. - mu.Lock() - defer mu.Unlock() - log.Warningf("%v: %v", tabletAlias, err) - // We only need to set this on the first error. - if returnErr == nil { - returnErr = NewError(PartialResult, tabletAlias.GetCell()) - } - return - } tabletInfo, err := ts.GetTablet(ctx, tabletAlias) - sem.Release(1) mu.Lock() defer mu.Unlock() if err != nil { diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index e659a0d01b9..1c242e8778b 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -69,7 +69,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, // Ensure this doesn't panic. - opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { name: "single", @@ -151,7 +150,6 @@ func TestServerGetTabletsByCell(t *testing.T) { Shard: shard, }, }, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { name: "multiple with list error", @@ -210,7 +208,6 @@ func TestServerGetTabletsByCell(t *testing.T) { Shard: shard, }, }, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, listError: topo.NewError(topo.ResourceExhausted, ""), }, { @@ -249,7 +246,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, opt: &topo.GetTabletsByCellOptions{ - Concurrency: 1, KeyspaceShard: &topo.KeyspaceShard{ Keyspace: keyspace, Shard: shard, @@ -317,7 +313,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, opt: &topo.GetTabletsByCellOptions{ - Concurrency: 1, KeyspaceShard: &topo.KeyspaceShard{ Keyspace: keyspace, Shard: "", diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 73cd61676ca..f59efcaaff4 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -202,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) + tablets, err := ts.GetTabletsByCell(ctx, cell, nil) if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) return From a0b15be3a523a479ab9ce5b3c9ac45b68d3c79d5 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 10 Dec 2024 01:15:19 +0100 Subject: [PATCH 02/10] semaphore per cell, update help+changelog Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtbackup.txt | 2 +- go/flags/endtoend/vtcombo.txt | 2 +- go/flags/endtoend/vtctld.txt | 2 +- go/flags/endtoend/vtgate.txt | 2 +- go/flags/endtoend/vtorc.txt | 2 +- go/flags/endtoend/vttablet.txt | 2 +- go/vt/topo/server.go | 10 +++------- 7 files changed, 9 insertions(+), 13 deletions(-) diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index 49878247f7d..151487d5522 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -235,7 +235,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 45e46880b2e..4336385a5f4 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -379,7 +379,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 4e9f2e5322b..6ef6e4685db 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -169,7 +169,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index b5068dc702e..a56a6893934 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -227,7 +227,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 516e797e284..55e95b5a7a1 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -107,7 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 8278aefd4f9..89f15a24239 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -381,7 +381,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Maximum concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 49d2e3bc12f..897d6b66c67 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -142,10 +142,6 @@ type Server struct { // will read the list of addresses for that cell from the // global cluster and create clients as needed. cellConns map[string]cellConn - - // cellReadSem is a semaphore limiting the number of concurrent read - // operations to all cell-based topos. - cellReadSem *semaphore.Weighted } type cellConn struct { @@ -195,7 +191,7 @@ func registerTopoFlags(fs *pflag.FlagSet) { fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use") fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server") fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server") - fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads.") + fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell") } // RegisterFactory registers a Factory for an implementation for a Server. @@ -234,7 +230,6 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error globalReadOnlyCell: connReadOnly, factory: factory, cellConns: make(map[string]cellConn), - cellReadSem: semaphore.NewWeighted(DefaultReadConcurrency), }, nil } @@ -306,7 +301,8 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root) switch { case err == nil: - conn = NewStatsConn(cell, conn, ts.cellReadSem) + cellReadSem := semaphore.NewWeighted(DefaultReadConcurrency) + conn = NewStatsConn(cell, conn, cellReadSem) ts.cellConns[cell] = cellConn{ci, conn} return conn, nil case IsErrType(err, NoNode): From a5051cf46c51aeafc4c9507313e0bb3fd05c6e4b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 10 Dec 2024 16:03:23 +0100 Subject: [PATCH 03/10] help typo Signed-off-by: Tim Vaillancourt --- go/vt/topo/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 897d6b66c67..b9bf46a1c3d 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -191,7 +191,7 @@ func registerTopoFlags(fs *pflag.FlagSet) { fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use") fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server") fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server") - fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell") + fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell.") } // RegisterFactory registers a Factory for an implementation for a Server. From eb4a62432faa8f966551422e76daab627bd8a62c Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 10 Dec 2024 01:34:56 +0100 Subject: [PATCH 04/10] better stat name Signed-off-by: Tim Vaillancourt --- go/vt/topo/stats_conn.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 25a54fdfa18..9229c2681e3 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -35,15 +35,15 @@ var ( "TopologyConnOperations timings", []string{"Operation", "Cell"}) - topoStatsConnWaitTimings = stats.NewMultiTimings( - "TopologyConnWaits", - "TopologyConnWait timings", - []string{"Operation", "Cell"}) - topoStatsConnErrors = stats.NewCountersWithMultiLabels( "TopologyConnErrors", "TopologyConnErrors errors per operation", []string{"Operation", "Cell"}) + + topoStatsReadWaitTimings = stats.NewMultiTimings( + "TopologyReadWaits", + "TopologyReadWait timings", + []string{"Operation", "Cell"}) ) const readOnlyErrorStrFormat = "cannot perform %s on %s as the topology server connection is read-only" @@ -74,7 +74,7 @@ func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([] return nil, err } defer st.readSem.Release(1) - topoStatsConnWaitTimings.Record(statsKey, startTime) + topoStatsReadWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) res, err := st.conn.ListDir(ctx, dirPath, full) if err != nil { @@ -124,7 +124,7 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, return nil, nil, err } defer st.readSem.Release(1) - topoStatsConnWaitTimings.Record(statsKey, startTime) + topoStatsReadWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) bytes, version, err := st.conn.Get(ctx, filePath) if err != nil { @@ -142,7 +142,7 @@ func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, return nil, err } defer st.readSem.Release(1) - topoStatsConnWaitTimings.Record(statsKey, startTime) + topoStatsReadWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) bytes, err := st.conn.List(ctx, filePathPrefix) if err != nil { From 50b95e1f22e9b77f6a69b51a4b95bb4a4cf79657 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 10 Dec 2024 16:28:02 +0100 Subject: [PATCH 05/10] better stat name again Signed-off-by: Tim Vaillancourt --- go/vt/topo/stats_conn.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 9229c2681e3..89831596ea4 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -40,9 +40,9 @@ var ( "TopologyConnErrors errors per operation", []string{"Operation", "Cell"}) - topoStatsReadWaitTimings = stats.NewMultiTimings( - "TopologyReadWaits", - "TopologyReadWait timings", + topoStatsConnReadWaitTimings = stats.NewMultiTimings( + "TopologyConnReadWaits", + "TopologyConnReadWait timings", []string{"Operation", "Cell"}) ) @@ -74,7 +74,7 @@ func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([] return nil, err } defer st.readSem.Release(1) - topoStatsReadWaitTimings.Record(statsKey, startTime) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) res, err := st.conn.ListDir(ctx, dirPath, full) if err != nil { @@ -124,7 +124,7 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, return nil, nil, err } defer st.readSem.Release(1) - topoStatsReadWaitTimings.Record(statsKey, startTime) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) bytes, version, err := st.conn.Get(ctx, filePath) if err != nil { @@ -142,7 +142,7 @@ func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, return nil, err } defer st.readSem.Release(1) - topoStatsReadWaitTimings.Record(statsKey, startTime) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) defer topoStatsConnTimings.Record(statsKey, startTime) bytes, err := st.conn.List(ctx, filePathPrefix) if err != nil { From 48a72eb21ebe7bfcdd56d484b12ba99c2aec2773 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 11 Dec 2024 00:18:26 +0100 Subject: [PATCH 06/10] reset op start time Signed-off-by: Tim Vaillancourt --- go/vt/topo/stats_conn.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 89831596ea4..65c2b8b479a 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -75,6 +75,7 @@ func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([] } defer st.readSem.Release(1) topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) res, err := st.conn.ListDir(ctx, dirPath, full) if err != nil { @@ -125,6 +126,7 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, } defer st.readSem.Release(1) topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) bytes, version, err := st.conn.Get(ctx, filePath) if err != nil { @@ -143,6 +145,7 @@ func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, } defer st.readSem.Release(1) topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) bytes, err := st.conn.List(ctx, filePathPrefix) if err != nil { From 1142ced7b39c1c16cb50a3dd16dabea491bcacbb Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sat, 14 Dec 2024 23:43:34 +0100 Subject: [PATCH 07/10] remove unused `concurrency` field from topo watcher Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 2 +- go/vt/discovery/topology_watcher.go | 4 +--- go/vt/discovery/topology_watcher_test.go | 10 +++++----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 82e45a8b707..be1e6b24be9 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -382,7 +382,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultReadConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 6581149f520..d1e358e1aa5 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -67,7 +67,6 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - concurrency int64 ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -89,7 +88,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, @@ -97,7 +96,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, - concurrency: topoReadConcurrency, tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 834fdcb1afe..b84347d38ce 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -67,7 +67,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { fhc := NewFakeHealthCheck(nil) defer fhc.Close() topologyWatcherOperations.ZeroAll() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true) done := make(chan bool, 3) result := make(chan bool, 1) @@ -127,7 +127,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -421,7 +421,7 @@ func TestFilterByKeyspace(t *testing.T) { f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() - tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -502,7 +502,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} - tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -639,7 +639,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true) defer tw.Stop() // Force fallback to getting tablets individually. From a1710ba6b131e094026aa8edeadfa9468d04c11d Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 16 Dec 2024 19:28:12 +0100 Subject: [PATCH 08/10] simulate semaphore contention, test Signed-off-by: Tim Vaillancourt --- go/vt/topo/stats_conn_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index baefab365be..16eada5a324 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -154,18 +154,33 @@ func (st *fakeConn) IsReadOnly() bool { return st.readOnly } +// createSemaphoreContention simulates semaphore contention on the test read semaphore. +func createSemaphoreContention(ctx context.Context, duration time.Duration) { + if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil { + panic(err) + } + defer testStatsConnReadSem.Release(1) + time.Sleep(duration) +} + // TestStatsConnTopoListDir emits stats on ListDir func TestStatsConnTopoListDir(t *testing.T) { conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() + go createSemaphoreContention(ctx, 100*time.Millisecond) statsConn.ListDir(ctx, "", true) timingCounts := topoStatsConnTimings.Counts()["ListDir.global"] if got, want := timingCounts, int64(1); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["ListDir.global"] + if got := waitTimingsCounts; got != 1 { + t.Errorf("stats were not properly recorded: got = %d, want = 1", got) + } + // error is zero before getting an error errorCount := topoStatsConnErrors.Counts()["ListDir.global"] if got, want := errorCount, int64(0); got != want { @@ -241,12 +256,18 @@ func TestStatsConnTopoGet(t *testing.T) { statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() + go createSemaphoreContention(ctx, time.Millisecond*100) statsConn.Get(ctx, "") timingCounts := topoStatsConnTimings.Counts()["Get.global"] if got, want := timingCounts, int64(1); got != want { t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) } + waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["Get.global"] + if got := waitTimingsCounts; got != 1 { + t.Errorf("stats were not properly recorded: got = %d, want = 1", got) + } + // error is zero before getting an error errorCount := topoStatsConnErrors.Counts()["Get.global"] if got, want := errorCount, int64(0); got != want { From 77e93a5dcd99c3c181a17b70120b4d99247ed418 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 16 Dec 2024 19:33:06 +0100 Subject: [PATCH 09/10] func rename Signed-off-by: Tim Vaillancourt --- go/vt/topo/stats_conn_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 16eada5a324..c5ef0d15f41 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -154,8 +154,8 @@ func (st *fakeConn) IsReadOnly() bool { return st.readOnly } -// createSemaphoreContention simulates semaphore contention on the test read semaphore. -func createSemaphoreContention(ctx context.Context, duration time.Duration) { +// createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore. +func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration) { if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil { panic(err) } @@ -169,7 +169,7 @@ func TestStatsConnTopoListDir(t *testing.T) { statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() - go createSemaphoreContention(ctx, 100*time.Millisecond) + go createTestReadSemaphoreContention(ctx, 100*time.Millisecond) statsConn.ListDir(ctx, "", true) timingCounts := topoStatsConnTimings.Counts()["ListDir.global"] if got, want := timingCounts, int64(1); got != want { @@ -256,7 +256,7 @@ func TestStatsConnTopoGet(t *testing.T) { statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() - go createSemaphoreContention(ctx, time.Millisecond*100) + go createTestReadSemaphoreContention(ctx, time.Millisecond*100) statsConn.Get(ctx, "") timingCounts := topoStatsConnTimings.Counts()["Get.global"] if got, want := timingCounts, int64(1); got != want { From cd189c5ac83ecf8f978c77380bb4efa5192d8dfb Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 16 Dec 2024 19:43:32 +0100 Subject: [PATCH 10/10] goimports Signed-off-by: Tim Vaillancourt --- go/vt/topo/stats_conn_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index c5ef0d15f41..730a2fc1613 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" "golang.org/x/sync/semaphore"