Skip to content

Commit

Permalink
sql: Refresh table leases asynchronously on access
Browse files Browse the repository at this point in the history
Previously if we would only refresh a table lease if it would be expired for the
transaction timestamp. Now we will refresh the lease asynchronously on access if
it is about to expire in order to prevent transactions from blocking on lease
acquisition.

Fixes #17227.
  • Loading branch information
lgo committed Sep 27, 2017
1 parent 65eb31b commit 24bb4fe
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 11 deletions.
82 changes: 72 additions & 10 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,23 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

var (
// LeaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered in the range
// [0.75,1.25]*LeaseDuration. Exported for testing purposes only.
// actual duration is jittered using LeaseJitterMultiplier.
// Exported for testing purposes only.
LeaseDuration = 5 * time.Minute
// LeaseJitterMultiplier is the factor that we use to randomly jitter the lease
// duration when acquiring a new lease. The range of the actual lease duration will
// be [(1-LeaseJitterMultiplier) * LeaseDuration, (1-LeaseJitterMultiplier) * LeaseDuration]
// Exported for testing purposes only.
LeaseJitterMultiplier = 0.25
// LeaseRenewalTimeout is the duration for when we renew the lease early.
// This is specifically 30 seconds less than the minimum LeaseDuration, to be
// sure not to let the lease expire before renewing. Exported for testing
// purposes.
// TODO(joey): We can make the timeout an offset from the actual expiration,
// after jitter.
LeaseRenewalTimeout = LeaseDuration - time.Duration(float64(LeaseDuration)*LeaseJitterMultiplier) - 30*time.Second
)

// tableVersionState holds the state for a table version. This includes
Expand Down Expand Up @@ -122,9 +131,9 @@ type LeaseStore struct {
}

// jitteredLeaseDuration returns a randomly jittered duration from the interval
// [0.75 * leaseDuration, 1.25 * leaseDuration].
// [(1-LeaseJitterMultiplier) * leaseDuration, (1+LeaseJitterMultiplier) * leaseDuration].
func jitteredLeaseDuration() time.Duration {
return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64()))
return time.Duration(float64(LeaseDuration) * (1 - LeaseJitterMultiplier + 2*LeaseJitterMultiplier*rand.Float64()))
}

// acquire a lease on the most recent version of a table descriptor.
Expand Down Expand Up @@ -562,6 +571,10 @@ type tableState struct {
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
dropped bool
// Timer for refreshing the lease before it expires. Once a lease has been
// acquired once, a timer continuously runs to refresh the lease on a
// separate routine after LeaseRenewalTimeout time.
timer *time.Timer
}
}

Expand All @@ -573,11 +586,12 @@ func (t *tableState) acquire(
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.

// Block and wait for the new lease to be acquired on another routine.
t.acquireWait()

// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
// Ensure a lease is acquired. Even though we finished waiting, another
// routine which finished acquiring a lease could have released it by now.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
return nil, err
Expand All @@ -587,6 +601,43 @@ func (t *tableState) acquire(
return t.findForTimestamp(ctx, timestamp, m)
}

// acquireAlways ensures a new lease is acquired. This is used for making sure a
// new lease moves forward when refreshing the lease.
func (t *tableState) acquireAlways(ctx context.Context, m *LeaseManager) error {
t.mu.Lock()
defer t.mu.Unlock()

// Block and wait for the new lease to be acquired on another routine to
// prevent a race condition of entering acquireNodeLease
t.acquireWait()

// Ensure a lease is acquired.
return t.acquireNodeLease(ctx, m, hlc.Timestamp{})
}

// startRefreshingRoutine begins the goroutine responsible for refreshing a
// table lease.
// TODO(joey): Currently this routine will run forever for each table
// descriptor. Ideally, we stop the routine if the table descriptor hasn't been
// used recently.
//
// t.mu needs to be locked.
func (t *tableState) startRefreshingRoutine(m *LeaseManager) {
ctx := context.TODO()
t.stopper.RunWorker(ctx, func(ctx context.Context) {
for {
select {
case <-t.mu.timer.C:
if err := t.acquireAlways(ctx, m); err != nil {
log.Error(ctx, err)
}
case <-t.stopper.ShouldStop():
return
}
}
})
}

// ensureVersion ensures that the latest version >= minVersion. It will
// check if the latest known version meets the criterion, or attempt to
// acquire a lease at the latest version with the hope that it meets
Expand Down Expand Up @@ -800,6 +851,17 @@ func (t *tableState) acquireNodeLease(
}
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)

