Skip to content

Commit

Permalink
Merge pull request #36631 from vivekmenezes/backport19.1-36531
Browse files Browse the repository at this point in the history
backport-19.1: sql: ensure lease expiration is monotonically increasing
  • Loading branch information
vivekmenezes authored Apr 9, 2019
2 parents 0b4cccd + 1047868 commit c03c307
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ const (
// DefaultTableDescriptorLeaseJitterFraction is the default factor
// that we use to randomly jitter the lease duration when acquiring a
// new lease and the lease renewal timeout.
DefaultTableDescriptorLeaseJitterFraction = 0.25
DefaultTableDescriptorLeaseJitterFraction = 0.05

// DefaultTableDescriptorLeaseRenewalTimeout is the default time
// before a lease expires when acquisition to renew the lease begins.
Expand Down
63 changes: 49 additions & 14 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (s *tableVersionState) hasExpired(timestamp hlc.Timestamp) bool {
return !timestamp.Less(s.expiration)
}

// hasValidExpiration checks that this table have a larger expiration than
// the existing one it is replacing. This can be used to check the
// monotonicity of the expiration times on a table at a particular version.
// The version is not explicitly checked here.
func (s *tableVersionState) hasValidExpiration(existing *tableVersionState) bool {
return existing.expiration.Less(s.expiration)
}

func (s *tableVersionState) incRefcount() {
s.mu.Lock()
s.incRefcountLocked()
Expand Down Expand Up @@ -163,11 +171,20 @@ func (s LeaseStore) jitteredLeaseDuration() time.Duration {
// acquire a lease on the most recent version of a table descriptor.
// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
func (s LeaseStore) acquire(ctx context.Context, tableID sqlbase.ID) (*tableVersionState, error) {
// The expiration time set for the lease > minExpiration.
func (s LeaseStore) acquire(
ctx context.Context, minExpiration hlc.Timestamp, tableID sqlbase.ID,
) (*tableVersionState, error) {
var table *tableVersionState
err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
expiration := txn.OrigTimestamp()
expiration.WallTime += int64(s.jitteredLeaseDuration())
if !minExpiration.Less(expiration) {
// In the rare circumstances where expiration <= minExpiration
// use an expiration based on the minExpiration to guarantee
// a monotonically increasing expiration.
expiration = minExpiration.Add(int64(time.Millisecond), 0)
}

tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, tableID)
if err != nil {
Expand Down Expand Up @@ -857,19 +874,33 @@ func (m *LeaseManager) AcquireFreshestFromStore(ctx context.Context, tableID sql
// upsertLocked inserts a lease for a particular table version.
// If an existing lease exists for the table version it replaces
// it and returns it.
func (t *tableState) upsertLocked(ctx context.Context, table *tableVersionState) *storedTableLease {
func (t *tableState) upsertLocked(
ctx context.Context, table *tableVersionState,
) (*storedTableLease, error) {
s := t.mu.active.find(table.Version)
if s == nil {
if t.mu.active.findNewest() != nil {
log.Infof(ctx, "new lease: %s", table)
}
t.mu.active.insert(table)
return nil
return nil, nil
}

// The table is replacing an existing one at the same version.
if !table.hasValidExpiration(s) {
// This is a violation of an invariant and can actually not
// happen. We return an error here to aid in further investigations.
return nil, errors.Errorf("lease expiration monotonicity violation, (%s) vs (%s)", s, table)
}

s.mu.Lock()
table.mu.Lock()
// subsume the refcount of the older lease.
// subsume the refcount of the older lease. This is permitted because
// the new lease has a greater expiration than the older lease and
// any transaction using the older lease can safely use a deadline set
// to the older lease's expiration even though the older lease is
// released! This is because the new lease is valid at the same table
// version at a greater expiration.
table.mu.refcount += s.mu.refcount
s.mu.refcount = 0
l := s.mu.lease
Expand All @@ -881,7 +912,7 @@ func (t *tableState) upsertLocked(ctx context.Context, table *tableVersionState)
s.mu.Unlock()
t.mu.active.remove(s)
t.mu.active.insert(table)
return l
return l, nil
}

// removeInactiveVersions removes inactive versions in t.mu.active.data with refcount 0.
Expand All @@ -908,26 +939,30 @@ func (t *tableState) removeInactiveVersions() []*storedTableLease {

// If the lease cannot be obtained because the descriptor is in the process of
// being dropped, the error will be errTableDropped.
// minExpirationTime, if not set to the zero value, will be used as a lower
// bound on the expiration of the new table. This can be used to eliminate the
// jitter in the expiration time, and guarantee that we get a lease that will be
// inserted at the end of the lease set (i.e. it will be returned by
// findNewest() from now on). The boolean returned is true if this call was actually
// responsible for the lease acquisition.
// The boolean returned is true if this call was actually responsible for the
// lease acquisition.
func acquireNodeLease(ctx context.Context, m *LeaseManager, id sqlbase.ID) (bool, error) {
var toRelease *storedTableLease
resultChan, didAcquire := m.group.DoChan(fmt.Sprintf("acquire%d", id), func() (interface{}, error) {
if m.isDraining() {
return nil, errors.New("cannot acquire lease when draining")
}
table, err := m.LeaseStore.acquire(ctx, id)
newest := m.findNewest(id)
var minExpiration hlc.Timestamp
if newest != nil {
minExpiration = newest.expiration
}
table, err := m.LeaseStore.acquire(ctx, minExpiration, id)
if err != nil {
return nil, err
}
t := m.findTableState(id, false /* create */)
t.mu.Lock()
defer t.mu.Unlock()
toRelease = t.upsertLocked(ctx, table)
toRelease, err = t.upsertLocked(ctx, table)
if err != nil {
return nil, err
}
m.tableNames.insert(table)
return leaseToken(table), nil
})
Expand Down Expand Up @@ -1239,7 +1274,7 @@ func (c *tableNameCache) insert(table *tableVersionState) {
// If we already have a lease in the cache for this name, see if this one is
// better (higher version or later expiration).
if table.Version > existing.Version ||
(table.Version == existing.Version && (existing.expiration.Less(table.expiration))) {
(table.Version == existing.Version && table.hasValidExpiration(existing)) {
// Overwrite the old table. The new one is better. From now on, we want
// clients to use the new one.
c.tables[key] = table
Expand Down
68 changes: 68 additions & 0 deletions pkg/sql/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,74 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR);
}
}

// Tests that a name cache entry always exists for the latest lease and
// the lease expiration time is monotonically increasing.
func TestNameCacheContainsLatestLease(t *testing.T) {
defer leaktest.AfterTest(t)()
removalTracker := NewLeaseRemovalTracker()
testingKnobs := base.TestingKnobs{
SQLLeaseManager: &LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{
LeaseReleasedEvent: removalTracker.LeaseRemovedNotification,
},
},
}
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{Knobs: testingKnobs})
defer s.Stopper().Stop(context.TODO())
leaseManager := s.LeaseManager().(*LeaseManager)

const tableName = "test"

if _, err := db.Exec(fmt.Sprintf(`
CREATE DATABASE t;
CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR);
`, tableName)); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", tableName)

// Populate the name cache.
if _, err := db.Exec("SELECT * FROM t.test;"); err != nil {
t.Fatal(err)
}

// There is a cache entry.
lease := leaseManager.tableNames.get(tableDesc.ParentID, tableName, s.Clock().Now())
if lease == nil {
t.Fatalf("name cache has no unexpired entry for (%d, %s)", tableDesc.ParentID, tableName)
}

tracker := removalTracker.TrackRemoval(&lease.ImmutableTableDescriptor)

// Acquire another lease.
if _, err := acquireNodeLease(context.TODO(), leaseManager, tableDesc.ID); err != nil {
t.Fatal(err)
}

// Check the name resolves to the new lease.
newLease := leaseManager.tableNames.get(tableDesc.ParentID, tableName, s.Clock().Now())
if newLease == nil {
t.Fatalf("name cache doesn't contain entry for (%d, %s)", tableDesc.ParentID, tableName)
}
if newLease == lease {
t.Fatalf("same lease %s", newLease.expiration.GoTime())
}

if err := leaseManager.Release(&lease.ImmutableTableDescriptor); err != nil {
t.Fatal(err)
}

// The first lease acquisition was released.
if err := tracker.WaitForRemoval(); err != nil {
t.Fatal(err)
}

if err := leaseManager.Release(&newLease.ImmutableTableDescriptor); err != nil {
t.Fatal(err)
}
}

// Test that table names are treated as case sensitive by the name cache.
func TestTableNameCaseSensitive(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down

0 comments on commit c03c307

Please sign in to comment.