diff --git a/pkg/server/server.go b/pkg/server/server.go index 42cd43db63a6..ef85569e0170 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -353,19 +353,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { s.internalMemMetrics = sql.MakeMemMetrics("internal", cfg.HistogramWindowInterval()) s.registry.AddMetricStruct(s.internalMemMetrics) - // Set up Lease Manager - var lmKnobs sql.LeaseManagerTestingKnobs - if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil { - lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs) - } - s.leaseMgr = sql.NewLeaseManager( - s.cfg.AmbientCtx, - nil, /* execCfg - will be set later because of circular dependencies */ - lmKnobs, - s.stopper, - s.cfg.LeaseManagerConfig, - ) - // We do not set memory monitors or a noteworthy limit because the children of // this monitor will be setting their own noteworthy limits. rootSQLMemoryMonitor := mon.MakeMonitor( @@ -514,6 +501,23 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { distSQLMetrics := distsqlrun.MakeDistSQLMetrics(cfg.HistogramWindowInterval()) s.registry.AddMetricStruct(distSQLMetrics) + // Set up Lease Manager + var lmKnobs sql.LeaseManagerTestingKnobs + if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil { + lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs) + } + s.leaseMgr = sql.NewLeaseManager( + s.cfg.AmbientCtx, + &s.nodeIDContainer, + s.db, + s.clock, + nil, /* internalExecutor - will be set later because of circular dependencies */ + st, + lmKnobs, + s.stopper, + s.cfg.LeaseManagerConfig, + ) + // Set up the DistSQL server. distSQLCfg := distsqlrun.ServerConfig{ AmbientContext: s.cfg.AmbientCtx, @@ -724,7 +728,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { s.execCfg = &execCfg - s.leaseMgr.SetExecCfg(&execCfg) + s.leaseMgr.SetInternalExecutor(execCfg.InternalExecutor) s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip) s.leaseMgr.PeriodicallyRefreshSomeLeases() diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 95f76ae8ae74..63526fd0595c 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -376,7 +376,7 @@ CREATE TABLE crdb_internal.leases ( )`, populate: func(ctx context.Context, p *planner, _ *DatabaseDescriptor, addRow func(...tree.Datum) error) error { leaseMgr := p.LeaseMgr() - nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.execCfg.NodeID.Get()))) + nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.nodeIDContainer.Get()))) leaseMgr.mu.Lock() defer leaseMgr.mu.Unlock() @@ -605,7 +605,7 @@ CREATE TABLE crdb_internal.node_statement_statistics ( } leaseMgr := p.LeaseMgr() - nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.execCfg.NodeID.Get()))) + nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.nodeIDContainer.Get()))) // Retrieve the application names and sort them to ensure the // output is deterministic. diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 8f40c429037d..400e585c1b1d 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -32,9 +32,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -138,7 +140,11 @@ func storedLeaseExpiration(expiration hlc.Timestamp) tree.DTimestamp { // LeaseStore implements the operations for acquiring and releasing leases and // publishing a new version of a descriptor. Exported only for testing. type LeaseStore struct { - execCfg *ExecutorConfig + nodeIDContainer *base.NodeIDContainer + db *client.DB + clock *hlc.Clock + internalExecutor sqlutil.InternalExecutor + settings *cluster.Settings // group is used for all calls made to acquireNodeLease to prevent // concurrent lease acquisitions from the store. @@ -176,7 +182,7 @@ func (s LeaseStore) acquire( ctx context.Context, minExpiration hlc.Timestamp, tableID sqlbase.ID, ) (*tableVersionState, error) { var table *tableVersionState - err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { expiration := txn.OrigTimestamp() expiration.WallTime += int64(s.jitteredLeaseDuration()) if !minExpiration.Less(expiration) { @@ -209,11 +215,11 @@ func (s LeaseStore) acquire( // ValidateTable instead of Validate, even though we have a txn available, // so we don't block reads waiting for this table version. - if err := table.ValidateTable(s.execCfg.Settings); err != nil { + if err := table.ValidateTable(s.settings); err != nil { return err } - nodeID := s.execCfg.NodeID.Get() + nodeID := s.nodeIDContainer.Get() if nodeID == 0 { panic("zero nodeID") } @@ -228,7 +234,7 @@ func (s LeaseStore) acquire( `INSERT INTO system.public.lease ("descID", version, "nodeID", expiration) VALUES (%d, %d, %d, %s)`, storedLease.id, storedLease.version, nodeID, &storedLease.expiration, ) - count, err := s.execCfg.InternalExecutor.Exec(ctx, "lease-insert", txn, insertLease) + count, err := s.internalExecutor.Exec(ctx, "lease-insert", txn, insertLease) if err != nil { return err } @@ -255,13 +261,13 @@ func (s LeaseStore) release(ctx context.Context, stopper *stop.Stopper, lease *s // NodeUnavailableErrors. for r := retry.Start(retryOptions); r.Next(); { log.VEventf(ctx, 2, "LeaseStore releasing lease %+v", lease) - nodeID := s.execCfg.NodeID.Get() + nodeID := s.nodeIDContainer.Get() if nodeID == 0 { panic("zero nodeID") } const deleteLease = `DELETE FROM system.public.lease ` + `WHERE ("descID", version, "nodeID", expiration) = ($1, $2, $3, $4)` - count, err := s.execCfg.InternalExecutor.Exec( + count, err := s.internalExecutor.Exec( ctx, "lease-release", nil, /* txn */ @@ -303,7 +309,7 @@ func (s LeaseStore) WaitForOneVersion( // Get the current version of the table descriptor non-transactionally. // // TODO(pmattis): Do an inconsistent read here? - if err := s.execCfg.DB.GetProto(ctx, descKey, desc); err != nil { + if err := s.db.GetProto(ctx, descKey, desc); err != nil { return 0, err } tableDesc = desc.GetTable() @@ -312,9 +318,9 @@ func (s LeaseStore) WaitForOneVersion( } // Check to see if there are any leases that still exist on the previous // version of the descriptor. - now := s.execCfg.Clock.Now() + now := s.clock.Now() tables := []IDVersion{NewIDVersionPrev(tableDesc)} - count, err := CountLeases(ctx, s.execCfg.InternalExecutor, tables, now) + count, err := CountLeases(ctx, s.internalExecutor, tables, now) if err != nil { return 0, err } @@ -360,7 +366,7 @@ func (s LeaseStore) Publish( var tableDesc *sqlbase.MutableTableDescriptor // There should be only one version of the descriptor, but it's // a race now to update to the next version. - err = s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + err = s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { // Re-read the current version of the table descriptor, this time // transactionally. var err error @@ -391,7 +397,7 @@ func (s LeaseStore) Publish( if err := tableDesc.MaybeIncrementVersion(ctx, txn); err != nil { return err } - if err := tableDesc.ValidateTable(s.execCfg.Settings); err != nil { + if err := tableDesc.ValidateTable(s.settings); err != nil { return err } @@ -451,7 +457,7 @@ func NewIDVersionPrev(desc *sqlbase.TableDescriptor) IDVersion { // CountLeases returns the number of unexpired leases for a number of tables // each at a particular version at a particular time. func CountLeases( - ctx context.Context, executor *InternalExecutor, tables []IDVersion, at hlc.Timestamp, + ctx context.Context, executor sqlutil.InternalExecutor, tables []IDVersion, at hlc.Timestamp, ) (int, error) { var whereClauses []string for _, t := range tables { @@ -486,7 +492,7 @@ func (s LeaseStore) getForExpiration( ctx context.Context, expiration hlc.Timestamp, id sqlbase.ID, ) (*tableVersionState, error) { var table *tableVersionState - err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { descKey := sqlbase.MakeDescMetadataKey(id) prevTimestamp := expiration.Prev() txn.SetFixedTimestamp(ctx, prevTimestamp) @@ -1070,7 +1076,7 @@ func purgeOldVersions( // Acquire a refcount on the table on the latest version to maintain an // active lease, so that it doesn't get released when removeInactives() // is called below. Release this lease after calling removeInactives(). - table, _, err := t.findForTimestamp(ctx, m.execCfg.Clock.Now()) + table, _, err := t.findForTimestamp(ctx, m.clock.Now()) if dropped := err == errTableDropped; dropped || err == nil { removeInactives(dropped) if table != nil { @@ -1313,20 +1319,28 @@ type LeaseManager struct { // NewLeaseManager creates a new LeaseManager. // -// execCfg can be nil to help bootstrapping, but then it needs to be set via -// SetExecCfg before the LeaseManager is used. +// internalExecutor can be nil to help bootstrapping, but then it needs to be set via +// SetInternalExecutor before the LeaseManager is used. // // stopper is used to run async tasks. Can be nil in tests. func NewLeaseManager( ambientCtx log.AmbientContext, - execCfg *ExecutorConfig, + nodeIDContainer *base.NodeIDContainer, + db *client.DB, + clock *hlc.Clock, + internalExecutor sqlutil.InternalExecutor, + settings *cluster.Settings, testingKnobs LeaseManagerTestingKnobs, stopper *stop.Stopper, cfg *base.LeaseManagerConfig, ) *LeaseManager { lm := &LeaseManager{ LeaseStore: LeaseStore{ - execCfg: execCfg, + nodeIDContainer: nodeIDContainer, + db: db, + clock: clock, + internalExecutor: internalExecutor, + settings: settings, group: &singleflight.Group{}, leaseDuration: cfg.TableDescriptorLeaseDuration, leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction, @@ -1347,9 +1361,9 @@ func NewLeaseManager( return lm } -// SetExecCfg has to be called if a nil execCfg was passed to NewLeaseManager. -func (m *LeaseManager) SetExecCfg(execCfg *ExecutorConfig) { - m.execCfg = execCfg +// SetInternalExecutor has to be called if a nil execCfg was passed to NewLeaseManager. +func (m *LeaseManager) SetInternalExecutor(executor sqlutil.InternalExecutor) { + m.internalExecutor = executor } func nameMatchesTable( @@ -1491,7 +1505,7 @@ func (m *LeaseManager) resolveName( ) (sqlbase.ID, error) { key := sqlbase.NewTableKey(dbID, tableName).Key() id := sqlbase.InvalidID - if err := m.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + if err := m.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { txn.SetFixedTimestamp(ctx, timestamp) gr, err := txn.Get(ctx, key) if err != nil { @@ -1664,7 +1678,7 @@ func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *client.DB, g *gossip.G case *sqlbase.Descriptor_Table: table := union.Table table.MaybeFillInDescriptor() - if err := table.ValidateTable(m.execCfg.Settings); err != nil { + if err := table.ValidateTable(m.settings); err != nil { log.Errorf(ctx, "%s: received invalid table descriptor: %s. Desc: %v", kv.Key, err, table, ) @@ -1731,7 +1745,7 @@ func (m *LeaseManager) PeriodicallyRefreshSomeLeases() { // Refresh some of the current leases. func (m *LeaseManager) refreshSomeLeases(ctx context.Context) { - limit := tableLeaseRefreshLimit.Get(&m.execCfg.Settings.SV) + limit := tableLeaseRefreshLimit.Get(&m.settings.SV) if limit <= 0 { return } @@ -1778,7 +1792,7 @@ func (m *LeaseManager) DeleteOrphanedLeases(timeThreshold int64) { if m.testingKnobs.DisableDeleteOrphanedLeases { return } - nodeID := m.LeaseStore.execCfg.NodeID.Get() + nodeID := m.LeaseStore.nodeIDContainer.Get() if nodeID == 0 { panic("zero nodeID") } @@ -1799,7 +1813,7 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME // The retry is required because of errors caused by node restarts. Retry 30 times. if err := retry.WithMaxAttempts(ctx, retryOptions, 30, func() error { var err error - rows, err = m.LeaseStore.execCfg.InternalExecutor.Query( + rows, err = m.LeaseStore.internalExecutor.Query( ctx, "read orphaned table leases", nil /*txn*/, sqlQuery) return err }); err != nil { diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 88d7b2496313..311743279b86 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -478,7 +478,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Populate the name cache. ctx := context.TODO() table, _, err := leaseManager.AcquireByName( - ctx, leaseManager.execCfg.Clock.Now(), tableDesc.ParentID, "test") + ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test") if err != nil { t.Fatal(err) } @@ -500,7 +500,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); }() for i := 0; i < 50; i++ { - timestamp := leaseManager.execCfg.Clock.Now() + timestamp := leaseManager.clock.Now() ctx := context.TODO() table, _, err := leaseManager.AcquireByName(ctx, timestamp, tableDesc.ParentID, "test") if err != nil { @@ -685,7 +685,7 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { m *LeaseManager, acquireChan chan Result, ) { - table, e, err := m.Acquire(ctx, m.execCfg.Clock.Now(), descID) + table, e, err := m.Acquire(ctx, m.clock.Now(), descID) acquireChan <- Result{err: err, exp: e, table: table} } diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 18eef47558d7..d23ac942daef 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -206,7 +206,11 @@ func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager { cfgCpy.NodeInfo.NodeID = nc mgr = sql.NewLeaseManager( log.AmbientContext{Tracer: tracing.NewTracer()}, - &cfgCpy, + nc, + cfgCpy.DB, + cfgCpy.Clock, + cfgCpy.InternalExecutor, + cfgCpy.Settings, t.leaseManagerTestingKnobs, t.server.Stopper(), t.cfg, diff --git a/pkg/sql/physical_schema_accessors.go b/pkg/sql/physical_schema_accessors.go index 43975a873564..f2cfa31605d0 100644 --- a/pkg/sql/physical_schema_accessors.go +++ b/pkg/sql/physical_schema_accessors.go @@ -215,7 +215,7 @@ func (a *CachedPhysicalAccessor) GetDatabaseDesc( // The database was not known in the uncommitted list. Have the db // cache look it up by name for us. return a.tc.databaseCache.getDatabaseDesc(ctx, - a.tc.leaseMgr.execCfg.DB.Txn, name, flags.required) + a.tc.leaseMgr.db.Txn, name, flags.required) } // We avoided the cache. Go lower. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index c2038858225e..542e3bf3503f 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -204,7 +204,11 @@ func TestSchemaChangeProcess(t *testing.T) { execCfg := s.ExecutorConfig().(sql.ExecutorConfig) leaseMgr := sql.NewLeaseManager( log.AmbientContext{Tracer: tracing.NewTracer()}, - &execCfg, + execCfg.NodeID, + execCfg.DB, + execCfg.Clock, + execCfg.InternalExecutor, + execCfg.Settings, sql.LeaseManagerTestingKnobs{}, stopper, cfg, diff --git a/pkg/sql/table.go b/pkg/sql/table.go index c148352ad29c..78fc3f73bc5c 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -162,7 +162,7 @@ func (tc *TableCollection) getMutableTableDescriptor( // Resolve the database from the database cache when the transaction // hasn't modified the database. dbID, err = tc.databaseCache.getDatabaseID(ctx, - tc.leaseMgr.execCfg.DB.Txn, tn.Catalog(), flags.required) + tc.leaseMgr.db.Txn, tn.Catalog(), flags.required) if err != nil || dbID == sqlbase.InvalidID { // dbID can still be invalid if required is false and the database is not found. return nil, err @@ -218,7 +218,7 @@ func (tc *TableCollection) getTableVersion( // Resolve the database from the database cache when the transaction // hasn't modified the database. dbID, err = tc.databaseCache.getDatabaseID(ctx, - tc.leaseMgr.execCfg.DB.Txn, tn.Catalog(), flags.required) + tc.leaseMgr.db.Txn, tn.Catalog(), flags.required) if err != nil || dbID == sqlbase.InvalidID { // dbID can still be invalid if required is false and the database is not found. return nil, err diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index ecd6b67ef878..8b6ecf5f24f4 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -16,8 +16,8 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys"