Skip to content

Commit

Permalink
Merge pull request cockroachdb#85428 from erikgrinaker/backport22.1-84865
Browse files Browse the repository at this point in the history

release-22.1: kvserver: always return NLHE on lease acquisition timeouts
  • Loading branch information
erikgrinaker authored Aug 8, 2022
2 parents 351a0ad + c874b0d commit c510310
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 29 deletions.
121 changes: 121 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -1309,3 +1310,123 @@ func TestAlterRangeRelocate(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
}

// TestAcquireLeaseTimeout is a regression test checking that a lease
// acquisition which times out always reports a NotLeaseHolderError to the
// caller.
//
// It starts a two-node cluster, moves the lease of a freshly split range to
// n2, stops n2 and invalidates its lease, and then attempts lease acquisitions
// on n1: first a single one, then a batch of staggered ones while lease
// requests for the range are blocked until their context is cancelled.
func TestAcquireLeaseTimeout(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Bound the test context with a deadline so that a stuck test fails
	// instead of hanging.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// blockRangeID, once set to a non-zero range ID, makes the request filter
	// below hold lease requests for that range until the request's context is
	// cancelled, and then return the context error.
	var blockRangeID int32

	maybeBlockLeaseRequest := func(reqCtx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
		// Only single-request batches carrying a RequestLease for the selected
		// range are held back; everything else passes straight through.
		if !ba.IsSingleRequest() {
			return nil
		}
		if ba.Requests[0].GetInner().Method() != roachpb.RequestLease {
			return nil
		}
		if int32(ba.RangeID) != atomic.LoadInt32(&blockRangeID) {
			return nil
		}
		t.Logf("blocked lease request for r%d", ba.RangeID)
		<-reqCtx.Done()
		return roachpb.NewError(reqCtx.Err())
	}

	// The lease request timeout is derived from the Raft election timeout, so
	// shrink the latter to get faster timeouts (800 ms) and a quicker test.
	var raftCfg base.RaftConfig
	raftCfg.SetDefaults()
	raftCfg.RaftHeartbeatIntervalTicks = 1
	raftCfg.RaftElectionTimeoutTicks = 2

	manualClock := hlc.NewHybridManualClock()

	// Bring up a two-node cluster with manual replication, the manual clock,
	// and the lease-blocking request filter installed on every store.
	const numNodes = 2
	serverKnobs := &server.TestingKnobs{
		ClockSource: manualClock.UnixNano,
	}
	storeKnobs := &kvserver.StoreTestingKnobs{
		TestingRequestFilter:                    maybeBlockLeaseRequest,
		AllowLeaseRequestProposalsWhenNotLeader: true,
	}
	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs: base.TestServerArgs{
			RaftConfig: raftCfg,
			Knobs: base.TestingKnobs{
				Server: serverKnobs,
				Store:  storeKnobs,
			},
		},
	})
	defer tc.Stopper().Stop(ctx)
	n1 := tc.Server(0)

	// Split off a range at "a", replicate it onto both servers, and hand the
	// lease over from n1 to n2.
	splitKey := roachpb.Key("a")
	_, desc := tc.SplitRangeOrFatal(t, splitKey)
	tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
	replica, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
	require.NoError(t, err)

	// Capture n2's liveness record, then stop n2. The record is needed below
	// to bump n2's epoch, which invalidates its lease.
	n2Liveness, isNodeLiveness := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
	require.True(t, isNodeLiveness)
	n2Record, found := n2Liveness.Self()
	require.True(t, found)
	tc.StopServer(1)

	// Move the clock up to the expiration of n2's liveness record and
	// increment n2's epoch via n1, retrying until it sticks. An epoch that was
	// already incremented counts as success.
	manualClock.Forward(n2Record.Expiration.WallTime)
	n1Liveness, isNodeLiveness := n1.NodeLiveness().(*liveness.NodeLiveness)
	require.True(t, isNodeLiveness)
	testutils.SucceedsSoon(t, func() error {
		incErr := n1Liveness.IncrementEpoch(context.Background(), n2Record)
		if errors.Is(incErr, liveness.ErrEpochAlreadyIncremented) {
			return nil
		}
		return incErr
	})
	require.False(t, replica.CurrentLeaseStatus(ctx).IsValid())

	// requireEmptyNLHE asserts that err is exactly a *NotLeaseHolderError
	// whose Lease field is empty.
	requireEmptyNLHE := func(err error) {
		var nlhe *roachpb.NotLeaseHolderError
		require.Error(t, err)
		require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
		require.ErrorAs(t, err, &nlhe)
		require.Empty(t, nlhe.Lease)
	}

	// A lease acquisition attempt should fail with an empty NLHE, since the
	// range doesn't have quorum.
	_, err = replica.TestingAcquireLease(ctx)
	requireEmptyNLHE(err)

	// Now for the real test: block lease requests for the range and fire off a
	// series of lease acquisitions, spaced by a small delay, which should join
	// onto the same lease request internally. Every one of them should return
	// a NLHE when it times out, regardless of the internal mechanics.
	atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))

	const attempts = 20
	var wg sync.WaitGroup
	// Buffered to hold every result, so no goroutine blocks on send.
	errC := make(chan error, attempts)
	for i := 0; i < attempts; i++ {
		time.Sleep(10 * time.Millisecond)
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, acquireErr := replica.TestingAcquireLease(ctx)
			errC <- acquireErr
		}()
	}
	wg.Wait()
	close(errC)

	for acquireErr := range errC {
		requireEmptyNLHE(acquireErr)
	}
}
36 changes: 7 additions & 29 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -349,15 +348,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
p.nextLease = roachpb.Lease{}
}

// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()

const taskName = "pendingLeaseRequest: requesting lease"
err := p.repl.store.Stopper().RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: taskName,
TaskName: "pendingLeaseRequest: requesting lease",
// Trace the lease acquisition as a child even though it might outlive the
// parent in case the parent's ctx is canceled. Other requests might
// later block on this lease acquisition too, and we can't include the
Expand All @@ -368,13 +362,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
func(ctx context.Context) {
defer sp.Finish()

// Run the lease acquisition request with a timeout. We must eventually
// return a NotLeaseHolderError rather than hanging, otherwise we could
// prevent the caller from nudging a different replica into acquiring the
// lease.
err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
})
err := p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
Expand Down Expand Up @@ -1167,22 +1155,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * r.store.cfg.RaftElectionTimeout()
if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
func(ctx context.Context) error {
status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
return nil
},
); err != nil {
return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
}
return status, pErr
}

// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
// Does not use RunWithTimeout(), because we do not want to mask the
// NotLeaseHolderError on context cancellation.
ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()

// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
{
Expand Down

0 comments on commit c510310

Please sign in to comment.