diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index a13205b70064..99de0c38763c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/distsql" @@ -325,6 +326,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { adminMemMetrics := sql.MakeMemMetrics("admin", cfg.HistogramWindowInterval()) cfg.registry.AddMetricStruct(adminMemMetrics) + hydratedTablesCache := hydratedtables.NewCache(cfg.Settings) + cfg.registry.AddMetricStruct(hydratedTablesCache.Metrics()) + // Set up the DistSQL server. distSQLCfg := execinfra.ServerConfig{ AmbientContext: cfg.AmbientCtx, @@ -370,7 +374,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { ExternalStorage: cfg.externalStorage, ExternalStorageFromURI: cfg.externalStorageFromURI, - RangeCache: cfg.distSender.RangeDescriptorCache(), + RangeCache: cfg.distSender.RangeDescriptorCache(), + HydratedTables: hydratedTablesCache, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { @@ -506,6 +511,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { QueryCache: querycache.New(cfg.QueryCacheSize), ProtectedTimestampProvider: cfg.protectedtsProvider, ExternalIODirConfig: cfg.ExternalIODirConfig, + HydratedTables: hydratedTablesCache, } cfg.stopper.AddCloser(execCfg.ExecLogger) diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 3ad8ea226d96..1bfea95458d0 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -580,7 +580,7 @@ func (sc *SchemaChanger) validateConstraints( evalCtx.Txn = txn // Use the DistSQLTypeResolver because we need to resolve types by ID. semaCtx := tree.MakeSemaContext() - collection := descs.NewCollection(ctx, sc.settings, sc.leaseMgr) + collection := descs.NewCollection(ctx, sc.settings, sc.leaseMgr, nil /* hydratedTables */) semaCtx.TypeResolver = descs.NewDistSQLTypeResolver(collection, txn) // TODO (rohany): When to release this? As of now this is only going to get released // after the check is validated. @@ -723,7 +723,7 @@ func (sc *SchemaChanger) truncateIndexes( } // Retrieve a lease for this table inside the current txn. - tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr) + tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr, nil /* hydratedTables */) defer tc.ReleaseAll(ctx) tableDesc, err := sc.getTableVersion(ctx, txn, tc, version) if err != nil { @@ -907,7 +907,7 @@ func (sc *SchemaChanger) distBackfill( } } - tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr) + tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr, nil /* hydratedTables */) // Use a leased table descriptor for the backfill. defer tc.ReleaseAll(ctx) tableDesc, err := sc.getTableVersion(ctx, txn, tc, version) @@ -1265,7 +1265,7 @@ func (sc *SchemaChanger) validateForwardIndexes( if err != nil { return err } - tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr) + tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr, nil /* hydratedTables */) // pretend that the schema has been modified. if err := tc.AddUncommittedDescriptor(desc); err != nil { return err @@ -1341,7 +1341,7 @@ func (sc *SchemaChanger) validateForwardIndexes( return err } - tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr) + tc := descs.NewCollection(ctx, sc.settings, sc.leaseMgr, nil /* hydratedTables */) if err := tc.AddUncommittedDescriptor(desc); err != nil { return err } @@ -1729,7 +1729,7 @@ func validateCheckInTxn( ) error { ie := evalCtx.InternalExecutor.(*InternalExecutor) if tableDesc.Version > tableDesc.ClusterVersion.Version { - newTc := descs.NewCollection(ctx, evalCtx.Settings, leaseMgr) + newTc := descs.NewCollection(ctx, evalCtx.Settings, leaseMgr, nil /* hydratedTables */) // pretend that the schema has been modified. if err := newTc.AddUncommittedDescriptor(tableDesc); err != nil { return err @@ -1770,7 +1770,7 @@ func validateFkInTxn( ) error { ie := evalCtx.InternalExecutor.(*InternalExecutor) if tableDesc.Version > tableDesc.ClusterVersion.Version { - newTc := descs.NewCollection(ctx, evalCtx.Settings, leaseMgr) + newTc := descs.NewCollection(ctx, evalCtx.Settings, leaseMgr, nil /* hydratedTables */) // pretend that the schema has been modified. if err := newTc.AddUncommittedDescriptor(tableDesc); err != nil { return err diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 2c718c621ae7..c3bd52a4eead 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/database" "github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc" @@ -141,6 +142,7 @@ func MakeCollection( dbCache *database.Cache, dbCacheSubscriber DatabaseCacheSubscriber, sessionData *sessiondata.SessionData, + hydratedTables *hydratedtables.Cache, ) Collection { // For testing. databaseLeasingUnsupported := false @@ -154,14 +156,26 @@ func MakeCollection( dbCacheSubscriber: dbCacheSubscriber, databaseLeasingUnsupported: databaseLeasingUnsupported, sessionData: sessionData, + hydratedTables: hydratedTables, } } // NewCollection constructs a new *Collection. func NewCollection( - ctx context.Context, settings *cluster.Settings, leaseMgr *lease.Manager, + ctx context.Context, + settings *cluster.Settings, + leaseMgr *lease.Manager, + hydratedTables *hydratedtables.Cache, ) *Collection { - tc := MakeCollection(ctx, leaseMgr, settings, nil, nil, nil) + tc := MakeCollection( + ctx, + leaseMgr, + settings, + nil, /* dbCache */ + nil, /* dbCache */ + nil, /* sessionData */ + hydratedTables, + ) return &tc } @@ -236,6 +250,10 @@ type Collection struct { // true in mixed-version 20.1/20.2 clusters, but it remains constant for the // duration of each use of the Collection. databaseLeasingUnsupported bool + + // hydratedTables is node-level cache of table descriptors which utlize + // user-defined types. + hydratedTables *hydratedtables.Cache } // getLeasedDescriptorByName return a leased descriptor valid for the @@ -1057,12 +1075,9 @@ func (tc *Collection) hydrateTypesInTableDesc( return desc, nil } - // TODO (rohany, ajwerner): Here we would look into the cached set of - // hydrated table descriptors and potentially return without having to - // make a copy. However, we could avoid hitting the cache if any of the - // user defined types have been modified in this transaction. - - getType := func(ctx context.Context, id descpb.ID) (tree.TypeName, catalog.TypeDescriptor, error) { + getType := typedesc.TypeLookupFunc(func( + ctx context.Context, id descpb.ID, + ) (tree.TypeName, catalog.TypeDescriptor, error) { desc, err := tc.GetTypeVersionByID(ctx, txn, id, tree.ObjectLookupFlagsWithRequired()) if err != nil { return tree.TypeName{}, nil, err @@ -1078,11 +1093,27 @@ func (tc *Collection) hydrateTypesInTableDesc( } name := tree.MakeNewQualifiedTypeName(dbDesc.Name, sc.Name, desc.Name) return name, desc, nil + }) + + // Utilize the cache of hydrated tables if we have one. + if tc.hydratedTables != nil { + hydrated, err := tc.hydratedTables.GetHydratedTableDescriptor(ctx, t, getType) + if err != nil { + return nil, err + } + if hydrated != nil { + return hydrated, nil + } + // The cache decided not to give back a hydrated descriptor, likely + // because either we've modified the table or one of the types or because + // this transaction has a stale view of one of the relevant descriptors. + // Proceed to hydrating a fresh copy. } + // TODO(ajwerner): Propagate the IsModified status here. // Make a copy of the underlying descriptor before hydration. descBase := protoutil.Clone(t.TableDesc()).(*descpb.TableDescriptor) - if err := typedesc.HydrateTypesInTableDescriptor(ctx, descBase, typedesc.TypeLookupFunc(getType)); err != nil { + if err := typedesc.HydrateTypesInTableDescriptor(ctx, descBase, getType); err != nil { return nil, err } return tabledesc.NewImmutable(*descBase), nil diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 4d7bfd273817..f4b56c60e2cd 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -656,7 +656,7 @@ func (s *Server) newConnExecutor( } ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount() ex.extraTxnState.descCollection = descs.MakeCollection(ctx, s.cfg.LeaseManager, - s.cfg.Settings, s.dbCache.getDatabaseCache(), s.dbCache, sd) + s.cfg.Settings, s.dbCache.getDatabaseCache(), s.dbCache, sd, s.cfg.HydratedTables) ex.extraTxnState.txnRewindPos = -1 ex.extraTxnState.schemaChangeJobsCache = make(map[descpb.ID]*jobs.Job) ex.mu.ActiveQueries = make(map[ClusterWideID]*queryMeta) diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 74e94230a4fa..35a3416cf38e 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -439,7 +439,9 @@ func (ds *ServerImpl) NewFlowContext( // If we weren't passed a descs.Collection, then make a new one. We are // responsible for cleaning it up and releasing any accessed descriptors // on flow cleanup. - collection := descs.NewCollection(ctx, ds.ServerConfig.Settings, ds.ServerConfig.LeaseManager.(*lease.Manager)) + collection := descs.NewCollection(ctx, ds.ServerConfig.Settings, + ds.ServerConfig.LeaseManager.(*lease.Manager), + ds.ServerConfig.HydratedTables) flowCtx.TypeResolverFactory = &descs.DistSQLTypeResolverFactory{ Descriptors: collection, CleanupFunc: func(ctx context.Context) { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 5b8a8d8f78e6..57399c1fc403 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/database" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/distsql" @@ -701,6 +702,10 @@ type ExecutorConfig struct { StmtDiagnosticsRecorder *stmtdiagnostics.Registry ExternalIODirConfig base.ExternalIODirConfig + + // HydratedTables is a node-level cache of table descriptors which utilize + // user-defined types. + HydratedTables *hydratedtables.Cache } // Organization returns the value of cluster.organization. diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index d71bda1a2b74..178043473ee4 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -170,6 +171,10 @@ type ServerConfig struct { // processors query the cache to see if they should communicate updates to the // gateway. RangeCache *kvcoord.RangeDescriptorCache + + // HydratedTables is a node-level cache of table descriptors which utilize + // user-defined types. + HydratedTables *hydratedtables.Cache } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index ef325fa2754f..5b5ba2c6ebb0 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -269,7 +269,10 @@ func newInternalPlanner( // The table collection used by the internal planner does not rely on the // deprecatedDatabaseCache and there are no subscribers to the // deprecatedDatabaseCache, so we can leave it uninitialized. - tables := descs.NewCollection(ctx, execCfg.Settings, execCfg.LeaseManager) + // Furthermore, we're not concerned about the efficiency of querying tables + // with user-defined types, hence the nil hydratedTables. + tables := descs.NewCollection(ctx, execCfg.Settings, execCfg.LeaseManager, + nil /* hydratedTables */) dataMutator := &sessionDataMutator{ data: sd, defaults: SessionDefaults(map[string]string{ diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 82665a372855..38776ae106b4 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -423,7 +423,8 @@ func (sc *TableStatisticsCache) parseStats( // will need to start writing a timestamp on the stats objects and request // TypeDescriptor's with the timestamp that the stats were recorded with. err := sc.ClientDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - collection := descs.NewCollection(ctx, sc.Settings, sc.LeaseMgr) + collection := descs.NewCollection(ctx, sc.Settings, sc.LeaseMgr, + nil /* hydratedTables */) defer collection.ReleaseAll(ctx) resolver := descs.NewDistSQLTypeResolver(collection, txn) var err error