Skip to content

Commit

Permalink
storage: fork-lift proposer-evaluated KV
Browse files Browse the repository at this point in the history
Add experimental proposer-evaluated KV gated behind the environment variable
`COCKROACH_PROPOSER_EVALUATED_KV`. When set to a truthy value, Raft proposals
are evaluated and the resulting RocksDB `WriteBatch` submitted to Raft along
with some auxiliary metadata. The result of the evaluation is only stored in
the pending command on the proposer, and returned to the waiting client after
the `WriteBatch` has been applied.

Introduce a natural failfast path for (most) proposals returning an error.
Instead of proposing, waiting for Raft, and only then receiving an error,
proposals which do not lead to a state change receive their error directly
when the proposal is evaluated, upstream of Raft. Only errors which still
want to persist data (for instance, `*TransactionRetryError` when intents
were laid down) go through the whole proposal, with the client receiving
the error after the associated `Batch` commits.

While proposer-evaluated KV is now ready for review, preliminary testing and
benchmarking, the current implementation is incomplete and incorrect:
- `Lease` acquisition is not special-cased, meaning that lease state may be
  clobbered freely when non-leaseholders propose a lease request based on stale
  data. This needs to be fixed but it also shows that we don't stress that
  scenario sufficiently in testing yet.
- Similarly, we don't check that commands can only apply under the lease that
  they were proposed (which is necessary).
- `CommandQueue` does not account for internal keys accessed by overlapping
  commands correctly (this is tracked in #10084), which in principle also lead
  to anomalies which should be exposed by testing and addressed.
- `TestingCommandFilter` needs to be refactored to be an explicit interceptor
  for the pre-Raft stage of commands. Tests were fixed up enough to pass with
  proposer-evaluated KV as well, but it's possible that some tests don't test
  what they used to.
  • Loading branch information
tbg committed Oct 31, 2016
1 parent 11825cc commit cdf73a7
Show file tree
Hide file tree
Showing 13 changed files with 1,306 additions and 387 deletions.
16 changes: 14 additions & 2 deletions pkg/kv/txn_correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,12 +750,18 @@ func (hv *historyVerifier) runHistory(
var wg sync.WaitGroup
wg.Add(len(txnMap))
retryErrs := make(chan *retryError, len(txnMap))
errs := make(chan error, 1) // only populated while buffer available

for i, txnCmds := range txnMap {
go func(i int, txnCmds []*cmd) {
if err := hv.runTxn(i, priorities[i], isolations[i], txnCmds, db, t); err != nil {
if re, ok := err.(*retryError); !ok {
t.Errorf("(%s): unexpected failure: %s", cmds, err)
reportErr := errors.Wrapf(err, "(%s): unexpected failure", cmds)
select {
case errs <- reportErr:
default:
t.Error(reportErr)
}
} else {
retryErrs <- re
}
Expand All @@ -765,7 +771,13 @@ func (hv *historyVerifier) runHistory(
}
wg.Wait()

// If we received a retry error, propagate the first one now.
// For serious errors, report the first one.
select {
case err := <-errs:
return err
default:
}
// In the absence of serious errors, report the first retry error, if any.
select {
case re := <-retryErrs:
return re
Expand Down
20 changes: 14 additions & 6 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ func TestRestoreReplicas(t *testing.T) {
}
}

// TODO(bdarnell): more aggressive testing here; especially with
// proposer-evaluated KV, what this test does is much less as it doesn't
// exercise the path in which the replica change fails at *apply* time (we only
// test the failfast path), in which it isn't even proposed.
func TestFailedReplicaChange(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -815,18 +819,17 @@ func TestStoreRangeCorruptionChangeReplicas(t *testing.T) {
syncutil.Mutex
store *storage.Store
}
sc.TestingKnobs.TestingCommandFilter = func(filterArgs storagebase.FilterArgs) *roachpb.Error {
sc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
corrupt.Lock()
defer corrupt.Unlock()

if corrupt.store == nil || filterArgs.Sid != corrupt.store.StoreID() {
if corrupt.store == nil || filterArgs.StoreID != corrupt.store.StoreID() {
return nil
}

if filterArgs.Req.Header().Key.Equal(roachpb.Key("boom")) {
return roachpb.NewError(storage.NewReplicaCorruptionError(errors.New("test")))
}
return nil
return roachpb.NewError(
storage.NewReplicaCorruptionError(errors.New("test")),
)
}

// Don't timeout raft leader.
Expand Down Expand Up @@ -1049,6 +1052,11 @@ func TestStoreRangeDownReplicate(t *testing.T) {

// TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
// another change has been made to the RangeDescriptor since it was initiated.
//
// TODO(tschottdorf): If this test is flaky because the snapshot count does not
// increase, it's likely because with proposer-evaluated KV, less gets proposed
// and so sometimes Raft discards the preemptive snapshot (though we count that
// case in stats already) or doesn't produce a Ready.
func TestChangeReplicasDescriptorInvariant(t *testing.T) {
defer leaktest.AfterTest(t)()
mtc := startMultiTestContext(t, 3)
Expand Down
Loading

0 comments on commit cdf73a7

Please sign in to comment.