Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: let AS OF SYSTEM TIME requests use table descriptor cache #31716

Merged
merged 1 commit into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ func (ex *connExecutor) execStmtInOpenState(
}
if ts != nil {
p.semaCtx.AsOfTimestamp = ts
p.avoidCachedDescriptors = true
ex.state.mu.txn.SetFixedTimestamp(ctx, *ts)
}
} else {
Expand All @@ -362,7 +361,6 @@ func (ex *connExecutor) execStmtInOpenState(
ex.state.mu.txn.OrigTimestamp()))
}
p.semaCtx.AsOfTimestamp = ts
p.avoidCachedDescriptors = true
}
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,6 @@ func (ex *connExecutor) prepare(
}
if protoTS != nil {
p.semaCtx.AsOfTimestamp = protoTS
// We can't use cached descriptors anywhere in this query, because
// we want the descriptors at the timestamp given, not the latest
// known to the cache.
p.avoidCachedDescriptors = true
txn.SetFixedTimestamp(ctx, *protoTS)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ func (p *planner) getTableScanByRef(
indexFlags *tree.IndexFlags,
scanVisibility scanVisibility,
) (planDataSource, error) {
flags := ObjectLookupFlags{CommonLookupFlags{txn: p.txn, avoidCached: p.avoidCachedDescriptors}}
flags := ObjectLookupFlags{CommonLookupFlags{
txn: p.txn,
avoidCached: p.avoidCachedDescriptors,
fixedTimestamp: p.semaCtx.AsOfTimestamp != nil,
},
}
desc, err := p.Tables().getTableVersionByID(ctx, sqlbase.ID(tref.TableID), flags)
if err != nil {
return planDataSource{}, errors.Wrapf(err, "%s", tree.ErrString(tref))
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,7 @@ func (s LeaseStore) getForExpiration(
var table *tableVersionState
err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
descKey := sqlbase.MakeDescMetadataKey(id)
prevTimestamp := expiration
prevTimestamp.WallTime--
prevTimestamp := expiration.Prev()
txn.SetFixedTimestamp(ctx, prevTimestamp)
var desc sqlbase.Descriptor
if err := txn.GetProto(ctx, descKey, &desc); err != nil {
Expand Down Expand Up @@ -756,7 +755,7 @@ func (m *LeaseManager) readOlderVersionForTimestamp(
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModififcationTime is valid.
// Check to see if the ModificationTime is valid.
if table := t.mu.active.data[i]; !timestamp.Less(table.ModificationTime) {
if timestamp.Less(table.expiration) {
// Existing valid table version.
Expand Down
50 changes: 50 additions & 0 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,56 @@ SELECT EXISTS(SELECT * FROM t.foo);
}
}

// Test that an AS OF SYSTEM TIME query uses the table cache.
func TestAsOfSystemTimeUsesCache(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()

fooAcquiredCount := int32(0)

params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
RemoveOnceDereferenced: true,
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "foo" {
atomic.AddInt32(&fooAcquiredCount, 1)
}
},
},
},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.foo (v INT);
`); err != nil {
t.Fatal(err)
}

if atomic.LoadInt32(&fooAcquiredCount) > 0 {
t.Fatalf("CREATE TABLE has acquired a lease: got %d, expected 0", atomic.LoadInt32(&fooAcquiredCount))
}

var tsVal string
if err := sqlDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&tsVal); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(
fmt.Sprintf(`SELECT * FROM t.foo AS OF SYSTEM TIME %s;`, tsVal),
); err != nil {
t.Fatal(err)
}

count := atomic.LoadInt32(&fooAcquiredCount)
if count == 0 {
t.Fatal("SELECT did not get lease; got 0, expected > 0")
}
}

// TestDescriptorRefreshOnRetry tests that all descriptors acquired by
// a query are properly released before the query is retried.
func TestDescriptorRefreshOnRetry(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,11 @@ func (p *planner) LookupObject(

func (p *planner) CommonLookupFlags(ctx context.Context, required bool) CommonLookupFlags {
return CommonLookupFlags{
ctx: ctx,
txn: p.txn,
required: required,
avoidCached: p.avoidCachedDescriptors,
ctx: ctx,
txn: p.txn,
required: required,
avoidCached: p.avoidCachedDescriptors,
fixedTimestamp: p.semaCtx.AsOfTimestamp != nil,
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schema_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type CommonLookupFlags struct {
required bool
// if avoidCached is set, lookup will avoid the cache (if any).
avoidCached bool
// set to true if the transaction has a fixed timestamp.
fixedTimestamp bool
}

// DatabaseLookupFlags is the flag struct suitable for GetDatabaseDesc().
Expand Down
24 changes: 17 additions & 7 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ func (tc *TableCollection) getTableVersion(
}
}

// TODO(vivek): Ideally we'd avoid caching for only the
// system.descriptor and system.lease tables, because they are
// used for acquiring leases, creating a chicken&egg problem.
// But doing so turned problematic and the tests pass only by also
// disabling caching of system.eventlog, system.rangelog, and
// system.users. For now we're sticking to disabling caching of
// all system descriptors except the role-members-table.
avoidCache := flags.avoidCached || testDisableTableLeases ||
(tn.Catalog() == sqlbase.SystemDB.Name && tn.TableName.String() != sqlbase.RoleMembersTable.Name)

Expand All @@ -252,13 +259,6 @@ func (tc *TableCollection) getTableVersion(
}

if avoidCache {
// TODO(vivek): Ideally we'd avoid caching for only the
// system.descriptor and system.lease tables, because they are
// used for acquiring leases, creating a chicken&egg problem.
// But doing so turned problematic and the tests pass only by also
// disabling caching of system.eventlog, system.rangelog, and
// system.users. For now we're sticking to disabling caching of
// all system descriptors except the role-members-table.
flags.avoidCached = true
phyAccessor := UncachedPhysicalAccessor{}
return phyAccessor.GetObjectDesc(tn, flags)
Expand All @@ -279,6 +279,16 @@ func (tc *TableCollection) getTableVersion(
origTimestamp := flags.txn.OrigTimestamp()
table, expiration, err := tc.leaseMgr.AcquireByName(ctx, origTimestamp, dbID, tn.Table())
if err != nil {
// AcquireByName() is unable to function correctly on a timestamp
// less than the timestamp of a transaction with a DROP/TRUNCATE on
// a table. A TRUNCATE is where the name -> id map for a table changes.
// TODO(vivek): There is no strong reason why this problem is limited
// to AS OF SYSTEM TIME requests; remove flags.fixedTimestamp.
if flags.fixedTimestamp {
flags.avoidCached = true
phyAccessor := UncachedPhysicalAccessor{}
return phyAccessor.GetObjectDesc(tn, flags)
}
if err == sqlbase.ErrDescriptorNotFound {
if flags.required {
// Transform the descriptor error into an error that references the
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (p *planner) truncateTable(
}
newTableDesc.Mutations = nil
newTableDesc.GCMutations = nil

newTableDesc.ModificationTime = p.txn.CommitTimestamp()
tKey := tableKey{parentID: newTableDesc.ParentID, name: newTableDesc.Name}
key := tKey.Key()
if err := p.createDescriptorWithID(
Expand Down