Skip to content

Commit

Permalink
sql: remove LeaseManager dependency on ExecutorConfig
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
vivekmenezes committed May 16, 2019
1 parent a6f98a7 commit ed4bffc
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 52 deletions.
32 changes: 18 additions & 14 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,19 +353,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.internalMemMetrics = sql.MakeMemMetrics("internal", cfg.HistogramWindowInterval())
s.registry.AddMetricStruct(s.internalMemMetrics)

// Set up Lease Manager
var lmKnobs sql.LeaseManagerTestingKnobs
if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil {
lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs)
}
s.leaseMgr = sql.NewLeaseManager(
s.cfg.AmbientCtx,
nil, /* execCfg - will be set later because of circular dependencies */
lmKnobs,
s.stopper,
s.cfg.LeaseManagerConfig,
)

// We do not set memory monitors or a noteworthy limit because the children of
// this monitor will be setting their own noteworthy limits.
rootSQLMemoryMonitor := mon.MakeMonitor(
Expand Down Expand Up @@ -514,6 +501,23 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
distSQLMetrics := distsqlrun.MakeDistSQLMetrics(cfg.HistogramWindowInterval())
s.registry.AddMetricStruct(distSQLMetrics)

// Set up Lease Manager
var lmKnobs sql.LeaseManagerTestingKnobs
if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil {
lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs)
}
s.leaseMgr = sql.NewLeaseManager(
s.cfg.AmbientCtx,
&s.nodeIDContainer,
s.db,
s.clock,
nil, /* internalExecutor - will be set later because of circular dependencies */
st,
lmKnobs,
s.stopper,
s.cfg.LeaseManagerConfig,
)

