Skip to content

Commit

Permalink
Merge #31776
Browse files Browse the repository at this point in the history
31776: sql: cleanup AS OF SYSTEM TIME query descriptor cache lookup r=vivekmenezes a=vivekmenezes

This change removes the fixedTimestamp bit introduced in #31716

It also explains problem #18354 with LeaseManager.Acquire() through
a comment and implements a work around bypassing the cache and
reading the descriptor directly from the store.

It also fixes a problem with UncachedPhysicalAccessor.GetObjectDesc
tested through TestTxnCanStillResolveOldName.

related to #30967

fixes #18354
fixes #19925

Release note: None

Co-authored-by: Vivek Menezes <[email protected]>
  • Loading branch information
craig[bot] and vivekmenezes committed Nov 1, 2018
2 parents ac465ac + 7c9b907 commit b05b2f8
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 41 deletions.
5 changes: 2 additions & 3 deletions pkg/sql/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ func (p *planner) getTableScanByRef(
scanVisibility scanVisibility,
) (planDataSource, error) {
flags := ObjectLookupFlags{CommonLookupFlags{
txn: p.txn,
avoidCached: p.avoidCachedDescriptors,
fixedTimestamp: p.semaCtx.AsOfTimestamp != nil,
txn: p.txn,
avoidCached: p.avoidCachedDescriptors,
},
}
desc, err := p.Tables().getTableVersionByID(ctx, sqlbase.ID(tref.TableID), flags)
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,17 @@ func (m *LeaseManager) findNewest(tableID sqlbase.ID) *tableVersionState {
// the returned descriptor. Renewal of a lease may begin in the
// background. Renewal is done in order to prevent blocking on future
// acquisitions.
//
// Known limitation: AcquireByName() calls Acquire() and therefore suffers
// from the same limitation as Acquire (See Acquire). AcquireByName() is
// unable to function correctly on a timestamp less than the timestamp
// of a transaction with a DROP/TRUNCATE on a table. The limitation in
// the face of a DROP follows directly from the limitation on Acquire().
// A TRUNCATE is implemented by changing the name -> id mapping for a table
// and by dropping the descriptor with the old id. While AcquireByName
// can use the timestamp and get the correct name->id mapping at a
// timestamp, it uses Acquire() to get a descriptor with the corresponding
// id and fails because the id has been dropped by the TRUNCATE.
func (m *LeaseManager) AcquireByName(
ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
Expand Down Expand Up @@ -1505,6 +1516,12 @@ func (m *LeaseManager) resolveName(
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
//
// Known limitation: Acquire() can return an error after the table with
// the tableID has been dropped. This is true even when using a timestamp
// less than the timestamp of the DROP command. This is because Acquire
// can only return an older version of a descriptor if the latest version
// can be leased; as it stands a dropped table cannot be leased.
func (m *LeaseManager) Acquire(
ctx context.Context, timestamp hlc.Timestamp, tableID sqlbase.ID,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
Expand Down
109 changes: 109 additions & 0 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1781,3 +1781,112 @@ CREATE TABLE t.test2 ();
t.Fatalf("expected lease acquisition to not block, but blockCount is: %d", blockCount)
}
}

// TestReadBeforeDrop tests that a read over a table from a transaction
// initiated before the table is dropped succeeds.
func TestReadBeforeDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR);
INSERT INTO t.kv VALUES ('a', 'b');
`); err != nil {
t.Fatal(err)
}
// Test that once a table is dropped it cannot be used even when
// a transaction is using a timestamp from the past.
tx, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(`DROP TABLE t.kv`); err != nil {
t.Fatal(err)
}

rows, err := tx.Query(`SELECT * FROM t.kv`)
if err != nil {
t.Fatal(err)
}
defer rows.Close()

for rows.Next() {
var k, v string
err := rows.Scan(&k, &v)
if err != nil {
t.Fatal(err)
}
if k != "a" || v != "b" {
t.Fatalf("didn't find expected row: %s %s", k, v)
}
}

if err := tx.Commit(); err != nil {
t.Fatal(err)
}
}

// Tests that transactions with timestamps within the uncertainty interval
// of a TABLE CREATE are pushed to allow them to observe the created table.
func TestTableCreationPushesTxnsInRecentPast(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()
tc := serverutils.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: params,
})
defer tc.Stopper().Stop(context.TODO())
sqlDB := tc.ServerConn(0)

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.timestamp (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

// Create a transaction before the table is created.
tx, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}

// Create a transaction before the table is created. Use a different
// node so that clock uncertainty is presumed and it gets pushed.
tx1, err := tc.ServerConn(1).Begin()
if err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(`
CREATE TABLE t.kv (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

// Was actually run in the past and so doesn't see the table.
if _, err := tx.Exec(`
INSERT INTO t.kv VALUES ('a', 'b');
`); !testutils.IsError(err, "does not exist") {
t.Fatal(err)
}

// Not sure whether run in the past and so sees clock uncertainty push.
if _, err := tx1.Exec(`
INSERT INTO t.kv VALUES ('c', 'd');
`); err != nil {
t.Fatal(err)
}

if err := tx.Rollback(); err != nil {
t.Fatal(err)
}

if err := tx1.Commit(); err != nil {
t.Fatal(err)
}
}
12 changes: 9 additions & 3 deletions pkg/sql/physical_schema_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,17 @@ func (a UncachedPhysicalAccessor) GetObjectDesc(
if err != nil {
return nil, nil, err
}

if found {
// We have a descriptor. Is it in the right state? We'll keep if
// in the ADD state.
// We have a descriptor. Is it in the right state? We'll keep it if
// it is in the ADD state.
if err := filterTableState(desc); err == nil || err == errTableAdding {
return desc, dbDesc, nil
// Immediately after a RENAME an old name still points to the
// descriptor during the drain phase for the name. Do not
// return a descriptor during draining.
if nameMatchesTable(desc, dbDesc.ID, name.Table()) {
return desc, dbDesc, nil
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ CREATE TABLE test.t (a INT PRIMARY KEY);
// Check that the old name is not usable outside of the transaction now
// that the node doesn't have a lease on it anymore (committing the txn
// should have released the lease on the version of the descriptor with the
// old name), even thoudh the name mapping still exists.
// old name), even though the name mapping still exists.
lease := s.LeaseManager().(*LeaseManager).tableNames.get(tableDesc.ID, "t", s.Clock().Now())
if lease != nil {
t.Fatalf(`still have lease on "t"`)
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,10 @@ func (p *planner) LookupObject(

func (p *planner) CommonLookupFlags(ctx context.Context, required bool) CommonLookupFlags {
return CommonLookupFlags{
ctx: ctx,
txn: p.txn,
required: required,
avoidCached: p.avoidCachedDescriptors,
fixedTimestamp: p.semaCtx.AsOfTimestamp != nil,
ctx: ctx,
txn: p.txn,
required: required,
avoidCached: p.avoidCachedDescriptors,
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/schema_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ type CommonLookupFlags struct {
required bool
// if avoidCached is set, lookup will avoid the cache (if any).
avoidCached bool
// set to true if the transaction has a fixed timestamp.
fixedTimestamp bool
}

// DatabaseLookupFlags is the flag struct suitable for GetDatabaseDesc().
Expand Down
35 changes: 8 additions & 27 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,10 @@ type dbCacheSubscriber interface {
// If flags.required is false, getTableVersion() will gracefully
// return a nil descriptor and no error if the table does not exist.
//
// TODO(vivek): #6418 introduced a transaction deadline that is enforced at
// the KV layer, and was introduced to manager the validity window of a
// table descriptor. Since we will be checking for the valid use of a table
// descriptor here, we do not need the extra check at the KV layer. However,
// for a SNAPSHOT_ISOLATION transaction the commit timestamp of the transaction
// can change, so we have kept the transaction deadline. It's worth
// reconsidering if this is really needed.
// It might also add a transaction deadline to the transaction that is
// enforced at the KV layer to ensure that the transaction doesn't violate
// the validity window of the table descriptor version returned.
//
// TODO(vivek): Allow cached descriptors for AS OF SYSTEM TIME queries.
func (tc *TableCollection) getTableVersion(
ctx context.Context, tn *tree.TableName, flags ObjectLookupFlags,
) (ObjectDescriptor, *sqlbase.DatabaseDescriptor, error) {
Expand Down Expand Up @@ -255,9 +250,8 @@ func (tc *TableCollection) getTableVersion(
return table, nil, nil
}

phyAccessor := UncachedPhysicalAccessor{}
if avoidCache {
flags.avoidCached = true
phyAccessor := UncachedPhysicalAccessor{}
return phyAccessor.GetObjectDesc(tn, flags)
}

Expand All @@ -276,25 +270,12 @@ func (tc *TableCollection) getTableVersion(
origTimestamp := flags.txn.OrigTimestamp()
table, expiration, err := tc.leaseMgr.AcquireByName(ctx, origTimestamp, dbID, tn.Table())
if err != nil {
// AcquireByName() is unable to function correctly on a timestamp
// less than the timestamp of a transaction with a DROP/TRUNCATE on
// a table. A TRUNCATE is where the name -> id map for a table changes.
// TODO(vivek): There is no strong reason why this problem is limited
// to AS OF SYSTEM TIME requests; remove flags.fixedTimestamp.
if flags.fixedTimestamp {
flags.avoidCached = true
phyAccessor := UncachedPhysicalAccessor{}
// Read the descriptor from the store in the face of some specific errors
// because of a known limitation of AcquireByName. See the known
// limitations of AcquireByName for details.
if err == errTableDropped || err == sqlbase.ErrDescriptorNotFound {
return phyAccessor.GetObjectDesc(tn, flags)
}
if err == sqlbase.ErrDescriptorNotFound {
if flags.required {
// Transform the descriptor error into an error that references the
// table's name.
return nil, nil, sqlbase.NewUndefinedRelationError(tn)
}
// We didn't find the descriptor but it's also not required. Make no fuss.
return nil, nil, nil
}
// Lease acquisition failed with some other error. This we don't
// know how to deal with, so propagate the error.
return nil, nil, err
Expand Down

0 comments on commit b05b2f8

Please sign in to comment.