// Either begin a new routine to start refreshing the lease asynchronously or
// or reset the timer for it. After the first lease acquisition, we should
// always have a routine actively renewing the lease. If a descriptor is
// dropped LeaseStore.acquire() will fail and the timer will not be set.
if t.mu.timer == nil {
t.mu.timer = time.NewTimer(LeaseRenewalTimeout)
t.startRefreshingRoutine(m)
} else {
t.mu.timer.Reset(LeaseRenewalTimeout)
}
return nil
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ package sql
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/pkg/errors"
)

func TestTableSet(t *testing.T) {
Expand Down Expand Up @@ -558,3 +562,76 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);

wg.Wait()
}

// This test makes sure the lease gets renewed automatically in the background.
func TestLeaseRefreshedAutomatically(t *testing.T) {
defer leaktest.AfterTest(t)()
var testAcquiredCount int32
testingKnobs := base.TestingKnobs{
SQLLeaseManager: &LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{
// We want to track what leases get acquired,
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "test" {
atomic.AddInt32(&testAcquiredCount, 1)
}
},
},
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{Knobs: testingKnobs})
defer s.Stopper().Stop(context.TODO())
leaseManager := s.LeaseManager().(*LeaseManager)

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}
now := s.Clock().Now()
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")

// We set LeaseRenewalTimeout to be low so it will refresh frequently, and the
// LeaseJitterMultiplier to ensure any newer leases will have a higher
// expiration timestamp.
savedLeaseRenewalTimeout := LeaseRenewalTimeout
savedLeaseJitterMultiplier := LeaseJitterMultiplier
defer func() {
LeaseRenewalTimeout = savedLeaseRenewalTimeout
LeaseJitterMultiplier = savedLeaseJitterMultiplier
}()
LeaseRenewalTimeout = 5 * time.Millisecond
LeaseJitterMultiplier = 0

// Acquire the first lease. This begins the refreshing routine.
_, e1, err := leaseManager.Acquire(context.TODO(), now, tableDesc.ID)
if err != nil {
t.Error(err)
}

// Keep checking for a new lease to be acquired by the refreshing routine.
testutils.SucceedsSoon(t, func() error {
// Acquire the newer lease.
ts, e2, err := leaseManager.Acquire(context.TODO(), now, tableDesc.ID)
if err != nil {
t.Fatal(err)
}

defer func() {
err := leaseManager.Release(ts)
if err != nil {
t.Fatal(err)
}
}()

if e2.WallTime <= e1.WallTime {
return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)",
e2, e1)
} else if count := atomic.LoadInt32(&testAcquiredCount); count < 3 {
return errors.Errorf("expected at least 3 leases to be acquired, but only acquired %d times",
count)
}
return nil
})
}
4 changes: 3 additions & 1 deletion pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) {
// table and expiration.
l1, e1 := t.mustAcquire(1, descID)
l2, e2 := t.mustAcquire(1, descID)
if l1.ID != l2.ID || e1 != e2 {
if l1.ID != l2.ID {
t.Fatalf("expected same lease, but found %v != %v", l1, l2)
} else if e1 != e2 {
t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2)
}
t.expectLeases(descID, "/1/1")
// Node 2 never acquired a lease on descID, so we should expect an error.
Expand Down

0 comments on commit 24bb4fe

Please sign in to comment.