diff --git a/go/cmd/vtexplain/cli/vtexplain.go b/go/cmd/vtexplain/cli/vtexplain.go index aee920f6486..5c15112917e 100644 --- a/go/cmd/vtexplain/cli/vtexplain.go +++ b/go/cmd/vtexplain/cli/vtexplain.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/acl" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vtexplain" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" @@ -182,7 +183,9 @@ func parseAndRun() error { if err != nil { return err } - vte, err := vtexplain.Init(context.Background(), env, vschema, schema, ksShardMap, opts) + ctx := context.Background() + ts := memorytopo.NewServer(ctx, vtexplain.Cell) + vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts) if err != nil { return err } diff --git a/go/vt/srvtopo/watch.go b/go/vt/srvtopo/watch.go index 36d8fd428bd..4a0ccda2d59 100644 --- a/go/vt/srvtopo/watch.go +++ b/go/vt/srvtopo/watch.go @@ -23,6 +23,7 @@ import ( "time" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" ) @@ -204,8 +205,11 @@ func (entry *watchEntry) onErrorLocked(ctx context.Context, err error, init bool entry.value = nil } } else { - entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err) - log.Errorf("%v", entry.lastError) + if !topo.IsErrType(err, topo.Interrupted) { + // No need to log if we're explicitly interrupted. + entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err) + log.Errorf("%v", entry.lastError) + } // Even though we didn't get a new value, update the lastValueTime // here since the watch was successfully running before and we want @@ -224,8 +228,7 @@ func (entry *watchEntry) onErrorLocked(ctx context.Context, err error, init bool if len(entry.listeners) > 0 && !topo.IsErrType(err, topo.Interrupted) { go func() { - time.Sleep(entry.rw.cacheRefreshInterval) - + _ = timer.SleepContext(ctx, entry.rw.cacheRefreshInterval) entry.mutex.Lock() entry.ensureWatchingLocked(ctx) entry.mutex.Unlock() diff --git a/go/vt/srvtopo/watch_srvkeyspace.go b/go/vt/srvtopo/watch_srvkeyspace.go index cefe95c6951..ac2d8c0bac1 100644 --- a/go/vt/srvtopo/watch_srvkeyspace.go +++ b/go/vt/srvtopo/watch_srvkeyspace.go @@ -40,7 +40,7 @@ func (k *srvKeyspaceKey) String() string { func NewSrvKeyspaceWatcher(ctx context.Context, topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvKeyspaceWatcher { watch := func(entry *watchEntry) { key := entry.key.(*srvKeyspaceKey) - requestCtx, requestCancel := context.WithCancel(context.Background()) + requestCtx, requestCancel := context.WithCancel(ctx) defer requestCancel() current, changes, err := topoServer.WatchSrvKeyspace(requestCtx, key.cell, key.keyspace) diff --git a/go/vt/vtadmin/api.go b/go/vt/vtadmin/api.go index b462d1dcbb3..dce4ff041c9 100644 --- a/go/vt/vtadmin/api.go +++ b/go/vt/vtadmin/api.go @@ -39,6 +39,7 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtadmin/cluster" "vitess.io/vitess/go/vt/vtadmin/cluster/dynamic" @@ -2382,7 +2383,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest) return nil, er.Error() } - vte, err := vtexplain.Init(ctx, api.env, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"}) + ts := memorytopo.NewServer(ctx, vtexplain.Cell) + vte, err := vtexplain.Init(ctx, api.env, ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"}) if err != nil { return nil, fmt.Errorf("error initilaizing vtexplain: %w", err) } diff --git a/go/vt/vtexplain/vtexplain.go b/go/vt/vtexplain/vtexplain.go index 55a0e52b005..e16dea89d2b 100644 --- a/go/vt/vtexplain/vtexplain.go +++ b/go/vt/vtexplain/vtexplain.go @@ -28,6 +28,7 @@ import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/discovery" @@ -53,7 +54,7 @@ func init() { } const ( - vtexplainCell = "explainCell" + Cell = "explainCell" // ModeMulti is the default mode with autocommit implemented at vtgate ModeMulti = "multi" @@ -183,7 +184,7 @@ type TabletActions struct { } // Init sets up the fake execution environment -func Init(ctx context.Context, env *vtenv.Environment, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) { +func Init(ctx context.Context, env *vtenv.Environment, ts *topo.Server, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) { // Verify options if opts.ReplicationMode != "ROW" && opts.ReplicationMode != "STATEMENT" { return nil, fmt.Errorf("invalid replication mode \"%s\"", opts.ReplicationMode) @@ -206,7 +207,7 @@ func Init(ctx context.Context, env *vtenv.Environment, vSchemaStr, sqlSchema, ks env: env, } vte.setGlobalTabletEnv(tabletEnv) - err = vte.initVtgateExecutor(ctx, vSchemaStr, ksShardMapStr, opts) + err = vte.initVtgateExecutor(ctx, ts, vSchemaStr, ksShardMapStr, opts) if err != nil { return nil, fmt.Errorf("initVtgateExecutor: %v", err.Error()) } diff --git a/go/vt/vtexplain/vtexplain_test.go b/go/vt/vtexplain/vtexplain_test.go index 1c60ea5c34a..e7a6f4bdfc8 100644 --- a/go/vt/vtexplain/vtexplain_test.go +++ b/go/vt/vtexplain/vtexplain_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/test/utils" @@ -51,7 +52,7 @@ type testopts struct { shardmap map[string]map[string]*topo.ShardInfo } -func initTest(ctx context.Context, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain { +func initTest(ctx context.Context, ts *topo.Server, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain { schema, err := os.ReadFile("testdata/test-schema.sql") require.NoError(t, err) @@ -67,7 +68,7 @@ func initTest(ctx context.Context, mode string, opts *Options, topts *testopts, } opts.ExecutionMode = mode - vte, err := Init(ctx, vtenv.NewTestEnv(), string(vSchema), string(schema), shardmap, opts) + vte, err := Init(ctx, vtenv.NewTestEnv(), ts, string(vSchema), string(schema), shardmap, opts) require.NoError(t, err, "vtexplain Init error\n%s", string(schema)) return vte } @@ -90,7 +91,8 @@ func runTestCase(testcase, mode string, opts *Options, topts *testopts, t *testi t.Run(testcase, func(t *testing.T) { ctx := utils.LeakCheckContext(t) - vte := initTest(ctx, mode, opts, topts, t) + ts := memorytopo.NewServer(ctx, Cell) + vte := initTest(ctx, ts, mode, opts, topts, t) defer vte.Stop() sqlFile := fmt.Sprintf("testdata/%s-queries.sql", testcase) @@ -156,8 +158,8 @@ func TestExplain(t *testing.T) { func TestErrors(t *testing.T) { ctx := utils.LeakCheckContext(t) - - vte := initTest(ctx, ModeMulti, defaultTestOpts(), &testopts{}, t) + ts := memorytopo.NewServer(ctx, Cell) + vte := initTest(ctx, ts, ModeMulti, defaultTestOpts(), &testopts{}, t) defer vte.Stop() tests := []struct { @@ -196,8 +198,8 @@ func TestErrors(t *testing.T) { func TestJSONOutput(t *testing.T) { ctx := utils.LeakCheckContext(t) - - vte := initTest(ctx, ModeMulti, defaultTestOpts(), &testopts{}, t) + ts := memorytopo.NewServer(ctx, Cell) + vte := initTest(ctx, ts, ModeMulti, defaultTestOpts(), &testopts{}, t) defer vte.Stop() sql := "select 1 from user where id = 1" explains, err := vte.Run(sql) @@ -346,7 +348,8 @@ func TestInit(t *testing.T) { } }` schema := "create table table_missing_primary_vindex (id int primary key)" - _, err := Init(ctx, vtenv.NewTestEnv(), vschema, schema, "", defaultTestOpts()) + ts := memorytopo.NewServer(ctx, Cell) + _, err := Init(ctx, vtenv.NewTestEnv(), ts, vschema, schema, "", defaultTestOpts()) require.Error(t, err) require.Contains(t, err.Error(), "missing primary col vindex") } diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index f24725fcd7b..a5d9677f02a 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -22,26 +22,23 @@ package vtexplain import ( "context" "fmt" + "path" "sort" "strings" "vitess.io/vitess/go/cache/theine" - "vitess.io/vitess/go/vt/vtgate/logstats" - "vitess.io/vitess/go/vt/vtgate/vindexes" - - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/memorytopo" - - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/json2" "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/logstats" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/queryservice" querypb "vitess.io/vitess/go/vt/proto/query" @@ -50,14 +47,14 @@ import ( vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) -func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShardMapStr string, opts *Options) error { +func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options) error { vte.explainTopo = &ExplainTopo{NumShards: opts.NumShards} - vte.explainTopo.TopoServer = memorytopo.NewServer(ctx, vtexplainCell) + vte.explainTopo.TopoServer = ts vte.healthCheck = discovery.NewFakeHealthCheck(nil) - resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, vtexplainCell) + resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, Cell) - err := vte.buildTopology(ctx, opts, vSchemaStr, ksShardMapStr, opts.NumShards) + err := vte.buildTopology(ctx, ts, opts, vSchemaStr, ksShardMapStr, opts.NumShards) if err != nil { return err } @@ -75,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil @@ -95,7 +92,7 @@ func (vte *VTExplain) newFakeResolver(ctx context.Context, opts *Options, serv s return vtgate.NewResolver(srvResolver, serv, cell, sc) } -func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error { +func (vte *VTExplain) buildTopology(ctx context.Context, ts *topo.Server, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error { vte.explainTopo.Lock.Lock() defer vte.explainTopo.Lock.Unlock() @@ -120,6 +117,10 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS return err } + conn, err := ts.ConnForCell(ctx, Cell) + if err != nil { + return err + } vte.explainTopo.TabletConns = make(map[string]*explainTablet) vte.explainTopo.KeyspaceShards = make(map[string]map[string]*topodatapb.ShardReference) for ks, vschema := range vte.explainTopo.Keyspaces { @@ -130,6 +131,32 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS vte.explainTopo.KeyspaceShards[ks] = make(map[string]*topodatapb.ShardReference) + srvPath := path.Join(topo.KeyspacesPath, ks, topo.SrvKeyspaceFile) + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: shards, + }, + { + ServedType: topodatapb.TabletType_REPLICA, + ShardReferences: shards, + }, + { + ServedType: topodatapb.TabletType_RDONLY, + ShardReferences: shards, + }, + }, + } + data, err := srvKeyspace.MarshalVT() + if err != nil { + return err + } + _, err = conn.Update(ctx, srvPath, data, nil) + if err != nil { + return err + } + for _, shard := range shards { // If the topology is in the middle of a reshard, there can be two shards covering the same key range (e.g. // both source shard 80- and target shard 80-c0 cover the keyrange 80-c0). For the purposes of explain, we @@ -142,14 +169,13 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS hostname := fmt.Sprintf("%s/%s", ks, shard.Name) log.Infof("registering test tablet %s for keyspace %s shard %s", hostname, ks, shard.Name) - tablet := vte.healthCheck.AddFakeTablet(vtexplainCell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService { - return vte.newTablet(ctx, vte.env, opts, t) + tablet := vte.healthCheck.AddFakeTablet(Cell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService { + return vte.newTablet(ctx, vte.env, opts, t, ts) }) vte.explainTopo.TabletConns[hostname] = tablet.(*explainTablet) vte.explainTopo.KeyspaceShards[ks][shard.Name] = shard } } - return err } diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 9ac81000f3d..b573fe29774 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -24,6 +24,7 @@ import ( "sync" "vitess.io/vitess/go/vt/sidecardb" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/mysql" @@ -35,7 +36,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -105,7 +105,7 @@ type explainTablet struct { var _ queryservice.QueryService = (*explainTablet)(nil) -func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opts *Options, t *topodatapb.Tablet) *explainTablet { +func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opts *Options, t *topodatapb.Tablet, ts *topo.Server) *explainTablet { db := fakesqldb.New(nil) sidecardb.AddSchemaInitQueries(db, true, env.Parser()) @@ -120,7 +120,7 @@ func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opt config.EnableTableGC = false // XXX much of this is cloned from the tabletserver tests - tsv := tabletserver.NewTabletServer(ctx, env, topoproto.TabletAliasString(t.Alias), config, memorytopo.NewServer(ctx, ""), t.Alias) + tsv := tabletserver.NewTabletServer(ctx, env, topoproto.TabletAliasString(t.Alias), config, ts, t.Alias) tablet := explainTablet{db: db, tsv: tsv, vte: vte, collationEnv: env.CollationEnv()} db.Handler = &tablet diff --git a/go/vt/vtexplain/vtexplain_vttablet_test.go b/go/vt/vtexplain/vtexplain_vttablet_test.go index fc3785a907a..a4350316d82 100644 --- a/go/vt/vtexplain/vtexplain_vttablet_test.go +++ b/go/vt/vtexplain/vtexplain_vttablet_test.go @@ -20,12 +20,14 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -73,7 +75,8 @@ create table t2 ( ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vte, err := Init(ctx, vtenv.NewTestEnv(), testVSchema, testSchema, "", opts) + ts := memorytopo.NewServer(ctx, Cell) + vte, err := Init(ctx, vtenv.NewTestEnv(), ts, testVSchema, testSchema, "", opts) require.NoError(t, err) defer vte.Stop() @@ -132,17 +135,22 @@ create table test_partitioned ( ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vte := initTest(ctx, ModeMulti, defaultTestOpts(), &testopts{}, t) + ts := memorytopo.NewServer(ctx, Cell) + vte := initTest(ctx, ts, ModeMulti, defaultTestOpts(), &testopts{}, t) defer vte.Stop() tabletEnv, _ := newTabletEnvironment(ddls, defaultTestOpts(), env.CollationEnv()) vte.setGlobalTabletEnv(tabletEnv) tablet := vte.newTablet(ctx, env, defaultTestOpts(), &topodatapb.Tablet{ - Keyspace: "test_keyspace", + Keyspace: "ks_sharded", Shard: "-80", - Alias: &topodatapb.TabletAlias{}, - }) + Alias: &topodatapb.TabletAlias{ + Cell: Cell, + }, + }, ts) + + time.Sleep(10 * time.Millisecond) se := tablet.tsv.SchemaEngine() tables := se.GetSchema() diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index a451850a23f..bb2616c836b 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -357,7 +357,9 @@ func (throttler *Throttler) normalizeThrottlerConfig(throttlerConfig *topodatapb func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspace, err error) bool { if err != nil { - log.Errorf("WatchSrvKeyspaceCallback error: %v", err) + if !topo.IsErrType(err, topo.Interrupted) && !errors.Is(err, context.Canceled) { + log.Errorf("WatchSrvKeyspaceCallback error: %v", err) + } return false } throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig)