Skip to content

Commit

Permalink
kvserver: check GC threshold after acquiring a storage snapshot
Browse files Browse the repository at this point in the history
Previously, we would check if a given read could proceed with execution
_before_ we acquired a snapshot of the storage engine state. In particular, we
would check the replica's in-memory GC threshold before the state of the engine
had been pinned.

This meant that the following scenario was possible:
1. Request comes in, checks the replica's in-memory GC threshold and determines
that it is safe to proceed.
2. A pair of GC requests bump the GC threshold and garbage-collect the expired data.
3. The read acquires a snapshot of the storage engine state.
4. The read continues with its execution and sees an incorrect result.

This commit makes it such that we now check the GC threshold after we acquire a
snapshot of the storage engine state. It does so by lifting the GC
threshold check out of `checkExecutionCanProceed()` and splitting up the
single critical section under `Replica.mu` into two.

NB: Note that this commit only fixes our current model of issuing `GCRequest`s
-- which is that we first issue a GCRequest that simply bumps the GC threshold
and then issue another GCRequest that actually performs the garbage collection.
If a single GCRequest were to do both of these things, we'd still have an issue
with reads potentially seeing incorrect results since, currently, the in-memory
GC threshold is bumped as a "post-apply" side effect that takes effect after
the expired data has already been garbage collected. This will be handled in a
future patch.

Release note: none
  • Loading branch information
aayushshah15 committed Jun 28, 2022
1 parent 5541cf8 commit ab6d45e
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 24 deletions.
68 changes: 52 additions & 16 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,10 +1345,12 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(

// TODO(nvanbenschoten): move the following 5 methods to replica_send.go.

// checkExecutionCanProceed returns an error if a batch request cannot be
// executed by the Replica. An error indicates that the Replica is not live and
// able to serve traffic or that the request is not compatible with the state of
// the Range due to the range's key bounds, the range's lease, the range's GC
// checkExecutionCanProceedBeforeStorageSnapshot returns an error if a batch
// request cannot be executed by the Replica. For read-only requests, the method
// is called before the state of the storage engine is pinned (via an iterator
// or a snapshot). An error indicates that the Replica is not live and able to
// serve traffic or that the request is not compatible with the state of the
// Range due to the range's key bounds, the range's lease, the range's GC
// threshold, or due to a pending merge. On success, returns nil and either a
// zero LeaseStatus (indicating that the request was permitted to skip the lease
// checks) or a LeaseStatus in LeaseState_VALID (indicating that the Replica is
Expand All @@ -1359,7 +1361,7 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
// will not wait for a pending merge to conclude before proceeding. Callers
// might be ok with this if they know that they will end up checking for a
// pending merge at some later time.
func (r *Replica) checkExecutionCanProceed(
func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) (kvserverpb.LeaseStatus, error) {
rSpan, err := keys.Range(ba.Requests)
Expand Down Expand Up @@ -1398,7 +1400,7 @@ func (r *Replica) checkExecutionCanProceed(
return kvserverpb.LeaseStatus{}, err
}

st, shouldExtend, err := r.checkGCThresholdAndLeaseRLocked(ctx, ba)
st, shouldExtend, err := r.checkLeaseRLocked(ctx, ba)
if err != nil {
return kvserverpb.LeaseStatus{}, err
}
Expand Down Expand Up @@ -1432,13 +1434,54 @@ func (r *Replica) checkExecutionCanProceed(
return st, nil
}

// checkGCThresholdAndLeaseRLocked checks the provided batch against the GC
// checkExecutionCanProceedAfterStorageSnapshot returns an error if a batch
// request cannot be executed by the Replica. For read-only requests, this
// method is called after the state of the storage engine is pinned via an
// iterator. An error indicates that the request's timestamp is below the
// Replica's GC threshold.
func (r *Replica) checkExecutionCanProceedAfterStorageSnapshot(
ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus,
) error {
r.mu.RLock()
defer r.mu.RUnlock()
// NB: For read-only requests, the GC threshold check is performed after the
// state of the storage engine has been pinned by the iterator. This is
// because GC requests don't acquire latches at the timestamp they are garbage
// collecting, so read traffic at / around the GC threshold will not be
// serialized with GC requests. Thus, we must check the in-memory GC threshold
// after we pin the state of the storage engine [1].
//
// [1]: This relies on the invariant that the in-memory GC threshold is bumped
// _before_ the actual garbage collection happens.
//
// TODO(aayush): The above description intentionally omits some details, as
// they are going to be changed as part of
// https://github.com/cockroachdb/cockroach/issues/55293.
return r.checkTSAboveGCThresholdRLocked(ba.EarliestActiveTimestamp(), st, ba.IsAdmin())
}

// checkExecutionCanProceedRWOrAdmin returns an error if a batch request going
// through the RW or admin paths cannot be executed by the Replica.
func (r *Replica) checkExecutionCanProceedRWOrAdmin(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) (kvserverpb.LeaseStatus, error) {
st, err := r.checkExecutionCanProceedBeforeStorageSnapshot(ctx, ba, g)
if err != nil {
return kvserverpb.LeaseStatus{}, err
}
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
return kvserverpb.LeaseStatus{}, err
}
return st, nil
}

// checkLeaseRLocked checks the provided batch against the GC
// threshold and lease. A nil error indicates to go ahead with the batch, and
// is accompanied either by a valid or zero lease status, the latter case
// indicating that the request was permitted to bypass the lease check. The
// returned bool indicates whether the lease should be extended (only on nil
// error).
func (r *Replica) checkGCThresholdAndLeaseRLocked(
func (r *Replica) checkLeaseRLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) (kvserverpb.LeaseStatus, bool, error) {
now := r.Clock().NowAsClockTimestamp()
Expand Down Expand Up @@ -1474,21 +1517,14 @@ func (r *Replica) checkGCThresholdAndLeaseRLocked(
}
// Otherwise, suppress the error. Also, remember that we're not serving
// this under the lease by zeroing out the status. We also intentionally
// do not pass the original status to checkTSAboveGCThresholdRLocked as
// do not pass the original status to checkTSAboveGCThreshold as
// this method assumes that a valid status indicates that this replica
// holds the lease (see #73123). `shouldExtend` is already false in this
// branch, but for completeness we zero it out as well.
st, shouldExtend, err = kvserverpb.LeaseStatus{}, false, nil
}
}

