diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index 25730ab892f..151487d5522 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -235,6 +235,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 126b073c6b5..4336385a5f4 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -379,7 +379,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 325c4ae30cd..6ef6e4685db 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -169,7 +169,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 07353a0beaa..a56a6893934 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -227,7 +227,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. 
(default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 5fbfe95dc49..ba500a3732b 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -25,7 +25,7 @@ Flags: --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified --change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED - --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80" + --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80" --config string config file name --config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored. --config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn) @@ -107,7 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use - --topo_read_concurrency int Concurrency of topo reads. (default 32) + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 502b81f1f6a..89f15a24239 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -381,6 +381,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. 
(default 64) diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 7dd5c50eefa..13e72632221 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -48,10 +48,10 @@ func TestAPIEndpoints(t *testing.T) { status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool { return code == 0 }) - // When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false. - assert.Equal(t, 500, status) + // When VTOrc starts it runs OpenTabletDiscovery(), which triggers a topo-refresh. VTOrc should be healthy and HasDiscovered should be true. + assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"DiscoveredOnce": false`) + assert.Contains(t, resp, `"DiscoveredOnce": true`) // find primary from topo primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) @@ -96,6 +96,24 @@ func TestAPIEndpoints(t *testing.T) { return response != "null" }) + t.Run("Database State", func(t *testing.T) { + // Get database state + status, resp, err := utils.MakeAPICall(t, vtorc, "/api/database-state") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Contains(t, resp, `"alias": "zone1-0000000101"`) + assert.Contains(t, resp, `{ + "TableName": "vitess_keyspace", + "Rows": [ + { + "durability_policy": "none", + "keyspace": "ks", + "keyspace_type": "0" + } + ] + },`) + }) + t.Run("Disable Recoveries API", func(t *testing.T) { // Disable recoveries of VTOrc status, resp, err := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries") diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 0531a8aee13..010cfa9eab3 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -381,7 +381,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 64346d524ad..d1e358e1aa5 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -26,16 +26,13 @@ import ( "sync" "time" - "vitess.io/vitess/go/vt/topo/topoproto" - - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/stats" "vitess.io/vitess/go/trace" - + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/topodata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" ) const ( @@ -56,7 +53,7 @@ var ( // tabletInfo is used internally by the TopologyWatcher struct. type tabletInfo struct { alias string - tablet *topodata.Tablet + tablet *topodatapb.Tablet } // TopologyWatcher polls the topology periodically for changes to @@ -70,7 +67,6 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - concurrency int ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. 
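For context on the concurrency field removed above: with this change the topo read limit is enforced per cell inside the topo StatsConn by a weighted semaphore sized from --topo_read_concurrency (topo.DefaultReadConcurrency), instead of per TopologyWatcher. Below is a minimal sketch of that acquire/release pattern; the gatedRead helper is illustrative only and not part of this patch.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// gatedRead mirrors the pattern used by the read paths in StatsConn
// (ListDir, Get, List): wait on the per-cell read semaphore, then run the
// underlying topo read. Acquire only fails if ctx is cancelled or times out
// while waiting for a slot.
func gatedRead(ctx context.Context, readSem *semaphore.Weighted, read func(context.Context) error) error {
	if err := readSem.Acquire(ctx, 1); err != nil {
		return err
	}
	defer readSem.Release(1)
	return read(ctx)
}

func main() {
	// 32 matches the default of --topo_read_concurrency.
	readSem := semaphore.NewWeighted(32)
	err := gatedRead(context.Background(), readSem, func(context.Context) error {
		fmt.Println("topo read goes here")
		return nil
	})
	fmt.Println(err)
}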
@@ -92,7 +88,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, @@ -100,7 +96,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, - concurrency: topoReadConcurrency, tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) @@ -112,7 +107,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC } func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) { - return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency}) + return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil) } // Start starts the topology watcher. @@ -271,14 +266,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 { // to be applied as an additional filter on the list of tablets returned by its getTablets function. type TabletFilter interface { // IsIncluded returns whether tablet is included in this filter - IsIncluded(tablet *topodata.Tablet) bool + IsIncluded(tablet *topodatapb.Tablet) bool } // TabletFilters contains filters for tablets. type TabletFilters []TabletFilter // IsIncluded returns true if a tablet passes all filters. -func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { +func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool { for _, filter := range tf { if !filter.IsIncluded(tablet) { return false @@ -299,7 +294,7 @@ type FilterByShard struct { type filterShard struct { keyspace string shard string - keyRange *topodata.KeyRange // only set if shard is also a KeyRange + keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange } // NewFilterByShard creates a new FilterByShard for use by a @@ -344,7 +339,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) { } // IsIncluded returns true iff the tablet's keyspace and shard match what we have. -func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { +func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool { canonical, kr, err := topo.ValidateShardName(tablet.Shard) if err != nil { log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err) @@ -384,7 +379,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { } // IsIncluded returns true if the tablet's keyspace matches what we have. -func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { +func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist } @@ -403,7 +398,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { } // IsIncluded returns true if the tablet's tags match what we expect. 
-func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { +func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool { if fbtg.tags == nil { return true } diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 834fdcb1afe..b84347d38ce 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -67,7 +67,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { fhc := NewFakeHealthCheck(nil) defer fhc.Close() topologyWatcherOperations.ZeroAll() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true) done := make(chan bool, 3) result := make(chan bool, 1) @@ -127,7 +127,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -421,7 +421,7 @@ func TestFilterByKeyspace(t *testing.T) { f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() - tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -502,7 +502,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} - tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -639,7 +639,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true) defer tw.Stop() // Force fallback to getting tablets individually. 
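Note on the call sites above: NewTopologyWatcher no longer takes a topo read concurrency argument; the limit now comes from the --topo_read_concurrency flag, applied per cell inside the topo server connection. A minimal caller sketch under that assumption (the memorytopo server and fake healthcheck are stand-ins for illustration):

package main

import (
	"context"
	"time"

	"vitess.io/vitess/go/vt/discovery"
	"vitess.io/vitess/go/vt/topo/memorytopo"
)

func main() {
	ctx := context.Background()
	ts := memorytopo.NewServer(ctx, "zone1")
	defer ts.Close()

	hc := discovery.NewFakeHealthCheck(nil)
	defer hc.Close()

	// No concurrency parameter anymore: topo reads issued by the watcher are
	// throttled by the per-cell semaphore configured via --topo_read_concurrency.
	tw := discovery.NewTopologyWatcher(ctx, ts, hc, nil, "zone1", time.Minute, true /*refreshKnownTablets*/)
	tw.Start()
	defer tw.Stop()
}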
diff --git a/go/vt/external/golib/sqlutils/sqlutils.go b/go/vt/external/golib/sqlutils/sqlutils.go index eb1cb8c8941..df6984b5634 100644 --- a/go/vt/external/golib/sqlutils/sqlutils.go +++ b/go/vt/external/golib/sqlutils/sqlutils.go @@ -40,7 +40,7 @@ type RowMap map[string]CellData // CellData is the result of a single (atomic) column in a single row type CellData sql.NullString -func (this *CellData) MarshalJSON() ([]byte, error) { +func (this CellData) MarshalJSON() ([]byte, error) { if this.Valid { return json.Marshal(this.String) } else { diff --git a/go/vt/key/key.go b/go/vt/key/key.go index dcdcda47f81..82852daa16e 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -90,6 +90,19 @@ func Empty(id []byte) bool { // KeyRange helper methods // +// Make a Key Range +func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange { + return &topodatapb.KeyRange{Start: start, End: end} +} + +// NewCompleteKeyRange returns a complete key range. +func NewCompleteKeyRange() *topodatapb.KeyRange { + return &topodatapb.KeyRange{ + Start: nil, + End: nil, + } +} + // KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent, // it returns false. func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) { diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 08b1c59f4cc..4e1d19ec79a 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -22,44 +22,26 @@ import ( "sort" "sync" - "github.com/spf13/pflag" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/event" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/event" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo/events" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo/events" + "vitess.io/vitess/go/vt/vterrors" ) // This file contains keyspace utility functions. -// Default concurrency to use in order to avoid overhwelming the topo server. -var DefaultConcurrency = 32 - // shardKeySuffix is the suffix of a shard key. // The full key looks like this: // /vitess/global/keyspaces/customer/shards/80-/Shard const shardKeySuffix = "Shard" -func registerFlags(fs *pflag.FlagSet) { - fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.") -} - -func init() { - servenv.OnParseFor("vtcombo", registerFlags) - servenv.OnParseFor("vtctld", registerFlags) - servenv.OnParseFor("vtgate", registerFlags) - servenv.OnParseFor("vtorc", registerFlags) -} - // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact // with a keyspace. @@ -198,7 +180,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { type FindAllShardsInKeyspaceOptions struct { // Concurrency controls the maximum number of concurrent calls to GetShard. // If <= 0, Concurrency is set to 1. - Concurrency int + Concurrency int64 } // FindAllShardsInKeyspace reads and returns all the existing shards in a @@ -212,7 +194,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt = &FindAllShardsInKeyspaceOptions{} } if opt.Concurrency <= 0 { - opt.Concurrency = DefaultConcurrency + opt.Concurrency = DefaultReadConcurrency } // Unescape the keyspace name as this can e.g. 
come from the VSchema where diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 1995e8b6ec4..b9bf46a1c3d 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -48,6 +48,7 @@ import ( "sync" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/topodata" @@ -175,6 +176,9 @@ var ( FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate", "vtorc", "vtbackup"} + + // Default read concurrency to use in order to avoid overhwelming the topo server. + DefaultReadConcurrency int64 = 32 ) func init() { @@ -187,6 +191,7 @@ func registerTopoFlags(fs *pflag.FlagSet) { fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use") fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server") fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server") + fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell.") } // RegisterFactory registers a Factory for an implementation for a Server. @@ -202,11 +207,12 @@ func RegisterFactory(name string, factory Factory) { // NewWithFactory creates a new Server based on the given Factory. // It also opens the global cell connection. func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) { + globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency) conn, err := factory.Create(GlobalCell, serverAddress, root) if err != nil { return nil, err } - conn = NewStatsConn(GlobalCell, conn) + conn = NewStatsConn(GlobalCell, conn, globalReadSem) var connReadOnly Conn if factory.HasGlobalReadOnlyCell(serverAddress, root) { @@ -214,7 +220,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error if err != nil { return nil, err } - connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly) + connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem) } else { connReadOnly = conn } @@ -295,7 +301,8 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root) switch { case err == nil: - conn = NewStatsConn(cell, conn) + cellReadSem := semaphore.NewWeighted(DefaultReadConcurrency) + conn = NewStatsConn(cell, conn, cellReadSem) ts.cellConns[cell] = cellConn{ci, conn} return conn, nil case IsErrType(err, NoNode): diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 220510e71f5..613a9f265ba 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -671,16 +671,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str } } - // Divide the concurrency limit by the number of cells. If there are more - // cells than the limit, default to concurrency of 1. 
- cellConcurrency := 1 - if len(cells) < DefaultConcurrency { - cellConcurrency = DefaultConcurrency / len(cells) - } - mu := sync.Mutex{} eg, ctx := errgroup.WithContext(ctx) - eg.SetLimit(DefaultConcurrency) tablets := make([]*TabletInfo, 0, len(cells)) var kss *KeyspaceShard @@ -691,7 +683,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str } } options := &GetTabletsByCellOptions{ - Concurrency: cellConcurrency, KeyspaceShard: kss, } for _, cell := range cells { diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go index 2c0b9082816..1f003ac561c 100644 --- a/go/vt/topo/shard_test.go +++ b/go/vt/topo/shard_test.go @@ -244,6 +244,14 @@ func TestValidateShardName(t *testing.T) { }, valid: true, }, + { + name: "-", + expectedRange: &topodatapb.KeyRange{ + Start: []byte{}, + End: []byte{}, + }, + valid: true, + }, { name: "40-80", expectedRange: &topodatapb.KeyRange{ diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 08f44c0f75e..65c2b8b479a 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -20,6 +20,8 @@ import ( "context" "time" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -37,6 +39,11 @@ var ( "TopologyConnErrors", "TopologyConnErrors errors per operation", []string{"Operation", "Cell"}) + + topoStatsConnReadWaitTimings = stats.NewMultiTimings( + "TopologyConnReadWaits", + "TopologyConnReadWait timings", + []string{"Operation", "Cell"}) ) const readOnlyErrorStrFormat = "cannot perform %s on %s as the topology server connection is read-only" @@ -46,14 +53,16 @@ type StatsConn struct { cell string conn Conn readOnly bool + readSem *semaphore.Weighted } // NewStatsConn returns a StatsConn -func NewStatsConn(cell string, conn Conn) *StatsConn { +func NewStatsConn(cell string, conn Conn, readSem *semaphore.Weighted) *StatsConn { return &StatsConn{ cell: cell, conn: conn, readOnly: false, + readSem: readSem, } } @@ -61,6 +70,12 @@ func NewStatsConn(cell string, conn Conn) *StatsConn { func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) { startTime := time.Now() statsKey := []string{"ListDir", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer st.readSem.Release(1) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) res, err := st.conn.ListDir(ctx, dirPath, full) if err != nil { @@ -106,6 +121,12 @@ func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byt func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) { startTime := time.Now() statsKey := []string{"Get", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, nil, err + } + defer st.readSem.Release(1) + topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) bytes, version, err := st.conn.Get(ctx, filePath) if err != nil { @@ -119,6 +140,12 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) { startTime := time.Now() statsKey := []string{"List", st.cell} + if err := st.readSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer st.readSem.Release(1) + 
topoStatsConnReadWaitTimings.Record(statsKey, startTime) + startTime = time.Now() // reset defer topoStatsConnTimings.Record(statsKey, startTime) bytes, err := st.conn.List(ctx, filePathPrefix) if err != nil { diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index e26e8c97f31..5f5cab69adc 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -20,11 +20,27 @@ import ( "context" "fmt" "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" ) +// testStatsConnReadSem is a semaphore for unit tests. +// It intentionally has a concurrency limit of '1' to +// allow semaphore contention in tests. +var testStatsConnReadSem = semaphore.NewWeighted(1) + +// testStatsConnStatsReset resets StatsConn-based stats. +func testStatsConnStatsReset() { + topoStatsConnErrors.ResetAll() + topoStatsConnReadWaitTimings.Reset() + topoStatsConnTimings.Reset() +} + // The fakeConn is a wrapper for a Conn that emits stats for every operation type fakeConn struct { v Version @@ -150,216 +166,211 @@ func (st *fakeConn) IsReadOnly() bool { return st.readOnly } +// createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore. +func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration, semAcquiredChan chan struct{}) { + if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil { + panic(err) + } + defer testStatsConnReadSem.Release(1) + semAcquiredChan <- struct{}{} + time.Sleep(duration) +} + // TestStatsConnTopoListDir emits stats on ListDir func TestStatsConnTopoListDir(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() + semAcquiredChan := make(chan struct{}) + go createTestReadSemaphoreContention(ctx, 100*time.Millisecond, semAcquiredChan) + <-semAcquiredChan statsConn.ListDir(ctx, "", true) - timingCounts := topoStatsConnTimings.Counts()["ListDir.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["ListDir.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + + require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["ListDir.global"]) + require.NotZero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["ListDir.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["ListDir.global"]) statsConn.ListDir(ctx, "error", true) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["ListDir.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["ListDir.global"]) } // TestStatsConnTopoCreate emits stats on Create func TestStatsConnTopoCreate(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() 
statsConn.Create(ctx, "", []byte{}) - timingCounts := topoStatsConnTimings.Counts()["Create.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Create.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Create.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Create.global"]) statsConn.Create(ctx, "error", []byte{}) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Create.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Create.global"]) } // TestStatsConnTopoUpdate emits stats on Update func TestStatsConnTopoUpdate(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Update(ctx, "", []byte{}, conn.v) - timingCounts := topoStatsConnTimings.Counts()["Update.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Update.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Update.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Update.global"]) statsConn.Update(ctx, "error", []byte{}, conn.v) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Update.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Update.global"]) } // TestStatsConnTopoGet emits stats on Get func TestStatsConnTopoGet(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() + semAcquiredChan := make(chan struct{}) + go createTestReadSemaphoreContention(ctx, time.Millisecond*100, semAcquiredChan) + <-semAcquiredChan statsConn.Get(ctx, "") - timingCounts := topoStatsConnTimings.Counts()["Get.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Get.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + + require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["Get.global"]) + require.NotZero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Get.global"] - if got, want := errorCount, 
int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Get.global"]) statsConn.Get(ctx, "error") // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Get.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Get.global"]) } // TestStatsConnTopoDelete emits stats on Delete func TestStatsConnTopoDelete(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Delete(ctx, "", conn.v) - timingCounts := topoStatsConnTimings.Counts()["Delete.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Delete.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Delete.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Delete.global"]) statsConn.Delete(ctx, "error", conn.v) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Delete.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Delete.global"]) } // TestStatsConnTopoLock emits stats on Lock func TestStatsConnTopoLock(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Lock(ctx, "", "") - timingCounts := topoStatsConnTimings.Counts()["Lock.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Lock.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) - // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Lock.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + // Error is zero before getting an error. + require.Zero(t, topoStatsConnErrors.Counts()["Lock.global"]) statsConn.Lock(ctx, "error", "") - // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Lock.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + // Error stats gets emitted. 
+ require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Lock.global"]) } // TestStatsConnTopoWatch emits stats on Watch func TestStatsConnTopoWatch(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Watch(ctx, "") - timingCounts := topoStatsConnTimings.Counts()["Watch.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } - + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Watch.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) } // TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) _, _ = statsConn.NewLeaderParticipation("", "") - timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["NewLeaderParticipation.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["NewLeaderParticipation.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]) _, _ = statsConn.NewLeaderParticipation("error", "") // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["NewLeaderParticipation.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]) } // TestStatsConnTopoClose emits stats on Close func TestStatsConnTopoClose(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} - statsConn := NewStatsConn("global", conn) + statsConn := NewStatsConn("global", conn, testStatsConnReadSem) statsConn.Close() - timingCounts := topoStatsConnTimings.Counts()["Close.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Close.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) } diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 7dccf3bf183..ffc21153a7a 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -24,21 +24,17 @@ import ( "sync" "time" - "golang.org/x/sync/semaphore" - - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/event" "vitess.io/vitess/go/netutil" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/vtrpc" - 
"vitess.io/vitess/go/vt/vterrors" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" ) // IsTrivialTypeChange returns if this db type be trivially reassigned @@ -287,8 +283,6 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t // GetTabletsByCellOptions controls the behavior of // Server.FindAllShardsInKeyspace. type GetTabletsByCellOptions struct { - // Concurrency controls the maximum number of concurrent calls to GetTablet. - Concurrency int // KeyspaceShard is the optional keyspace/shard that tablets must match. // An empty shard value will match all shards in the keyspace. KeyspaceShard *KeyspaceShard @@ -550,29 +544,11 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb. returnErr error ) - concurrency := DefaultConcurrency - if opt != nil && opt.Concurrency > 0 { - concurrency = opt.Concurrency - } - var sem = semaphore.NewWeighted(int64(concurrency)) - for _, tabletAlias := range tabletAliases { wg.Add(1) go func(tabletAlias *topodatapb.TabletAlias) { defer wg.Done() - if err := sem.Acquire(ctx, 1); err != nil { - // Only happens if context is cancelled. - mu.Lock() - defer mu.Unlock() - log.Warningf("%v: %v", tabletAlias, err) - // We only need to set this on the first error. - if returnErr == nil { - returnErr = NewError(PartialResult, tabletAlias.GetCell()) - } - return - } tabletInfo, err := ts.GetTablet(ctx, tabletAlias) - sem.Release(1) mu.Lock() defer mu.Unlock() if err != nil { diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index e659a0d01b9..1c242e8778b 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -69,7 +69,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, // Ensure this doesn't panic. 
- opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { name: "single", @@ -151,7 +150,6 @@ func TestServerGetTabletsByCell(t *testing.T) { Shard: shard, }, }, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { name: "multiple with list error", @@ -210,7 +208,6 @@ func TestServerGetTabletsByCell(t *testing.T) { Shard: shard, }, }, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, listError: topo.NewError(topo.ResourceExhausted, ""), }, { @@ -249,7 +246,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, opt: &topo.GetTabletsByCellOptions{ - Concurrency: 1, KeyspaceShard: &topo.KeyspaceShard{ Keyspace: keyspace, Shard: shard, @@ -317,7 +313,6 @@ func TestServerGetTabletsByCell(t *testing.T) { }, }, opt: &topo.GetTabletsByCellOptions{ - Concurrency: 1, KeyspaceShard: &topo.KeyspaceShard{ Keyspace: keyspace, Shard: "", diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 73238802920..94daebbf7f0 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -16,6 +16,28 @@ package db +var TableNames = []string{ + "database_instance", + "audit", + "active_node", + "node_health", + "topology_recovery", + "database_instance_topology_history", + "candidate_database_instance", + "topology_failure_detection", + "blocked_topology_recovery", + "database_instance_last_analysis", + "database_instance_analysis_changelog", + "node_health_history", + "vtorc_db_deployments", + "global_recovery_disable", + "topology_recovery_steps", + "database_instance_stale_binlog_coordinates", + "vitess_tablet", + "vitess_keyspace", + "vitess_shard", +} + // vtorcBackend is a list of SQL statements required to build the vtorc backend var vtorcBackend = []string{ ` diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index e72ff2f0da9..c61fbab46d4 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -17,6 +17,7 @@ package inst import ( + "encoding/json" "errors" "fmt" "regexp" @@ -1196,3 +1197,32 @@ func ExpireStaleInstanceBinlogCoordinates() error { } return ExecDBWriteFunc(writeFunc) } + +// GetDatabaseState takes the snapshot of the database and returns it. +func GetDatabaseState() (string, error) { + type tableState struct { + TableName string + Rows []sqlutils.RowMap + } + + var dbState []tableState + for _, tableName := range db.TableNames { + ts := tableState{ + TableName: tableName, + } + err := db.QueryVTOrc("select * from "+tableName, nil, func(rowMap sqlutils.RowMap) error { + ts.Rows = append(ts.Rows, rowMap) + return nil + }) + if err != nil { + return "", err + } + dbState = append(dbState, ts) + } + jsonData, err := json.MarshalIndent(dbState, "", "\t") + if err != nil { + return "", err + } + + return string(jsonData), nil +} diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 14b1bdf36d5..a471d26e6aa 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -733,3 +733,19 @@ func waitForCacheInitialization() { time.Sleep(100 * time.Millisecond) } } + +func TestGetDatabaseState(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. 
+ defer func() { + db.ClearVTOrcDatabase() + }() + + for _, query := range initialSQL { + _, err := db.ExecVTOrc(query) + require.NoError(t, err) + } + + ds, err := GetDatabaseState() + require.NoError(t, err) + require.Contains(t, ds, `"alias": "zone1-0000000112"`) +} diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index b1e93fe2a01..8115e614418 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -18,10 +18,10 @@ package logic import ( "context" - "sort" - "strings" "sync" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -29,40 +29,23 @@ import ( ) // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. -func RefreshAllKeyspacesAndShards() { +func RefreshAllKeyspacesAndShards(ctx context.Context) error { var keyspaces []string - if len(clustersToWatch) == 0 { // all known keyspaces - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + if len(shardsToWatch) == 0 { // all known keyspaces + ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() var err error // Get all the keyspaces keyspaces, err = ts.GetKeyspaces(ctx) if err != nil { - log.Error(err) - return + return err } } else { - // Parse input and build list of keyspaces - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaces = append(keyspaces, input[0]) - } else { - // Assume this is a keyspace - keyspaces = append(keyspaces, ks) - } - } - if len(keyspaces) == 0 { - log.Errorf("Found no keyspaces for input: %+v", clustersToWatch) - return - } + // Get keyspaces to watch from the list of known keyspaces. + keyspaces = maps.Keys(shardsToWatch) } - // Sort the list of keyspaces. - // The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace - sort.Strings(keyspaces) - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() var wg sync.WaitGroup for idx, keyspace := range keyspaces { @@ -83,6 +66,8 @@ func RefreshAllKeyspacesAndShards() { }(keyspace) } wg.Wait() + + return nil } // RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard. diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 2911b3d29c2..42d0cdebbdf 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -93,7 +93,9 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to only watch ks1 and ks3 onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"} clustersToWatch = onlyKs1and3 - RefreshAllKeyspacesAndShards() + err := initializeShardsToWatch() + require.NoError(t, err) + require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) // Verify that we only have ks1 and ks3 in vtorc's db. 
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "") @@ -106,9 +108,11 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to watch all keyspaces clustersToWatch = nil + err = initializeShardsToWatch() + require.NoError(t, err) // Change the durability policy of ks1 reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", "semi_sync") - RefreshAllKeyspacesAndShards() + require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) // Verify that all the keyspaces are correctly reloaded verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "") @@ -119,7 +123,6 @@ func TestRefreshAllKeyspaces(t *testing.T) { verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "") verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "") verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") - } func TestRefreshKeyspace(t *testing.T) { diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 73cd61676ca..6d51c437ab4 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "maps" "slices" "strings" "sync" @@ -28,13 +27,14 @@ import ( "time" "github.com/spf13/pflag" - + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/reparentutil" @@ -42,8 +42,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vttablet/tmclient" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -52,16 +50,83 @@ var ( clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 + // shardsToWatch is a map storing the shards for a given keyspace that need to be watched. + // We store the key range for all the shards that we want to watch. + // This is populated by parsing `--clusters_to_watch` flag. + shardsToWatch map[string][]*topodatapb.KeyRange + // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } +// initializeShardsToWatch parses the --clusters_to_watch flag-value +// into a map of keyspace/shards. 
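// For illustration, the flag's own example value "ks1,ks2/-80" parses to roughly:
//   "ks1": [key.NewCompleteKeyRange()]          // bare keyspace: watch every shard
//   "ks2": [key.NewKeyRange(nil, []byte{0x80})] // only tablets contained in -80
// (a sketch of the resulting shardsToWatch contents, assuming ParseShardingSpec
// yields that single range for "-80"; shouldWatchTablet below then checks key
// range containment against these entries).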
+func initializeShardsToWatch() error { + shardsToWatch = make(map[string][]*topodatapb.KeyRange) + if len(clustersToWatch) == 0 { + return nil + } + + for _, ks := range clustersToWatch { + if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") { + // Validate keyspace/shard parses. + k, s, err := topoproto.ParseKeyspaceShard(ks) + if err != nil { + log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) + continue + } + if !key.IsValidKeyRange(s) { + return fmt.Errorf("invalid key range %q while parsing clusters to watch", s) + } + // Parse the shard name into key range value. + keyRanges, err := key.ParseShardingSpec(s) + if err != nil { + return fmt.Errorf("could not parse shard name %q: %+v", s, err) + } + shardsToWatch[k] = append(shardsToWatch[k], keyRanges...) + } else { + // Remove trailing slash if exists. + ks = strings.TrimSuffix(ks, "/") + // We store the entire range of key range if nothing is specified. + shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()} + } + } + + if len(shardsToWatch) == 0 { + log.Error("No keyspace/shards to watch, watching all keyspaces") + } + return nil +} + +// shouldWatchTablet checks if the given tablet is part of the watch list. +func shouldWatchTablet(tablet *topodatapb.Tablet) bool { + // If we are watching all keyspaces, then we want to watch this tablet too. + if len(shardsToWatch) == 0 { + return true + } + shardRanges, ok := shardsToWatch[tablet.GetKeyspace()] + // If we don't have the keyspace in our map, then this tablet + // doesn't need to be watched. + if !ok { + return false + } + // Get the tablet's key range, and check if + // it is part of the shard ranges we are watching. + kr := tablet.GetKeyRange() + for _, shardRange := range shardRanges { + if key.KeyRangeContainsKeyRange(shardRange, kr) { + return true + } + } + return false +} + // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker // channel for polling. func OpenTabletDiscovery() <-chan time.Time { @@ -72,144 +137,87 @@ func OpenTabletDiscovery() <-chan time.Time { if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) } + // Parse --clusters_to_watch into a filter. + err := initializeShardsToWatch() + if err != nil { + log.Fatalf("Error parsing --clusters-to-watch: %v", err) + } + // We refresh all information from the topo once before we start the ticks to do + // it on a timer. + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + if err := refreshAllInformation(ctx); err != nil { + log.Errorf("failed to initialize topo information: %+v", err) + } return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker } +// getAllTablets gets all tablets from all cells using a goroutine per cell. +func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo { + var tabletsMu sync.Mutex + tablets := make([]*topo.TabletInfo, 0) + eg, ctx := errgroup.WithContext(ctx) + for _, cell := range cells { + eg.Go(func() error { + t, err := ts.GetTabletsByCell(ctx, cell, nil) + if err != nil { + log.Errorf("Failed to load tablets from cell %s: %+v", cell, err) + return nil + } + tabletsMu.Lock() + defer tabletsMu.Unlock() + tablets = append(tablets, t...) 
+ return nil + }) + } + _ = eg.Wait() // always nil + return tablets +} + // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while -func refreshAllTablets() { - refreshTabletsUsing(func(tabletAlias string) { +func refreshAllTablets(ctx context.Context) error { + return refreshTabletsUsing(ctx, func(tabletAlias string) { DiscoverInstance(tabletAlias, false /* forceDiscovery */) }, false /* forceRefresh */) } -// getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards. -// This handles both individual shards or key ranges using TabletFilter from the discovery package. -func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) { - // Parse input and build list of keyspaces / shards - var keyspaceShards []*topo.KeyspaceShard - - keyspaces := make(map[string]map[string]string) - filters := make(map[string][]string) - - keyspaces["ranged"] = map[string]string{} - keyspaces["full"] = map[string]string{} - - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaces["ranged"][input[0]] = "ranged" - // filter creation expects a pipe separator between keyspace and shard - filters[input[0]] = append(filters[input[0]], fmt.Sprintf("%s|%s", input[0], input[1])) - - } else { - keyspaces["full"][ks] = "full" - } +// refreshTabletsUsing refreshes tablets using a provided loader. +func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error { + if !IsLeaderOrActive() { + return nil } - // Copy function will combine the two maps. It will override any keyspaces in ranged that also exist in full with the - // full designation because we assume that the full keyspace will take precedence over a keyspace/shard specification within the same input. - // e.g. If the clustersToWatch is `ks1,...ks1/10-20`, all tablets in ks1 should be watched. - maps.Copy(keyspaces["ranged"], keyspaces["full"]) - - if len(keyspaces["ranged"]) > 0 { - for ks, filterType := range keyspaces["ranged"] { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - - shards, err := ts.GetShardNames(ctx, ks) - if err != nil { - // Log the errr and continue - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue - } - - if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue - } - - if filterType == "ranged" { - shardFilter, err := discovery.NewFilterByShard(filters[ks]) - if err != nil { - log.Error(err) - return keyspaceShards, err - } - - for _, s := range shards { - if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: ks, Shard: s}) { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) - } - } - } else { - for _, s := range shards { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) - } - } - } + // Get all cells. + ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + cells, err := ts.GetKnownCells(ctx) + if err != nil { + return err } - return keyspaceShards, nil -} - -func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { - if !IsLeaderOrActive() { - return + // Get all tablets from all cells. 
+ getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer getTabletsCancel() + tablets := getAllTablets(getTabletsCtx, cells) + if len(tablets) == 0 { + log.Error("Found no tablets") + return nil } - if len(clustersToWatch) == 0 { // all known clusters - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - cells, err := ts.GetKnownCells(ctx) - if err != nil { - log.Error(err) - return - } - - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for _, cell := range cells { - wg.Add(1) - go func(cell string) { - defer wg.Done() - refreshTabletsInCell(refreshCtx, cell, loader, forceRefresh) - }(cell) - } - wg.Wait() - } else { - keyspaceShards, err := getKeyspaceShardsToWatch() - if err != nil { - log.Error(err) - return - } - if len(keyspaceShards) == 0 || keyspaceShards == nil { - log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch) - return - } - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for _, ks := range keyspaceShards { - wg.Add(1) - go func(ks *topo.KeyspaceShard) { - defer wg.Done() - refreshTabletsInKeyspaceShard(refreshCtx, ks.Keyspace, ks.Shard, loader, forceRefresh, nil) - }(ks) + // Filter tablets that should not be watched using shardsToWatch map. + matchedTablets := make([]*topo.TabletInfo, 0, len(tablets)) + func() { + for _, t := range tablets { + if shouldWatchTablet(t.Tablet) { + matchedTablets = append(matchedTablets, t) + } } - wg.Wait() - } -} + }() -func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) - if err != nil { - log.Errorf("Error fetching topo info for cell %v: %v", cell, err) - return - } - query := "select alias from vitess_tablet where cell = ?" - args := sqlutils.Args(cell) - refreshTablets(tablets, query, args, loader, forceRefresh, nil) + // Refresh the filtered tablets. 
+ query := "select alias from vitess_tablet" + refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil) + return nil } // forceRefreshAllTabletsInShard is used to refresh all the tablet's information (both MySQL information and topo records) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index c4aef5ba0eb..63f5c0a7167 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -19,6 +19,7 @@ package logic import ( "context" "fmt" + "strings" "sync/atomic" "testing" "time" @@ -29,6 +30,7 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/key" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vttime" "vitess.io/vitess/go/vt/topo" @@ -101,6 +103,221 @@ var ( } ) +func TestShouldWatchTablet(t *testing.T) { + oldClustersToWatch := clustersToWatch + defer func() { + clustersToWatch = oldClustersToWatch + shardsToWatch = nil + }() + + testCases := []struct { + in []string + tablet *topodatapb.Tablet + expectedShouldWatch bool + }{ + { + in: []string{}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedShouldWatch: true, + }, + { + in: []string{keyspace}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedShouldWatch: true, + }, + { + in: []string{keyspace + "/-"}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedShouldWatch: true, + }, + { + in: []string{keyspace + "/" + shard}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedShouldWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}), + }, + expectedShouldWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x40}, []byte{0x50}), + }, + expectedShouldWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x70}, []byte{0x90}), + }, + expectedShouldWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x90}), + }, + expectedShouldWatch: false, + }, + { + in: []string{"ks/50-70"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}), + }, + expectedShouldWatch: true, + }, + { + in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x80}), + }, + expectedShouldWatch: true, + }, + { + in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x80}, []byte{0x90}), + }, + expectedShouldWatch: false, + }, + { + in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}), + }, + expectedShouldWatch: false, + }, + } + + for _, tt := range testCases { + t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) { + clustersToWatch = tt.in + err := initializeShardsToWatch() + require.NoError(t, err) + assert.Equal(t, 
tt.expectedShouldWatch, shouldWatchTablet(tt.tablet)) + }) + } +} + +// TestInitializeShardsToWatch tests that we initialize the shardsToWatch map correctly +// using the `--clusters_to_watch` flag. +func TestInitializeShardsToWatch(t *testing.T) { + oldClustersToWatch := clustersToWatch + defer func() { + clustersToWatch = oldClustersToWatch + shardsToWatch = nil + }() + + testCases := []struct { + in []string + expected map[string][]*topodatapb.KeyRange + expectedErr string + }{ + { + in: []string{}, + expected: map[string][]*topodatapb.KeyRange{}, + }, + { + in: []string{"unknownKs"}, + expected: map[string][]*topodatapb.KeyRange{ + "unknownKs": { + key.NewCompleteKeyRange(), + }, + }, + }, + { + in: []string{"test/-"}, + expected: map[string][]*topodatapb.KeyRange{ + "test": { + key.NewCompleteKeyRange(), + }, + }, + }, + { + in: []string{"test/324"}, + expectedErr: `invalid key range "324" while parsing clusters to watch`, + }, + { + in: []string{"test/0"}, + expected: map[string][]*topodatapb.KeyRange{ + "test": { + key.NewCompleteKeyRange(), + }, + }, + }, + { + in: []string{"test/-", "test2/-80", "test2/80-"}, + expected: map[string][]*topodatapb.KeyRange{ + "test": { + key.NewCompleteKeyRange(), + }, + "test2": { + key.NewKeyRange(nil, []byte{0x80}), + key.NewKeyRange([]byte{0x80}, nil), + }, + }, + }, + { + // known keyspace + in: []string{keyspace}, + expected: map[string][]*topodatapb.KeyRange{ + keyspace: { + key.NewCompleteKeyRange(), + }, + }, + }, + { + // keyspace with trailing-slash + in: []string{keyspace + "/"}, + expected: map[string][]*topodatapb.KeyRange{ + keyspace: { + key.NewCompleteKeyRange(), + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(strings.Join(testCase.in, ","), func(t *testing.T) { + defer func() { + shardsToWatch = make(map[string][]*topodatapb.KeyRange, 0) + }() + clustersToWatch = testCase.in + err := initializeShardsToWatch() + if testCase.expectedErr != "" { + require.EqualError(t, err, testCase.expectedErr) + return + } + require.NoError(t, err) + require.Equal(t, testCase.expected, shardsToWatch) + }) + } +} + func TestRefreshTabletsInKeyspaceShard(t *testing.T) { // Store the old flags and restore on test completion oldTs := ts @@ -277,131 +494,6 @@ func TestShardPrimary(t *testing.T) { } } -func TestGetKeyspaceShardsToWatch(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ts = memorytopo.NewServer(ctx, "test_cell") - - keyspaces := []string{"test_keyspace", "test_keyspace2", "test_keyspace3", "test_keyspace4"} - for _, k := range keyspaces { - if err := ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{}); err != nil { - t.Fatalf("cannot create keyspace: %v", err) - } - } - - shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"} - shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"} - - for _, shard := range shards1 { - if err := ts.CreateShard(ctx, keyspaces[0], shard); err != nil { - t.Fatalf("cannot create shard: %v", err) - } - } - - for _, shard := range shards2 { - if err := ts.CreateShard(ctx, keyspaces[1], shard); err != nil { - t.Fatalf("cannot create shard: %v", err) - } - } - - if err := ts.CreateShard(ctx, keyspaces[2], "-"); err != nil { - t.Fatalf("cannot create shard: %v", err) - } - - if err := ts.CreateShard(ctx, keyspaces[3], "0"); err != nil { - t.Fatalf("cannot create shard: %v", err) - } - - testcases := []*struct { - name string - clusters []string - expected []*topo.KeyspaceShard - }{ - { - name: "single shard and 
range", - clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0]), fmt.Sprintf("%s/60-80", keyspaces[0])}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[0], Shard: "40-50"}, - {Keyspace: keyspaces[0], Shard: "60-70"}, - {Keyspace: keyspaces[0], Shard: "70-80"}, - }, - }, { - name: "single shard", - clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0])}, - expected: []*topo.KeyspaceShard{{Keyspace: keyspaces[0], Shard: "40-50"}}, - }, { - name: "full keyspace", - clusters: []string{keyspaces[0]}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[0], Shard: "-40"}, - {Keyspace: keyspaces[0], Shard: "40-50"}, - {Keyspace: keyspaces[0], Shard: "50-60"}, - {Keyspace: keyspaces[0], Shard: "60-70"}, - {Keyspace: keyspaces[0], Shard: "70-80"}, - {Keyspace: keyspaces[0], Shard: "80-"}, - }, - }, { - name: "full keyspace with keyrange", - clusters: []string{keyspaces[0], fmt.Sprintf("%s/60-80", keyspaces[0])}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[0], Shard: "-40"}, - {Keyspace: keyspaces[0], Shard: "40-50"}, - {Keyspace: keyspaces[0], Shard: "50-60"}, - {Keyspace: keyspaces[0], Shard: "60-70"}, - {Keyspace: keyspaces[0], Shard: "70-80"}, - {Keyspace: keyspaces[0], Shard: "80-"}, - }, - }, { - name: "multi keyspace", - clusters: []string{keyspaces[0], fmt.Sprintf("%s/1100-1300", keyspaces[1])}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[1], Shard: "1100-1200"}, - {Keyspace: keyspaces[1], Shard: "1200-1300"}, - {Keyspace: keyspaces[0], Shard: "-40"}, - {Keyspace: keyspaces[0], Shard: "40-50"}, - {Keyspace: keyspaces[0], Shard: "50-60"}, - {Keyspace: keyspaces[0], Shard: "60-70"}, - {Keyspace: keyspaces[0], Shard: "70-80"}, - {Keyspace: keyspaces[0], Shard: "80-"}, - }, - }, { - name: "partial success with non-existent shard", - clusters: []string{"non-existent/10-20", fmt.Sprintf("%s/1100-1300", keyspaces[1])}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[1], Shard: "1100-1200"}, - {Keyspace: keyspaces[1], Shard: "1200-1300"}, - }, - }, { - name: "empty result", - clusters: []string{"non-existent/10-20"}, - expected: nil, - }, { - name: "single keyspace -", - clusters: []string{keyspaces[2]}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[2], Shard: "-"}, - }, - }, { - name: "single keyspace 0", - clusters: []string{keyspaces[3]}, - expected: []*topo.KeyspaceShard{ - {Keyspace: keyspaces[3], Shard: "0"}, - }, - }, - } - - for _, testcase := range testcases { - t.Run(testcase.name, func(t *testing.T) { - clustersToWatch = testcase.clusters - res, err := getKeyspaceShardsToWatch() - - assert.NoError(t, err) - assert.ElementsMatch(t, testcase.expected, res) - }) - } -} - // verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that // the number of instances refreshed matches the parameter and all the tablets match the ones provided func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []string) { diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index b9d62fc2982..947150b9907 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -17,6 +17,7 @@ package logic import ( + "context" "os" "os/signal" "sync" @@ -26,6 +27,7 @@ import ( "github.com/patrickmn/go-cache" "github.com/sjmudd/stopwatch" + "golang.org/x/sync/errgroup" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -407,27 +409,34 @@ func 
ContinuousDiscovery() { } }() case <-tabletTopoTick: - // Create a wait group - var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.Config.TopoInformationRefreshSeconds)) + if err := refreshAllInformation(ctx); err != nil { + log.Errorf("failed to refresh topo information: %+v", err) + } + cancel() + } + } +} - // Refresh all keyspace information. - wg.Add(1) - go func() { - defer wg.Done() - RefreshAllKeyspacesAndShards() - }() +// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks. +func refreshAllInformation(ctx context.Context) error { + // Create an errgroup + eg, ctx := errgroup.WithContext(ctx) - // Refresh all tablets. - wg.Add(1) - go func() { - defer wg.Done() - refreshAllTablets() - }() + // Refresh all keyspace information. + eg.Go(func() error { + return RefreshAllKeyspacesAndShards(ctx) + }) - // Wait for both the refreshes to complete - wg.Wait() - // We have completed one discovery cycle in the entirety of it. We should update the process health. - process.FirstDiscoveryCycleComplete.Store(true) - } + // Refresh all tablets. + eg.Go(func() error { + return refreshAllTablets(ctx) + }) + + // Wait for both the refreshes to complete + err := eg.Wait() + if err == nil { + process.FirstDiscoveryCycleComplete.Store(true) } + return err } diff --git a/go/vt/vtorc/logic/vtorc_test.go b/go/vt/vtorc/logic/vtorc_test.go index c8f2ac3bfdc..7ee2f0e253b 100644 --- a/go/vt/vtorc/logic/vtorc_test.go +++ b/go/vt/vtorc/logic/vtorc_test.go @@ -1,11 +1,17 @@ package logic import ( + "context" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vtorc/db" + "vitess.io/vitess/go/vt/vtorc/process" ) func TestWaitForLocksRelease(t *testing.T) { @@ -54,3 +60,49 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration { waitForLocksRelease() return time.Since(start) } + +func TestRefreshAllInformation(t *testing.T) { + defer process.ResetLastHealthCheckCache() + + // Store the old flags and restore on test completion + oldTs := ts + defer func() { + ts = oldTs + }() + + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + + // Verify in the beginning, we have the first DiscoveredOnce field false. 
+ _, err := process.HealthTest() + require.NoError(t, err) + + // Create a memory topo-server and create the keyspace and shard records + ts = memorytopo.NewServer(context.Background(), cell1) + _, err = ts.GetOrCreateShard(context.Background(), keyspace, shard) + require.NoError(t, err) + + // Test error + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel context to simulate timeout + require.Error(t, refreshAllInformation(ctx)) + require.False(t, process.FirstDiscoveryCycleComplete.Load()) + health, err := process.HealthTest() + require.NoError(t, err) + require.False(t, health.DiscoveredOnce) + require.False(t, health.Healthy) + process.ResetLastHealthCheckCache() + + // Test success + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + require.NoError(t, refreshAllInformation(ctx2)) + require.True(t, process.FirstDiscoveryCycleComplete.Load()) + health, err = process.HealthTest() + require.NoError(t, err) + require.True(t, health.DiscoveredOnce) + require.True(t, health.Healthy) + process.ResetLastHealthCheckCache() +} diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 22db89e1d56..7f8ab83b39b 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -36,6 +36,8 @@ var FirstDiscoveryCycleComplete atomic.Bool var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second) +func ResetLastHealthCheckCache() { lastHealthCheckCache.Flush() } + type NodeHealth struct { Hostname string Token string @@ -120,8 +122,8 @@ func HealthTest() (health *HealthStatus, err error) { log.Error(err) return health, err } - health.Healthy = healthy health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() + health.Healthy = healthy && health.DiscoveredOnce if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { health.Error = err diff --git a/go/vt/vtorc/process/health_test.go b/go/vt/vtorc/process/health_test.go new file mode 100644 index 00000000000..85317530ac4 --- /dev/null +++ b/go/vt/vtorc/process/health_test.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package process + +import ( + "testing" + + "github.com/stretchr/testify/require" + _ "modernc.org/sqlite" +) + +func TestHealthTest(t *testing.T) { + defer func() { + FirstDiscoveryCycleComplete.Store(false) + ThisNodeHealth = &NodeHealth{} + ResetLastHealthCheckCache() + }() + + require.Zero(t, ThisNodeHealth.LastReported) + + ThisNodeHealth = &NodeHealth{} + health, err := HealthTest() + require.NoError(t, err) + require.False(t, health.Healthy) + require.False(t, health.DiscoveredOnce) + require.NotZero(t, ThisNodeHealth.LastReported) + ResetLastHealthCheckCache() + + ThisNodeHealth = &NodeHealth{} + FirstDiscoveryCycleComplete.Store(true) + health, err = HealthTest() + require.NoError(t, err) + require.True(t, health.Healthy) + require.True(t, health.DiscoveredOnce) + require.NotZero(t, ThisNodeHealth.LastReported) + ResetLastHealthCheckCache() +} diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index b0112e10add..60fdf226e95 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -45,6 +45,7 @@ const ( disableGlobalRecoveriesAPI = "/api/disable-global-recoveries" enableGlobalRecoveriesAPI = "/api/enable-global-recoveries" replicationAnalysisAPI = "/api/replication-analysis" + databaseStateAPI = "/api/database-state" healthAPI = "/debug/health" AggregatedDiscoveryMetricsAPI = "/api/aggregated-discovery-metrics" @@ -60,6 +61,7 @@ var ( disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI, replicationAnalysisAPI, + databaseStateAPI, healthAPI, AggregatedDiscoveryMetricsAPI, } @@ -86,6 +88,8 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request errantGTIDsAPIHandler(response, request) case replicationAnalysisAPI: replicationAnalysisAPIHandler(response, request) + case databaseStateAPI: + databaseStateAPIHandler(response) case AggregatedDiscoveryMetricsAPI: AggregatedDiscoveryMetricsAPIHandler(response, request) default: @@ -104,7 +108,7 @@ func getACLPermissionLevelForAPI(apiEndpoint string) string { return acl.ADMIN case replicationAnalysisAPI: return acl.MONITORING - case healthAPI: + case healthAPI, databaseStateAPI: return acl.MONITORING } return acl.ADMIN @@ -166,6 +170,16 @@ func errantGTIDsAPIHandler(response http.ResponseWriter, request *http.Request) returnAsJSON(response, http.StatusOK, instances) } +// databaseStateAPIHandler is the handler for the databaseStateAPI endpoint +func databaseStateAPIHandler(response http.ResponseWriter) { + ds, err := inst.GetDatabaseState() + if err != nil { + http.Error(response, err.Error(), http.StatusInternalServerError) + return + } + writePlainTextResponse(response, ds, http.StatusOK) +} + // AggregatedDiscoveryMetricsAPIHandler is the handler for the discovery metrics endpoint func AggregatedDiscoveryMetricsAPIHandler(response http.ResponseWriter, request *http.Request) { // return metrics for last x seconds
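
Note on the new matching logic in tablet_discovery.go: a tablet is watched only when its key range is fully contained in one of the ranges parsed from --clusters_to_watch for its keyspace; mere overlap is not enough. The standalone sketch below is illustrative and not part of the change; it only reuses key.ParseShardingSpec, key.NewKeyRange, and key.KeyRangeContainsKeyRange from vitess.io/vitess/go/vt/key, the same helpers the diff relies on.

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/key"
)

func main() {
	// Ranges that a --clusters_to_watch entry like "ks/-80" resolves to for keyspace "ks".
	watched, err := key.ParseShardingSpec("-80")
	if err != nil {
		panic(err)
	}

	// Fully contained in -80: such a tablet would be watched.
	contained := key.NewKeyRange([]byte{0x60}, []byte{0x80})
	// Crosses the 0x80 boundary: such a tablet would be skipped.
	straddling := key.NewKeyRange([]byte{0x70}, []byte{0x90})

	for _, kr := range watched {
		fmt.Println(key.KeyRangeContainsKeyRange(kr, contained))  // true
		fmt.Println(key.KeyRangeContainsKeyRange(kr, straddling)) // false
	}
}

Run against a watch entry of "ks/-80", a tablet covering 60-80 matches while one covering 70-90 does not, which is the same behaviour the TestShouldWatchTablet cases above assert.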
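
Note on refreshAllInformation in vtorc.go: the keyspace/shard refresh and the tablet refresh now share one errgroup and one context, the first error cancels the shared context, and FirstDiscoveryCycleComplete is set only after a fully successful cycle. A minimal sketch of that pattern, assuming refreshKeyspaces and refreshTablets as stand-ins for RefreshAllKeyspacesAndShards and refreshAllTablets (illustrative only, not the PR's code):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// refreshKeyspaces and refreshTablets are stand-ins for the real refresh functions.
func refreshKeyspaces(ctx context.Context) error { return nil }

func refreshTablets(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return errors.New("tablet refresh cancelled")
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	// Both refreshes run concurrently; the first error cancels the shared
	// context and is returned by Wait, so a failed cycle never marks the
	// first discovery cycle as complete.
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return refreshKeyspaces(ctx) })
	eg.Go(func() error { return refreshTablets(ctx) })

	if err := eg.Wait(); err != nil {
		fmt.Println("refresh failed:", err)
		return
	}
	fmt.Println("first discovery cycle complete")
}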
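
Note on the new /api/database-state endpoint in server/api.go: it is registered with MONITORING ACL and returns the current database state from inst.GetDatabaseState as plain text. A hedged usage sketch, assuming a VTOrc instance reachable at 127.0.0.1:15000 (substitute your actual host and --port value):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder address: point this at your VTOrc's host and --port.
	resp, err := http.Get("http://127.0.0.1:15000/api/database-state")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler writes the backing database contents as plain text.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}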