// Set up the DistSQL server.
distSQLCfg := distsqlrun.ServerConfig{
AmbientContext: s.cfg.AmbientCtx,
Expand Down Expand Up @@ -724,7 +728,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

s.execCfg = &execCfg

s.leaseMgr.SetExecCfg(&execCfg)
s.leaseMgr.SetInternalExecutor(execCfg.InternalExecutor)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)
s.leaseMgr.PeriodicallyRefreshSomeLeases()

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ CREATE TABLE crdb_internal.leases (
)`,
populate: func(ctx context.Context, p *planner, _ *DatabaseDescriptor, addRow func(...tree.Datum) error) error {
leaseMgr := p.LeaseMgr()
nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.execCfg.NodeID.Get())))
nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.nodeIDContainer.Get())))

leaseMgr.mu.Lock()
defer leaseMgr.mu.Unlock()
Expand Down Expand Up @@ -605,7 +605,7 @@ CREATE TABLE crdb_internal.node_statement_statistics (
}

leaseMgr := p.LeaseMgr()
nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.execCfg.NodeID.Get())))
nodeID := tree.NewDInt(tree.DInt(int64(leaseMgr.nodeIDContainer.Get())))

// Retrieve the application names and sort them to ensure the
// output is deterministic.
Expand Down
68 changes: 41 additions & 27 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -138,7 +140,11 @@ func storedLeaseExpiration(expiration hlc.Timestamp) tree.DTimestamp {
// LeaseStore implements the operations for acquiring and releasing leases and
// publishing a new version of a descriptor. Exported only for testing.
type LeaseStore struct {
execCfg *ExecutorConfig
nodeIDContainer *base.NodeIDContainer
db *client.DB
clock *hlc.Clock
internalExecutor sqlutil.InternalExecutor
settings *cluster.Settings

// group is used for all calls made to acquireNodeLease to prevent
// concurrent lease acquisitions from the store.
Expand Down Expand Up @@ -176,7 +182,7 @@ func (s LeaseStore) acquire(
ctx context.Context, minExpiration hlc.Timestamp, tableID sqlbase.ID,
) (*tableVersionState, error) {
var table *tableVersionState
err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
expiration := txn.OrigTimestamp()
expiration.WallTime += int64(s.jitteredLeaseDuration())
if !minExpiration.Less(expiration) {
Expand Down Expand Up @@ -209,11 +215,11 @@ func (s LeaseStore) acquire(

// ValidateTable instead of Validate, even though we have a txn available,
// so we don't block reads waiting for this table version.
if err := table.ValidateTable(s.execCfg.Settings); err != nil {
if err := table.ValidateTable(s.settings); err != nil {
return err
}

nodeID := s.execCfg.NodeID.Get()
nodeID := s.nodeIDContainer.Get()
if nodeID == 0 {
panic("zero nodeID")
}
Expand All @@ -228,7 +234,7 @@ func (s LeaseStore) acquire(
`INSERT INTO system.public.lease ("descID", version, "nodeID", expiration) VALUES (%d, %d, %d, %s)`,
storedLease.id, storedLease.version, nodeID, &storedLease.expiration,
)
count, err := s.execCfg.InternalExecutor.Exec(ctx, "lease-insert", txn, insertLease)
count, err := s.internalExecutor.Exec(ctx, "lease-insert", txn, insertLease)
if err != nil {
return err
}
Expand All @@ -255,13 +261,13 @@ func (s LeaseStore) release(ctx context.Context, stopper *stop.Stopper, lease *s
// NodeUnavailableErrors.
for r := retry.Start(retryOptions); r.Next(); {
log.VEventf(ctx, 2, "LeaseStore releasing lease %+v", lease)
nodeID := s.execCfg.NodeID.Get()
nodeID := s.nodeIDContainer.Get()
if nodeID == 0 {
panic("zero nodeID")
}
const deleteLease = `DELETE FROM system.public.lease ` +
`WHERE ("descID", version, "nodeID", expiration) = ($1, $2, $3, $4)`
count, err := s.execCfg.InternalExecutor.Exec(
count, err := s.internalExecutor.Exec(
ctx,
"lease-release",
nil, /* txn */
Expand Down Expand Up @@ -303,7 +309,7 @@ func (s LeaseStore) WaitForOneVersion(
// Get the current version of the table descriptor non-transactionally.
//
// TODO(pmattis): Do an inconsistent read here?
if err := s.execCfg.DB.GetProto(ctx, descKey, desc); err != nil {
if err := s.db.GetProto(ctx, descKey, desc); err != nil {
return 0, err
}
tableDesc = desc.GetTable()
Expand All @@ -312,9 +318,9 @@ func (s LeaseStore) WaitForOneVersion(
}
// Check to see if there are any leases that still exist on the previous
// version of the descriptor.
now := s.execCfg.Clock.Now()
now := s.clock.Now()
tables := []IDVersion{NewIDVersionPrev(tableDesc)}
count, err := CountLeases(ctx, s.execCfg.InternalExecutor, tables, now)
count, err := CountLeases(ctx, s.internalExecutor, tables, now)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -360,7 +366,7 @@ func (s LeaseStore) Publish(
var tableDesc *sqlbase.MutableTableDescriptor
// There should be only one version of the descriptor, but it's
// a race now to update to the next version.
err = s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err = s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
// Re-read the current version of the table descriptor, this time
// transactionally.
var err error
Expand Down Expand Up @@ -391,7 +397,7 @@ func (s LeaseStore) Publish(
if err := tableDesc.MaybeIncrementVersion(ctx, txn); err != nil {
return err
}
if err := tableDesc.ValidateTable(s.execCfg.Settings); err != nil {
if err := tableDesc.ValidateTable(s.settings); err != nil {
return err
}

Expand Down Expand Up @@ -451,7 +457,7 @@ func NewIDVersionPrev(desc *sqlbase.TableDescriptor) IDVersion {
// CountLeases returns the number of unexpired leases for a number of tables
// each at a particular version at a particular time.
func CountLeases(
ctx context.Context, executor *InternalExecutor, tables []IDVersion, at hlc.Timestamp,
ctx context.Context, executor sqlutil.InternalExecutor, tables []IDVersion, at hlc.Timestamp,
) (int, error) {
var whereClauses []string
for _, t := range tables {
Expand Down Expand Up @@ -486,7 +492,7 @@ func (s LeaseStore) getForExpiration(
ctx context.Context, expiration hlc.Timestamp, id sqlbase.ID,
) (*tableVersionState, error) {
var table *tableVersionState
err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
descKey := sqlbase.MakeDescMetadataKey(id)
prevTimestamp := expiration.Prev()
txn.SetFixedTimestamp(ctx, prevTimestamp)
Expand Down Expand Up @@ -1070,7 +1076,7 @@ func purgeOldVersions(
// Acquire a refcount on the table on the latest version to maintain an
// active lease, so that it doesn't get released when removeInactives()
// is called below. Release this lease after calling removeInactives().
table, _, err := t.findForTimestamp(ctx, m.execCfg.Clock.Now())
table, _, err := t.findForTimestamp(ctx, m.clock.Now())
if dropped := err == errTableDropped; dropped || err == nil {
removeInactives(dropped)
if table != nil {
Expand Down Expand Up @@ -1313,20 +1319,28 @@ type LeaseManager struct {

// NewLeaseManager creates a new LeaseManager.
//
// execCfg can be nil to help bootstrapping, but then it needs to be set via
// SetExecCfg before the LeaseManager is used.
// internalExecutor can be nil to help bootstrapping, but then it needs to be set via
// SetInternalExecutor before the LeaseManager is used.
//
// stopper is used to run async tasks. Can be nil in tests.
func NewLeaseManager(
ambientCtx log.AmbientContext,
execCfg *ExecutorConfig,
nodeIDContainer *base.NodeIDContainer,
db *client.DB,
clock *hlc.Clock,
internalExecutor sqlutil.InternalExecutor,
settings *cluster.Settings,
testingKnobs LeaseManagerTestingKnobs,
stopper *stop.Stopper,
cfg *base.LeaseManagerConfig,
) *LeaseManager {
lm := &LeaseManager{
LeaseStore: LeaseStore{
execCfg: execCfg,
nodeIDContainer: nodeIDContainer,
db: db,
clock: clock,
internalExecutor: internalExecutor,
settings: settings,
group: &singleflight.Group{},
leaseDuration: cfg.TableDescriptorLeaseDuration,
leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction,
Expand All @@ -1347,9 +1361,9 @@ func NewLeaseManager(
return lm
}

// SetExecCfg has to be called if a nil execCfg was passed to NewLeaseManager.
func (m *LeaseManager) SetExecCfg(execCfg *ExecutorConfig) {
m.execCfg = execCfg
// SetInternalExecutor has to be called if a nil execCfg was passed to NewLeaseManager.
func (m *LeaseManager) SetInternalExecutor(executor sqlutil.InternalExecutor) {
m.internalExecutor = executor
}

func nameMatchesTable(
Expand Down Expand Up @@ -1491,7 +1505,7 @@ func (m *LeaseManager) resolveName(
) (sqlbase.ID, error) {
key := sqlbase.NewTableKey(dbID, tableName).Key()
id := sqlbase.InvalidID
if err := m.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := m.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, timestamp)
gr, err := txn.Get(ctx, key)
if err != nil {
Expand Down Expand Up @@ -1664,7 +1678,7 @@ func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *client.DB, g *gossip.G
case *sqlbase.Descriptor_Table:
table := union.Table
table.MaybeFillInDescriptor()
if err := table.ValidateTable(m.execCfg.Settings); err != nil {
if err := table.ValidateTable(m.settings); err != nil {
log.Errorf(ctx, "%s: received invalid table descriptor: %s. Desc: %v",
kv.Key, err, table,
)
Expand Down Expand Up @@ -1731,7 +1745,7 @@ func (m *LeaseManager) PeriodicallyRefreshSomeLeases() {

// Refresh some of the current leases.
func (m *LeaseManager) refreshSomeLeases(ctx context.Context) {
limit := tableLeaseRefreshLimit.Get(&m.execCfg.Settings.SV)
limit := tableLeaseRefreshLimit.Get(&m.settings.SV)
if limit <= 0 {
return
}
Expand Down Expand Up @@ -1778,7 +1792,7 @@ func (m *LeaseManager) DeleteOrphanedLeases(timeThreshold int64) {
if m.testingKnobs.DisableDeleteOrphanedLeases {
return
}
nodeID := m.LeaseStore.execCfg.NodeID.Get()
nodeID := m.LeaseStore.nodeIDContainer.Get()
if nodeID == 0 {
panic("zero nodeID")
}
Expand All @@ -1799,7 +1813,7 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME
// The retry is required because of errors caused by node restarts. Retry 30 times.
if err := retry.WithMaxAttempts(ctx, retryOptions, 30, func() error {
var err error
rows, err = m.LeaseStore.execCfg.InternalExecutor.Query(
rows, err = m.LeaseStore.internalExecutor.Query(
ctx, "read orphaned table leases", nil /*txn*/, sqlQuery)
return err
}); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
// Populate the name cache.
ctx := context.TODO()
table, _, err := leaseManager.AcquireByName(
ctx, leaseManager.execCfg.Clock.Now(), tableDesc.ParentID, "test")
ctx, leaseManager.clock.Now(), tableDesc.ParentID, "test")
if err != nil {
t.Fatal(err)
}
Expand All @@ -500,7 +500,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
}()

for i := 0; i < 50; i++ {
timestamp := leaseManager.execCfg.Clock.Now()
timestamp := leaseManager.clock.Now()
ctx := context.TODO()
table, _, err := leaseManager.AcquireByName(ctx, timestamp, tableDesc.ParentID, "test")
if err != nil {
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
m *LeaseManager,
acquireChan chan Result,
) {
table, e, err := m.Acquire(ctx, m.execCfg.Clock.Now(), descID)
table, e, err := m.Acquire(ctx, m.clock.Now(), descID)
acquireChan <- Result{err: err, exp: e, table: table}
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager {
cfgCpy.NodeInfo.NodeID = nc
mgr = sql.NewLeaseManager(
log.AmbientContext{Tracer: tracing.NewTracer()},
&cfgCpy,
nc,
cfgCpy.DB,
cfgCpy.Clock,
cfgCpy.InternalExecutor,
cfgCpy.Settings,
t.leaseManagerTestingKnobs,
t.server.Stopper(),
t.cfg,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/physical_schema_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (a *CachedPhysicalAccessor) GetDatabaseDesc(
// The database was not known in the uncommitted list. Have the db
// cache look it up by name for us.
return a.tc.databaseCache.getDatabaseDesc(ctx,
a.tc.leaseMgr.execCfg.DB.Txn, name, flags.required)
a.tc.leaseMgr.db.Txn, name, flags.required)
}

// We avoided the cache. Go lower.
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ func TestSchemaChangeProcess(t *testing.T) {
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
leaseMgr := sql.NewLeaseManager(
log.AmbientContext{Tracer: tracing.NewTracer()},
&execCfg,
execCfg.NodeID,
execCfg.DB,
execCfg.Clock,
execCfg.InternalExecutor,
execCfg.Settings,
sql.LeaseManagerTestingKnobs{},
stopper,
cfg,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (tc *TableCollection) getMutableTableDescriptor(
// Resolve the database from the database cache when the transaction
// hasn't modified the database.
dbID, err = tc.databaseCache.getDatabaseID(ctx,
tc.leaseMgr.execCfg.DB.Txn, tn.Catalog(), flags.required)
tc.leaseMgr.db.Txn, tn.Catalog(), flags.required)
if err != nil || dbID == sqlbase.InvalidID {
// dbID can still be invalid if required is false and the database is not found.
return nil, err
Expand Down Expand Up @@ -218,7 +218,7 @@ func (tc *TableCollection) getTableVersion(
// Resolve the database from the database cache when the transaction
// hasn't modified the database.
dbID, err = tc.databaseCache.getDatabaseID(ctx,
tc.leaseMgr.execCfg.DB.Txn, tn.Catalog(), flags.required)
tc.leaseMgr.db.Txn, tn.Catalog(), flags.required)
if err != nil || dbID == sqlbase.InvalidID {
// dbID can still be invalid if required is false and the database is not found.
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package sql

import (
"context"
"github.com/cockroachdb/cockroach/pkg/config"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down

0 comments on commit ed4bffc

Please sign in to comment.