From 24bb4fe643b3a3d479ef20a8782e157b0e55e526 Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Tue, 19 Sep 2017 12:33:03 -0400 Subject: [PATCH] sql: Refresh table leases asynchronously on access Previously we would only refresh a table lease if it would be expired for the transaction timestamp. Now we will refresh the lease asynchronously on access if it is about to expire in order to prevent transactions from blocking on lease acquisition. Fixes #17227. --- pkg/sql/lease.go | 82 +++++++++++++++++++++++++++++----- pkg/sql/lease_internal_test.go | 77 +++++++++++++++++++++++++++++++ pkg/sql/lease_test.go | 4 +- 3 files changed, 152 insertions(+), 11 deletions(-) diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index d4ccb22c557c..7850e646b514 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -40,14 +40,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) -// TODO(pmattis): Periodically renew leases for tables that were used recently and -// for which the lease will expire soon. - var ( // LeaseDuration is the mean duration a lease will be acquired for. The - // actual duration is jittered in the range - // [0.75,1.25]*LeaseDuration. Exported for testing purposes only. + // actual duration is jittered using LeaseJitterMultiplier. + // Exported for testing purposes only. LeaseDuration = 5 * time.Minute + // LeaseJitterMultiplier is the factor that we use to randomly jitter the lease + // duration when acquiring a new lease. The range of the actual lease duration will + // be [(1-LeaseJitterMultiplier) * LeaseDuration, (1+LeaseJitterMultiplier) * LeaseDuration]. + // Exported for testing purposes only. + LeaseJitterMultiplier = 0.25 + // LeaseRenewalTimeout is the duration for when we renew the lease early. + // This is specifically 30 seconds less than the minimum LeaseDuration, to be + // sure not to let the lease expire before renewing. Exported for testing + // purposes.
+ // TODO(joey): We can make the timeout an offset from the actual expiration, + // after jitter. + LeaseRenewalTimeout = LeaseDuration - time.Duration(float64(LeaseDuration)*LeaseJitterMultiplier) - 30*time.Second ) // tableVersionState holds the state for a table version. This includes @@ -122,9 +131,9 @@ type LeaseStore struct { } // jitteredLeaseDuration returns a randomly jittered duration from the interval -// [0.75 * leaseDuration, 1.25 * leaseDuration]. +// [(1-LeaseJitterMultiplier) * leaseDuration, (1+LeaseJitterMultiplier) * leaseDuration]. func jitteredLeaseDuration() time.Duration { - return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64())) + return time.Duration(float64(LeaseDuration) * (1 - LeaseJitterMultiplier + 2*LeaseJitterMultiplier*rand.Float64())) } // acquire a lease on the most recent version of a table descriptor. @@ -562,6 +571,10 @@ type tableState struct { // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. dropped bool + // Timer for refreshing the lease before it expires. Once a lease has been + // acquired once, a timer continuously runs to refresh the lease on a + // separate routine after LeaseRenewalTimeout time. + timer *time.Timer } } @@ -573,11 +586,12 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. + + // Block and wait for the new lease to be acquired on another routine. t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. + // Ensure a lease is acquired. Even though we finished waiting, another + // routine which finished acquiring a lease could have released it by now. 
if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { return nil, err @@ -587,6 +601,43 @@ func (t *tableState) acquire( return t.findForTimestamp(ctx, timestamp, m) } +// acquireAlways ensures a new lease is acquired. This is used for making sure a +// new lease moves forward when refreshing the lease. +func (t *tableState) acquireAlways(ctx context.Context, m *LeaseManager) error { + t.mu.Lock() + defer t.mu.Unlock() + + // Block and wait for the new lease to be acquired on another routine to + // prevent a race condition of entering acquireNodeLease. + t.acquireWait() + + // Ensure a lease is acquired. + return t.acquireNodeLease(ctx, m, hlc.Timestamp{}) +} + +// startRefreshingRoutine begins the goroutine responsible for refreshing a +// table lease. +// TODO(joey): Currently this routine will run forever for each table +// descriptor. Ideally, we stop the routine if the table descriptor hasn't been +// used recently. +// +// t.mu needs to be locked. +func (t *tableState) startRefreshingRoutine(m *LeaseManager) { + ctx := context.TODO() + t.stopper.RunWorker(ctx, func(ctx context.Context) { + for { + select { + case <-t.mu.timer.C: + if err := t.acquireAlways(ctx, m); err != nil { + log.Error(ctx, err) + } + case <-t.stopper.ShouldStop(): + return + } + } + }) +} + // ensureVersion ensures that the latest version >= minVersion. It will // check if the latest known version meets the criterion, or attempt to // acquire a lease at the latest version with the hope that it meets @@ -800,6 +851,17 @@ func (t *tableState) acquireNodeLease( } t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) + + // Either begin a new routine to start refreshing the lease asynchronously + // or reset the timer for it. After the first lease acquisition, we should + // always have a routine actively renewing the lease.
If a descriptor is + // dropped LeaseStore.acquire() will fail and the timer will not be set. + if t.mu.timer == nil { + t.mu.timer = time.NewTimer(LeaseRenewalTimeout) + t.startRefreshingRoutine(m) + } else { + t.mu.timer.Reset(LeaseRenewalTimeout) + } return nil } diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 7229072cbc76..4867b2d6ea14 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -19,16 +19,20 @@ package sql import ( "fmt" "sync" + "sync/atomic" "testing" + "time" "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/pkg/errors" ) func TestTableSet(t *testing.T) { @@ -558,3 +562,76 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); wg.Wait() } + +// This test makes sure the lease gets renewed automatically in the background. 
+func TestLeaseRefreshedAutomatically(t *testing.T) { + defer leaktest.AfterTest(t)() + var testAcquiredCount int32 + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + // We want to track what leases get acquired, + LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) { + if table.Name == "test" { + atomic.AddInt32(&testAcquiredCount, 1) + } + }, + }, + }, + } + s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{Knobs: testingKnobs}) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + if _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); +`); err != nil { + t.Fatal(err) + } + now := s.Clock().Now() + tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") + + // We set LeaseRenewalTimeout to be low so it will refresh frequently, and the + // LeaseJitterMultiplier to ensure any newer leases will have a higher + // expiration timestamp. + savedLeaseRenewalTimeout := LeaseRenewalTimeout + savedLeaseJitterMultiplier := LeaseJitterMultiplier + defer func() { + LeaseRenewalTimeout = savedLeaseRenewalTimeout + LeaseJitterMultiplier = savedLeaseJitterMultiplier + }() + LeaseRenewalTimeout = 5 * time.Millisecond + LeaseJitterMultiplier = 0 + + // Acquire the first lease. This begins the refreshing routine. + _, e1, err := leaseManager.Acquire(context.TODO(), now, tableDesc.ID) + if err != nil { + t.Error(err) + } + + // Keep checking for a new lease to be acquired by the refreshing routine. + testutils.SucceedsSoon(t, func() error { + // Acquire the newer lease. 
+ ts, e2, err := leaseManager.Acquire(context.TODO(), now, tableDesc.ID) + if err != nil { + t.Fatal(err) + } + + defer func() { + err := leaseManager.Release(ts) + if err != nil { + t.Fatal(err) + } + }() + + if e2.WallTime <= e1.WallTime { + return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)", + e2, e1) + } else if count := atomic.LoadInt32(&testAcquiredCount); count < 3 { + return errors.Errorf("expected at least 3 leases to be acquired, but only acquired %d times", + count) + } + return nil + }) +} diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index eec6b2fe1c92..a821c52b0088 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -223,8 +223,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration. l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error.