Merge #62719 #65454
62719: kvserver: simplify and document handling of ReplicaPlaceholder r=erikgrinaker a=tbg

This commit cleans up the handling of ReplicaPlaceholders and documents their
semantics better. I embarked on this while looking into an unrelated test
flake. It occurred to me that we were not installing placeholders before the
snapshot transfer, but only after. This seems like a bad idea now that our
average snapshot size has gone up, as placeholders help avoid duplicate work;
we now install them before accepting the range data.

In practice, we currently only allow one snapshot in flight per store, so it
is not clear that this improvement buys us anything yet. I think that the
improved clarity in this commit will, though.

Concretely, we now have two simple rules for placeholders:

1. they only exist for uninitialized replicas (and must exist for those, or we
   can end up with overlapping replicas)
2. you write it --> you (and *only* you) remove it.


1. was true before, but the documentation was not clear. 2. was not true, as
there were a few out-of-band places that removed placeholders (and didn't
clearly explain why - sounded like we were previously leaking placeholders,
maybe @ajwerner remembers?).

There's also a bit of extra cleanup that clarifies things - for example, since
the caller to `applySnapshot` already checks for an empty snapshot, the
placeholder removal within it was effectively dead code and is now removed.
Also, in receiveSnapshot, there was an optimistic check that allowed us to
discard snapshots early. It was always confusing to keep track of which check
was which, and now that the authoritative check has been lifted to the top of
receiveSnapshot, the optimistic one could be deleted.
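
To make the contract concrete, here is a minimal, self-contained Go sketch of
the two rules. All names in it (store, placeholder, reservePlaceholder,
receiveSnapshot) are invented for illustration and are not the actual kvserver
APIs:

package main

import (
	"errors"
	"fmt"
)

// placeholder reserves keyspace for a snapshot that will create an
// uninitialized replica (rule 1: placeholders exist only for uninitialized
// replicas).
type placeholder struct{ rangeID int }

type store struct {
	placeholders map[int]*placeholder
}

// reservePlaceholder is the single authoritative overlap check: it either
// reserves the keyspace for the incoming snapshot or rejects it.
func (s *store) reservePlaceholder(rangeID int) (*placeholder, error) {
	if _, ok := s.placeholders[rangeID]; ok {
		return nil, errors.New("intersecting snapshot already in flight")
	}
	ph := &placeholder{rangeID: rangeID}
	s.placeholders[rangeID] = ph
	return ph, nil
}

func (s *store) removePlaceholder(ph *placeholder) {
	delete(s.placeholders, ph.rangeID)
}

// receiveSnapshot installs the placeholder before any range data is
// transferred and is the only code path that removes it again (rule 2),
// whether the snapshot is ultimately applied or discarded.
func (s *store) receiveSnapshot(rangeID int, transfer func() error) error {
	ph, err := s.reservePlaceholder(rangeID)
	if err != nil {
		return err
	}
	defer s.removePlaceholder(ph)
	return transfer()
}

func main() {
	s := &store{placeholders: map[int]*placeholder{}}
	fmt.Println(s.receiveSnapshot(7, func() error { return nil })) // <nil>
}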

Release note: None


65454: jobs: move trace_id from payload to progress r=pbardea a=adityamaru

The trace_id associated with a job is updated on every
resumption of the job. If the job is paused and resumed
several times, we don't want to rewrite the payload every
single time. Thus, we move the field to the job progress, which is expected
to be rewritten frequently.
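
To illustrate the shape of the change with hypothetical types (these are not
the actual jobs package structs): the payload holds large, mostly-immutable
job details and is expensive to rewrite, while the progress record is small
and is rewritten on every checkpoint anyway, so a field that changes on every
resumption is a better fit there.

package main

import "fmt"

// Hypothetical stand-ins for the payload/progress split.
type payload struct {
	Description string // large and effectively immutable after job creation
	Details     []byte
}

type progress struct {
	FractionCompleted float64
	TraceID           uint64 // changes on every resumption, so it lives here
}

// onResume records the new trace ID by rewriting only the small progress
// record; the payload is left untouched.
func onResume(p *progress, traceID uint64) {
	p.TraceID = traceID
}

func main() {
	prog := &progress{}
	onResume(prog, 42)
	fmt.Println(prog.TraceID) // 42
	_ = payload{}             // the payload never needs to change on resume
}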

Also addresses a comment from #65322.

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
3 people committed May 19, 2021
3 parents 95b6c1f + 6b9168d + c9599e9 commit 3ffafc3
Showing 21 changed files with 989 additions and 674 deletions.
10 changes: 4 additions & 6 deletions pkg/jobs/adopt.go
@@ -255,17 +255,15 @@ func (r *Registry) runJob(
//
// A new root span will be created on every resumption of the job.
var spanOptions []tracing.SpanOption
if tj, ok := resumer.(TraceableJob); ok {
if tj.ForceRealSpan() {
spanOptions = append(spanOptions, tracing.WithForceRealSpan())
}
if _, ok := resumer.(TraceableJob); ok {
spanOptions = append(spanOptions, tracing.WithForceRealSpan())
}
ctx, span = r.settings.Tracer.StartSpanCtx(ctx, spanName, spanOptions...)
defer span.Finish()
if err := job.Update(ctx, nil /* txn */, func(txn *kv.Txn, md JobMetadata,
ju *JobUpdater) error {
md.Payload.TraceID = span.TraceID()
ju.UpdatePayload(md.Payload)
md.Progress.TraceID = span.TraceID()
ju.UpdateProgress(md.Progress)
return nil
}); err != nil {
return err
2 changes: 1 addition & 1 deletion pkg/jobs/jobs.go
@@ -97,7 +97,7 @@ type StartableJob struct {
type TraceableJob interface {
// ForceRealSpan forces the registry to create a real Span instead of a
// low-overhead non-recordable noop span.
ForceRealSpan() bool
ForceRealSpan()
}

func init() {
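
For context, a hedged sketch of what a resumer opting into tracing might look
like after this change; backupResumer is an invented name used only for
illustration, not a real implementation:

package main

// TraceableJob mirrors the interface as changed above: implementing it is now
// the opt-in itself, so ForceRealSpan no longer returns a value.
type TraceableJob interface {
	ForceRealSpan()
}

// backupResumer is a hypothetical job resumer.
type backupResumer struct{}

// ForceRealSpan acts as a marker method; the registry creates a real
// (recordable) span for any resumer that implements TraceableJob.
func (backupResumer) ForceRealSpan() {}

var _ TraceableJob = backupResumer{}

func main() {}
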
700 changes: 350 additions & 350 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
@@ -699,7 +699,7 @@ message Payload {
NewSchemaChangeDetails newSchemaChange = 24;
MigrationDetails migration = 25;
}
uint64 trace_id = 26 [(gogoproto.customname) = "TraceID"];
reserved 26;

// NEXT ID: 27.
}
@@ -725,6 +725,8 @@ message Progress {
NewSchemaChangeProgress newSchemaChange = 19;
MigrationProgress migration = 20;
}

uint64 trace_id = 21 [(gogoproto.customname) = "TraceID"];
}

enum Type {
156 changes: 104 additions & 52 deletions pkg/kv/kvserver/client_raft_test.go
@@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -1499,14 +1500,20 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) {
tc.AddAndStartServer(t, stickyServerArgs[2])

// Try to up-replicate the RHS of the split to store 2.
if _, err := tc.AddVoters(splitKey, tc.Target(3)); !testutils.IsError(err, kvserver.IntersectingSnapshotMsg) {
// Don't use tc.AddVoter because we expect a retriable error and want it
// returned to us.
if _, err := tc.Servers[0].DB().AdminChangeReplicas(
ctx, splitKey, tc.LookupRangeOrFatal(t, splitKey),
roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(3)),
); !kvserver.IsRetriableReplicationChangeError(err) {
t.Fatalf("unexpected error %v", err)
}

// Enable the replica GC queue so that the next attempt to replicate the RHS
// to store 2 will cause the obsolete replica to be GC'd allowing a
// subsequent replication to succeed.
tc.GetFirstStoreFromServer(t, 3).SetReplicaGCQueueActive(true)
tc.AddVotersOrFatal(t, splitKey, tc.Target(3))
}

// Test that when a Raft group is not able to establish a quorum, its Raft log
@@ -2492,6 +2499,9 @@ func TestReportUnreachableRemoveRace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rng, seed := randutil.NewTestPseudoRand()
t.Logf("seed is %d", seed)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
@@ -2502,54 +2512,78 @@
tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...))

outer:
for i := 0; i < 5; i++ {
for leaderIdx := range tc.Servers {
repl := tc.GetFirstStoreFromServer(t, leaderIdx).LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
if repl.RaftStatus().SoftState.RaftState == raft.StateLeader {
for replicaIdx := range tc.Servers {
if replicaIdx == leaderIdx {
continue
}
repDesc, err := repl.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
if lease, _ := repl.GetLease(); lease.Replica.Equal(repDesc) {
tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(replicaIdx))
}
tc.RemoveVotersOrFatal(t, key, tc.Target(leaderIdx))
// We want to stop all nodes from talking to the replicaIdx, so need
// to trip the breaker on all servers but it.
for i := range tc.Servers {
if i != replicaIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(replicaIdx).NodeID, rpc.DefaultClass)
cb.Break()
}
}
time.Sleep(tc.GetFirstStoreFromServer(t, replicaIdx).GetStoreConfig().CoalescedHeartbeatsInterval)
for i := range tc.Servers {
if i != replicaIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(replicaIdx).NodeID, rpc.DefaultClass)
cb.Reset()
}
}
// Make sure the old replica was actually removed, before we try to re-adding it.
testutils.SucceedsSoon(t, func() error {
if oldRepl := tc.GetFirstStoreFromServer(t, leaderIdx).LookupReplica(roachpb.RKey(key)); oldRepl != nil {
return errors.Errorf("Expected replica %s to be removed", oldRepl)
}
return nil
})
tc.AddVotersOrFatal(t, key, tc.Target(leaderIdx))
require.NoError(t, tc.WaitForVoters(key, tc.Target(leaderIdx)))
continue outer
// Find the Raft leader.
var leaderIdx int
var leaderRepl *kvserver.Replica
testutils.SucceedsSoon(t, func() error {
for idx := range tc.Servers {
repl := tc.GetFirstStoreFromServer(t, idx).LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
if repl.RaftStatus().SoftState.RaftState == raft.StateLeader {
leaderIdx = idx
leaderRepl = repl
return nil
}
t.Fatal("could not find raft replica")
}
return errors.New("no Raft leader found")
})

// Raft leader found. Make sure it doesn't have the lease (transferring it
// away if necessary, which entails picking a random other node and sending
// the lease to it). We'll also partition this random node away from the rest
// as this was (way back) how we triggered problems with coalesced heartbeats.
partitionedMaybeLeaseholderIdx := (leaderIdx + 1 + rng.Intn(tc.NumServers()-1)) % tc.NumServers()
t.Logf("leader is idx=%d, partitioning idx=%d", leaderIdx, partitionedMaybeLeaseholderIdx)
leaderRepDesc, err := leaderRepl.GetReplicaDescriptor()
require.NoError(t, err)
if lease, _ := leaderRepl.GetLease(); lease.OwnedBy(leaderRepDesc.StoreID) {
tc.TransferRangeLeaseOrFatal(t, *leaderRepl.Desc(), tc.Target(partitionedMaybeLeaseholderIdx))
}
i-- // try again

// Remove the raft leader.
t.Logf("removing leader")
tc.RemoveVotersOrFatal(t, key, tc.Target(leaderIdx))

// Pseudo-partition partitionedMaybeLeaseholderIdx away from everyone else. We do this by tripping
// the circuit breaker on all other nodes.
t.Logf("partitioning")
for i := range tc.Servers {
if i != partitionedMaybeLeaseholderIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(partitionedMaybeLeaseholderIdx).NodeID, rpc.DefaultClass)
cb.Break()
}
}

// Wait out the heartbeat interval and resolve the partition.
heartbeatInterval := tc.GetFirstStoreFromServer(t, partitionedMaybeLeaseholderIdx).GetStoreConfig().CoalescedHeartbeatsInterval
time.Sleep(heartbeatInterval)
t.Logf("resolving partition")
for i := range tc.Servers {
if i != partitionedMaybeLeaseholderIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(partitionedMaybeLeaseholderIdx).NodeID, rpc.DefaultClass)
cb.Reset()
}
}

t.Logf("waiting for replicaGC of removed leader replica")
// Make sure the old replica was actually removed by replicaGC, before we
// try to re-add it. Otherwise the addition below might fail. One shot here
// is often enough, but not always; in the worst case we need to wait out
// something on the order of a election timeout plus
// ReplicaGCQueueSuspectTimeout before replicaGC will be attempted (and will
// then succeed on the first try).
testutils.SucceedsSoon(t, func() error {
s := tc.GetFirstStoreFromServer(t, leaderIdx)
s.MustForceReplicaGCScanAndProcess()
if oldRepl := tc.GetFirstStoreFromServer(t, leaderIdx).LookupReplica(roachpb.RKey(key)); oldRepl != nil {
return errors.Errorf("Expected replica %s to be removed", oldRepl)
}
return nil
})
t.Logf("re-adding leader")
tc.AddVotersOrFatal(t, key, tc.Target(leaderIdx))
require.NoError(t, tc.WaitForVoters(key, tc.Target(leaderIdx)))
}
}

@@ -4873,6 +4907,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
require.NoError(t, db.Run(ctx, b))
}
ensureNoTombstone := func(t *testing.T, store *kvserver.Store, rangeID roachpb.RangeID) {
t.Helper()
var tombstone roachpb.RangeTombstone
tombstoneKey := keys.RangeTombstoneKey(rangeID)
ok, err := storage.MVCCGetProto(
@@ -5040,7 +5075,10 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// Unsuccessful because the RHS will not accept the learner snapshot
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
_, err = tc.AddVoters(keyB, tc.Target(0))
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, keyB, tc.LookupRangeOrFatal(t, keyB),
roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(0)),
)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)

// Without a partitioned RHS we'll end up always writing a tombstone here because
@@ -5090,7 +5128,10 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// Unsuccessfully because the RHS will not accept the learner snapshot
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
_, err = tc.AddVoters(keyB, tc.Target(0))
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, keyB, tc.LookupRangeOrFatal(t, keyB),
roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(0)),
)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)

// Without a partitioned RHS we'll end up always writing a tombstone here because
@@ -5158,10 +5199,15 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// Remove and re-add the RHS to create a new uninitialized replica at
// a higher replica ID. This will lead to a tombstone being written.
tc.RemoveVotersOrFatal(t, keyB, tc.Target(0))
// Unsuccessful because the RHS will not accept the learner snapshot
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
_, err = tc.AddVoters(keyB, tc.Target(0))
// Unsuccessful because the RHS will not accept the learner snapshot and
// will be rolled back. Nevertheless it will have learned that it has been
// removed at the old replica ID. We don't use tc.AddVoters because that
// will retry until it runs out of time, since we're creating a
// retriable-looking situation here that will persist.
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, keyB, tc.LookupRangeOrFatal(t, keyB),
roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(0)),
)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
// Ensure that the replica exists with the higher replica ID.
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rhsInfo.Desc.RangeID)
@@ -5219,7 +5265,13 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// Unsuccessfully because the RHS will not accept the learner snapshot
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
_, err = tc.AddVoters(keyB, tc.Target(0))
//
// Not using tc.AddVoters because we expect an error, but that error
// would be retried internally.
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, keyB, tc.LookupRangeOrFatal(t, keyB),
roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(0)),
)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
// Ensure that there's no tombstone.
// The RHS on store 0 never should have heard about its original ID.
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/client_replica_test.go
@@ -2628,7 +2628,12 @@ func TestReplicaTombstone(t *testing.T) {
// this as a heartbeat. This demonstrates case (4) where a raft message
// to a newer replica ID (in this case a heartbeat) removes an initialized
// Replica.
_, err = tc.AddVoters(key, tc.Target(2))
//
// Don't use tc.AddVoter; this would retry internally as we're faking
// a snapshot error here (and these are all considered retriable).
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, key, tc.LookupRangeOrFatal(t, key), roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(2)),
)
require.Regexp(t, "boom", err)
tombstone := waitForTombstone(t, store.Engine(), rangeID)
require.Equal(t, roachpb.ReplicaID(4), tombstone.NextReplicaID)
@@ -2644,7 +2649,9 @@
// We could replica GC these replicas without too much extra work but they
// also should be rare. Note this is not new with learner replicas.
setMinHeartbeat(5)
_, err = tc.AddVoters(key, tc.Target(2))
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, key, tc.LookupRangeOrFatal(t, key), roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(2)),
)
require.Regexp(t, "boom", err)
// We will start out reading the old tombstone so keep retrying.
testutils.SucceedsSoon(t, func() error {
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
@@ -3308,7 +3308,9 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
// second node).
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, err := tc.AddVoters(k, tc.Target(1))
_, err := tc.Servers[0].DB().AdminChangeReplicas(
ctx, k, tc.LookupRangeOrFatal(t, k), roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)),
)
return err
})

@@ -3343,7 +3345,9 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
// Now repeatedly re-add the learner on the rhs, so it has a
// different replicaID than the split trigger expects.
add := func() {
_, err := tc.AddVoters(kRHS, tc.Target(1))
_, err := tc.Servers[0].DB().AdminChangeReplicas(
ctx, kRHS, tc.LookupRangeOrFatal(t, kRHS), roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)),
)
// The "snapshot intersects existing range" error is expected if the store
// has not heard a raft message addressed to a later replica ID while the
// "was not found on" error is expected if the store has heard that it has
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
@@ -73,7 +73,7 @@ type nonDeterministicFailure struct {

// The provided format string should be safe for reporting.
func makeNonDeterministicFailure(format string, args ...interface{}) error {
err := errors.Newf(format, args...)
err := errors.AssertionFailedWithDepthf(1, format, args...)
return &nonDeterministicFailure{
wrapped: err,
safeExpl: err.Error(),
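
For reference, a small sketch of the call adopted above. The depth argument of
1 is intended (an assumption based on the function's naming, not verified
here) to attribute the captured call site to the helper's caller rather than
to the helper itself, and the resulting error is flagged as an assertion
failure rather than an ordinary error:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// makeFailure mirrors the pattern in makeNonDeterministicFailure: wrap the
// formatted message as an assertion failure, skipping this helper's own
// stack frame.
func makeFailure(format string, args ...interface{}) error {
	return errors.AssertionFailedWithDepthf(1, format, args...)
}

func main() {
	fmt.Println(makeFailure("while applying command: %s", "boom"))
}
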
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/replica_learner_test.go
@@ -669,7 +669,16 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
scratchStartKey := tc.ScratchRange(t)
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, err := tc.AddVoters(scratchStartKey, tc.Target(1))
// NB: we don't use tc.AddVoters because that will auto-retry
// and the test expects to see the error that results on the
// first attempt.
desc, err := tc.LookupRange(scratchStartKey)
if err != nil {
return err
}
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, scratchStartKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)),
)
return err
})
