Skip to content

Commit

Permalink
Merge pull request #18824 from lego/joey/async-table-lease-refresh-1.1
Browse files Browse the repository at this point in the history
cherrypick-1.1: sql: Refresh table leases asynchronously on access
  • Loading branch information
lgo authored Sep 27, 2017
2 parents a9a6f5c + 24bb4fe commit 3feb591
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 11 deletions.
82 changes: 72 additions & 10 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,23 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

var (
// LeaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered in the range
// [0.75,1.25]*LeaseDuration. Exported for testing purposes only.
// actual duration is jittered using LeaseJitterMultiplier.
// Exported for testing purposes only.
LeaseDuration = 5 * time.Minute
// LeaseJitterMultiplier is the factor that we use to randomly jitter the lease
// duration when acquiring a new lease. The range of the actual lease duration will
// be [(1-LeaseJitterMultiplier) * LeaseDuration, (1-LeaseJitterMultiplier) * LeaseDuration]
// Exported for testing purposes only.
LeaseJitterMultiplier = 0.25
// LeaseRenewalTimeout is the duration for when we renew the lease early.
// This is specifically 30 seconds less than the minimum LeaseDuration, to be
// sure not to let the lease expire before renewing. Exported for testing
// purposes.
// TODO(joey): We can make the timeout an offset from the actual expiration,
// after jitter.
LeaseRenewalTimeout = LeaseDuration - time.Duration(float64(LeaseDuration)*LeaseJitterMultiplier) - 30*time.Second
)

// tableVersionState holds the state for a table version. This includes
Expand Down Expand Up @@ -122,9 +131,9 @@ type LeaseStore struct {
}

// jitteredLeaseDuration returns a randomly jittered duration from the interval
// [0.75 * leaseDuration, 1.25 * leaseDuration].
// [(1-LeaseJitterMultiplier) * leaseDuration, (1+LeaseJitterMultiplier) * leaseDuration].
func jitteredLeaseDuration() time.Duration {
return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64()))
return time.Duration(float64(LeaseDuration) * (1 - LeaseJitterMultiplier + 2*LeaseJitterMultiplier*rand.Float64()))
}

// acquire a lease on the most recent version of a table descriptor.
Expand Down Expand Up @@ -562,6 +571,10 @@ type tableState struct {
// If set, leases are released from the store as soon as their
// refcount drops to 0, as opposed to waiting until they expire.
dropped bool
// Timer for refreshing the lease before it expires. Once a lease has been
// acquired once, a timer continuously runs to refresh the lease on a
// separate routine after LeaseRenewalTimeout time.
timer *time.Timer
}
}

Expand All @@ -573,11 +586,12 @@ func (t *tableState) acquire(
) (*tableVersionState, error) {
t.mu.Lock()
defer t.mu.Unlock()
// Wait for any existing lease acquisition.

// Block and wait for the new lease to be acquired on another routine.
t.acquireWait()

// Acquire a lease if no lease exists or if the latest lease is
// about to expire.
// Ensure a lease is acquired. Even though we finished waiting, another
// routine which finished acquiring a lease could have released it by now.
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) {
if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil {
return nil, err
Expand All @@ -587,6 +601,43 @@ func (t *tableState) acquire(
return t.findForTimestamp(ctx, timestamp, m)
}

// acquireAlways ensures a new lease is acquired. This is used for making sure a
// new lease moves forward when refreshing the lease.
func (t *tableState) acquireAlways(ctx context.Context, m *LeaseManager) error {
t.mu.Lock()
defer t.mu.Unlock()

// Block and wait for the new lease to be acquired on another routine to
// prevent a race condition of entering acquireNodeLease
t.acquireWait()

// Ensure a lease is acquired.
return t.acquireNodeLease(ctx, m, hlc.Timestamp{})
}

// startRefreshingRoutine begins the goroutine responsible for refreshing a
// table lease.
// TODO(joey): Currently this routine will run forever for each table
// descriptor. Ideally, we stop the routine if the table descriptor hasn't been
// used recently.
//
// t.mu needs to be locked.
func (t *tableState) startRefreshingRoutine(m *LeaseManager) {
ctx := context.TODO()
t.stopper.RunWorker(ctx, func(ctx context.Context) {
for {
select {
case <-t.mu.timer.C:
if err := t.acquireAlways(ctx, m); err != nil {
log.Error(ctx, err)
}
case <-t.stopper.ShouldStop():
return
}
}
})
}

// ensureVersion ensures that the latest version >= minVersion. It will
// check if the latest known version meets the criterion, or attempt to
// acquire a lease at the latest version with the hope that it meets
Expand Down Expand Up @@ -800,6 +851,17 @@ func (t *tableState) acquireNodeLease(
}
t.upsertLocked(ctx, table, m)
t.tableNameCache.insert(table)

// Either begin a new routine to start refreshing the lease asynchronously or
// or reset the timer for it. After the first lease acquisition, we should
// always have a routine actively renewing the lease. If a descriptor is
// dropped LeaseStore.acquire() will fail and the timer will not be set.
if t.mu.timer == nil {
t.mu.timer = time.NewTimer(LeaseRenewalTimeout)
t.startRefreshingRoutine(m)
} else {
t.mu.timer.Reset(LeaseRenewalTimeout)
}
return nil
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ package sql
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/pkg/errors"
)

func TestTableSet(t *testing.T) {
Expand Down Expand Up @@ -558,3 +562,76 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);

wg.Wait()
}

