Skip to content

Commit

Permalink
sql: let AS OF SYSTEM TIME requests use table descriptor cache
Browse files Browse the repository at this point in the history
This change also fixes a problem where TRUNCATE was not
setting the ModificationTime on the descriptor.

This change also fixes a bug where we were not using
Timestamp.Prev() to compute the previous timestamp
when reading the previous version of a descriptor.

This change disables the use of the descriptor cache
when AS OF SYSTEM TIME is for a timestamp before
a DROP or a TRUNCATE on a table because of a bug
in AcquireByName().

fixes #30967

Release note (sql change): Speedup AS OF SYSTEM TIME requests by
letting them use the table descriptor cache.
  • Loading branch information
vivekmenezes committed Nov 14, 2018
1 parent 4f9cbf1 commit 6257889
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 22 deletions.
2 changes: 0 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ func (ex *connExecutor) execStmtInOpenState(
}
if ts != nil {
p.semaCtx.AsOfTimestamp = ts
p.avoidCachedDescriptors = true
ex.state.mu.txn.SetFixedTimestamp(ctx, *ts)
}
} else {
Expand All @@ -363,7 +362,6 @@ func (ex *connExecutor) execStmtInOpenState(
ex.state.mu.txn.OrigTimestamp()))
}
p.semaCtx.AsOfTimestamp = ts
p.avoidCachedDescriptors = true
}
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,6 @@ func (ex *connExecutor) prepare(
}
if protoTS != nil {
p.semaCtx.AsOfTimestamp = protoTS
// We can't use cached descriptors anywhere in this query, because
// we want the descriptors at the timestamp given, not the latest
// known to the cache.
p.avoidCachedDescriptors = true
txn.SetFixedTimestamp(ctx, *protoTS)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ func (p *planner) getTableScanByRef(
indexFlags *tree.IndexFlags,
scanVisibility scanVisibility,
) (planDataSource, error) {
flags := ObjectLookupFlags{CommonLookupFlags{txn: p.txn, avoidCached: p.avoidCachedDescriptors}}
flags := ObjectLookupFlags{CommonLookupFlags{
txn: p.txn,
avoidCached: p.avoidCachedDescriptors,
fixedTimestamp: p.semaCtx.AsOfTimestamp != nil,
},
}
desc, err := p.Tables().getTableVersionByID(ctx, sqlbase.ID(tref.TableID), flags)
if err != nil {
return planDataSource{}, errors.Wrapf(err, "%s", tree.ErrString(tref))
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,7 @@ func (s LeaseStore) getForExpiration(
var table *tableVersionState
err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
descKey := sqlbase.MakeDescMetadataKey(id)
prevTimestamp := expiration
prevTimestamp.WallTime--
prevTimestamp := expiration.Prev()
txn.SetFixedTimestamp(ctx, prevTimestamp)
var desc sqlbase.Descriptor
if err := txn.GetProto(ctx, descKey, &desc); err != nil {
Expand Down Expand Up @@ -756,7 +755,7 @@ func (m *LeaseManager) readOlderVersionForTimestamp(
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModififcationTime is valid.
// Check to see if the ModificationTime is valid.
if table := t.mu.active.data[i]; !timestamp.Less(table.ModificationTime) {
if timestamp.Less(table.expiration) {
// Existing valid table version.
Expand Down
50 changes: 50 additions & 0 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,56 @@ SELECT EXISTS(SELECT * FROM t.foo);
}
}

// Test that an AS OF SYSTEM TIME query uses the table cache.
func TestAsOfSystemTimeUsesCache(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()

fooAcquiredCount := int32(0)

params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
RemoveOnceDereferenced: true,
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "foo" {
atomic.AddInt32(&fooAcquiredCount, 1)
}
},
},
},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.foo (v INT);
`); err != nil {
t.Fatal(err)
}

if atomic.LoadInt32(&fooAcquiredCount) > 0 {
t.Fatalf("CREATE TABLE has acquired a lease: got %d, expected 0", atomic.LoadInt32(&fooAcquiredCount))
}

var tsVal string
if err := sqlDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&tsVal); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(
fmt.Sprintf(`SELECT * FROM t.foo AS OF SYSTEM TIME %s;`, tsVal),
); err != nil {
t.Fatal(err)
}

count := atomic.LoadInt32(&fooAcquiredCount)
if count == 0 {
t.Fatal("SELECT did not get lease; got 0, expected > 0")
}
}

// TestDescriptorRefreshOnRetry tests that all descriptors acquired by
// a query are properly released before the query is retried.
func TestDescriptorRefreshOnRetry(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,11 @@ func (p *planner) LookupObject(

func (p *planner) CommonLookupFlags(ctx context.Context, required bool) CommonLookupFlags {
return CommonLookupFlags{
ctx: ctx,
txn: p.txn,
required: required,
avoidCached: p.avoidCachedDescriptors,
ctx: ctx,
txn: p.txn,
required: required,
avoidCached: p.avoidCachedDescriptors,
fixedTimestamp: p.semaCtx.AsOfTimestamp != nil,
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schema_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type CommonLookupFlags struct {
required bool
// if avoidCached is set, lookup will avoid the cache (if any).
avoidCached bool
// set to true if the transaction has a fixed timestamp.
fixedTimestamp bool
}

// DatabaseLookupFlags is the flag struct suitable for GetDatabaseDesc().
Expand Down
24 changes: 17 additions & 7 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ func (tc *TableCollection) getTableVersion(
}
}

// TODO(vivek): Ideally we'd avoid caching for only the
// system.descriptor and system.lease tables, because they are
// used for acquiring leases, creating a chicken&egg problem.
// But doing so turned problematic and the tests pass only by also
// disabling caching of system.eventlog, system.rangelog, and
// system.users. For now we're sticking to disabling caching of
// all system descriptors except the role-members-table.
avoidCache := flags.avoidCached || testDisableTableLeases ||
(tn.Catalog() == sqlbase.SystemDB.Name && tn.TableName.String() != sqlbase.RoleMembersTable.Name)

Expand All @@ -252,13 +259,6 @@ func (tc *TableCollection) getTableVersion(
}

if avoidCache {
// TODO(vivek): Ideally we'd avoid caching for only the
// system.descriptor and system.lease tables, because they are
// used for acquiring leases, creating a chicken&egg problem.
// But doing so turned problematic and the tests pass only by also
// disabling caching of system.eventlog, system.rangelog, and
// system.users. For now we're sticking to disabling caching of
// all system descriptors except the role-members-table.
flags.avoidCached = true
phyAccessor := UncachedPhysicalAccessor{}
return phyAccessor.GetObjectDesc(tn, flags)
Expand All @@ -279,6 +279,16 @@ func (tc *TableCollection) getTableVersion(
origTimestamp := flags.txn.OrigTimestamp()
table, expiration, err := tc.leaseMgr.AcquireByName(ctx, origTimestamp, dbID, tn.Table())
if err != nil {
// AcquireByName() is unable to function correctly on a timestamp
// less than the timestamp of a transaction with a DROP/TRUNCATE on
// a table. A TRUNCATE is where the name -> id map for a table changes.
// TODO(vivek): There is no strong reason why this problem is limited
// to AS OF SYSTEM TIME requests; remove flags.fixedTimestamp.
if flags.fixedTimestamp {
flags.avoidCached = true
phyAccessor := UncachedPhysicalAccessor{}
return phyAccessor.GetObjectDesc(tn, flags)
}
if err == sqlbase.ErrDescriptorNotFound {
if flags.required {
// Transform the descriptor error into an error that references the
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (p *planner) truncateTable(ctx context.Context, id sqlbase.ID, traceKV bool
}
}
newTableDesc.Mutations = nil

newTableDesc.ModificationTime = p.txn.CommitTimestamp()
tKey := tableKey{parentID: newTableDesc.ParentID, name: newTableDesc.Name}
key := tKey.Key()
if err := p.createDescriptorWithID(
Expand Down

0 comments on commit 6257889

Please sign in to comment.