// Check if request is below the GC threshold and if so, error out. Note that
// this uses the lease status no matter whether it's valid or not, and the
// method is set up to handle that.
if err := r.checkTSAboveGCThresholdRLocked(ba.EarliestActiveTimestamp(), st, ba.IsAdmin()); err != nil {
return kvserverpb.LeaseStatus{}, false, err
}

return st, shouldExtend, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_follower_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestCheckExecutionCanProceedAllowsFollowerReadWithInvalidLease(t *testing.T
}
ba.Add(&gArgs)

ls, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */)
ls, err := r.checkExecutionCanProceedBeforeStorageSnapshot(ctx, ba, nil /* g */)
require.NoError(t, err)
require.Empty(t, ls)

Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ func (r *Replica) evalAndPropose(
proposal.command.ProposerLeaseSequence = seq
} else if !st.Lease.OwnedBy(r.store.StoreID()) {
// Perform a sanity check that the lease is owned by this replica. This must
// have been ascertained by the callers in checkExecutionCanProceed.
// have been ascertained by the callers in
// checkExecutionCanProceedBeforeStorageSnapshot.
log.Fatalf(ctx, "cannot propose %s on follower with remotely owned lease %s", ba, st.Lease)
} else {
proposal.command.ProposerLeaseSequence = st.Lease.Sequence
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (r *Replica) executeReadOnlyBatch(
defer r.readOnlyCmdMu.RUnlock()

// Verify that the batch can be executed.
st, err := r.checkExecutionCanProceed(ctx, ba, g)
st, err := r.checkExecutionCanProceedBeforeStorageSnapshot(ctx, ba, g)
if err != nil {
return nil, g, roachpb.NewError(err)
}
Expand Down Expand Up @@ -74,6 +74,9 @@ func (r *Replica) executeReadOnlyBatch(
}
defer rw.Close()

if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
return nil, g, roachpb.NewError(err)
}
// TODO(nvanbenschoten): once all replicated intents are pulled into the
// concurrency manager's lock-table, we can be sure that if we reached this
// point, we will not conflict with any of them during evaluation. This in
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ func (r *Replica) executeAdminBatch(
return nil, roachpb.NewError(err)
}

_, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */)
_, err := r.checkExecutionCanProceedRWOrAdmin(ctx, ba, nil /* g */)
if err == nil {
err = r.signallerForBatch(ba).Err()
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ func (r *Replica) executeWriteBatch(
}()

// Verify that the batch can be executed.
st, err := r.checkExecutionCanProceed(ctx, ba, g)
st, err := r.checkExecutionCanProceedRWOrAdmin(ctx, ba, g)
if err != nil {
return nil, g, roachpb.NewError(err)
}

// Check the breaker. Note that we do this after checkExecutionCanProceed,
// so that NotLeaseholderError has precedence.
// Check the breaker. Note that we do this after
// checkExecutionCanProceedBeforeStorageSnapshot, so that NotLeaseholderError
// has precedence.
if err := r.signallerForBatch(ba).Err(); err != nil {
return nil, g, roachpb.NewError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
t.Fatal("replica was not marked as destroyed")
}

if _, err = repl1.checkExecutionCanProceed(ctx, &roachpb.BatchRequest{}, nil /* g */); !errors.Is(err, expErr) {
if _, err = repl1.checkExecutionCanProceedBeforeStorageSnapshot(ctx, &roachpb.BatchRequest{}, nil /* g */); !errors.Is(err, expErr) {
t.Fatalf("expected error %s, but got %v", expErr, err)
}
}
Expand Down

0 comments on commit ab6d45e

Please sign in to comment.