// This test makes sure the lease gets renewed automatically in the background.
func TestLeaseRefreshedAutomatically(t *testing.T) {
defer leaktest.AfterTest(t)()
var testAcquiredCount int32
testingKnobs := base.TestingKnobs{
SQLLeaseManager: &LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{
// We want to track what leases get acquired,
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "test" {
atomic.AddInt32(&testAcquiredCount, 1)
}
},
},
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{Knobs: testingKnobs})
defer s.Stopper().Stop(context.TODO())
leaseManager := s.LeaseManager().(*LeaseManager)

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}
now := s.Clock().Now()
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")

// We set LeaseRenewalTimeout to be low so it will refresh frequently, and the
// LeaseJitterMultiplier to ensure any newer leases will have a higher
// expiration timestamp.
savedLeaseRenewalTimeout := LeaseRenewalTimeout
savedLeaseJitterMultiplier := LeaseJitterMultiplier
defer func() {
LeaseRenewalTimeout = savedLeaseRenewalTimeout
LeaseJitterMultiplier = savedLeaseJitterMultiplier
}()
LeaseRenewalTimeout = 5 * time.Millisecond
LeaseJitterMultiplier = 0

// Acquire the first lease. This begins the refreshing routine.
_, e1, err := leaseManager.Acquire(context.TODO(), now, tableDesc.ID)
if err != nil {
t.Error(err)
}

// Keep checking for a new lease to be acquired by the refreshing routine.
testutils.SucceedsSoon(t, func() error {
// Acquire the newer lease.
ts, e2, err := leaseManager.Acquire(context.TODO(), now, tableDesc.ID)
if err != nil {
t.Fatal(err)
}

defer func() {
err := leaseManager.Release(ts)
if err != nil {
t.Fatal(err)
}
}()

if e2.WallTime <= e1.WallTime {
return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)",
e2, e1)
} else if count := atomic.LoadInt32(&testAcquiredCount); count < 3 {
return errors.Errorf("expected at least 3 leases to be acquired, but only acquired %d times",
count)
}
return nil
})
}
4 changes: 3 additions & 1 deletion pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) {
// table and expiration.
l1, e1 := t.mustAcquire(1, descID)
l2, e2 := t.mustAcquire(1, descID)
if l1.ID != l2.ID || e1 != e2 {
if l1.ID != l2.ID {
t.Fatalf("expected same lease, but found %v != %v", l1, l2)
} else if e1 != e2 {
t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2)
}
t.expectLeases(descID, "/1/1")
// Node 2 never acquired a lease on descID, so we should expect an error.
Expand Down

0 comments on commit 3feb591

Please sign in to comment.