Merge pull request cockroachdb#10012 from tschottdorf/evaluate-refactor
storage: evaluate proposals without lock
tbg authored Oct 25, 2016
2 parents 8ea9e31 + 41526e2 commit c214896
Showing 2 changed files with 173 additions and 104 deletions.
pkg/storage/replica.go: 97 additions & 55 deletions
@@ -299,7 +299,9 @@ type Replica struct {
// propose has been called, but which have not yet
// applied.
//
// TODO(tschottdorf): evaluate whether this should be a list/slice.
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first.
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
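
The new comment above spells out an ownership rule for the proposals map: elements may only be referenced while Replica.mu is held, unless they are removed from the map first. Below is a minimal standalone sketch of that pattern; the replica and proposal types are simplified stand-ins, not the CockroachDB types.

```go
package main

import (
	"fmt"
	"sync"
)

// proposal and replica are simplified stand-ins for *ProposalData and
// Replica, not the CockroachDB types.
type proposal struct {
	id   string
	done chan string
}

type replica struct {
	mu        sync.Mutex
	proposals map[string]*proposal
}

// finish follows the ownership rule from the comment above: the proposal is
// looked up and removed from the map while mu is held; only then is it used
// (here: completed) without the lock.
func (r *replica) finish(id, result string) {
	r.mu.Lock()
	p, ok := r.proposals[id]
	if ok {
		delete(r.proposals, id)
	}
	r.mu.Unlock()

	if ok {
		// Safe without the lock: p is no longer reachable through the map.
		p.done <- result
		close(p.done)
	}
}

func main() {
	r := &replica{proposals: map[string]*proposal{}}
	p := &proposal{id: "a", done: make(chan string, 1)}
	r.proposals[p.id] = p
	r.finish("a", "applied")
	fmt.Println(<-p.done) // applied
}
```
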
@@ -701,10 +703,8 @@ func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
// If there was a previous replica, repropose its pending commands under
// this new incarnation.
if previousReplicaID != 0 {
// propose pending commands under new replicaID
if err := r.refreshPendingCmdsLocked(reasonReplicaIDChanged, 0); err != nil {
return err
}
// repropose all pending commands under new replicaID.
r.refreshProposalsLocked(0, reasonReplicaIDChanged)
}

return nil
@@ -1568,16 +1568,27 @@ func (r *Replica) assert5725(ba roachpb.BatchRequest) {
// - the timestamp cache is checked to determine if the command's affected keys
// were accessed with a timestamp exceeding that of the command; if so, the
// command's timestamp is incremented accordingly.
// - a ProposalData is created and inserted into the Replica's in-flight
// proposals map, a lease index is assigned to it, and it is submitted to Raft,
// returning a channel.
// - the command is evaluated, resulting in a ProposalData. As proposer-eval'ed
// KV isn't implemented yet, this means that the BatchRequest is added to
// the resulting ProposalData object. With proposer-evaluated KV, a
// WriteBatch containing the raw effects of the command's application is
// added along with some auxiliary data.
// - the ProposalData is inserted into the Replica's in-flight proposals map,
// a lease index is assigned to it, and it is submitted to Raft, returning
// a channel.
// - the result of the Raft proposal is read from the channel and the command
// registered with the timestamp cache, removed from the command queue, and its
// result (which could be an error) returned to the client.
// registered with the timestamp cache, removed from the command queue, and
// its result (which could be an error) returned to the client.
//
// Internally, multiple iterations of the above process may take place due
// to the (rare) possibility of the Raft proposal failing retryably (usually
// due to proposal reordering).
//
// TODO(tschottdorf): take special care with "special" commands and their
// reorderings. For example, a batch of writes and a split could be in flight
// in parallel without overlap, but if the writes hit the RHS, something must
// prevent them from writing outside of the key range when they apply.
// Similarly, a command proposed under lease A must not apply under lease B.
func (r *Replica) addWriteCmd(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
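
The updated doc comment describes the write path as a pipeline with a rare retry loop: evaluate, insert into the proposals map, submit to Raft, wait on the returned channel, and start over if the proposal failed retryably. The sketch below only models that control flow; batchRequest, evaluate, submitToRaft and the retry flag are invented placeholders, not the functions used in this file.

```go
package main

import "fmt"

// batchRequest, proposalResult and the helpers below are invented
// placeholders that only model the control flow, not the real API.
type batchRequest struct{ key string }

type proposalResult struct {
	err   error
	retry bool // e.g. the proposal was reordered past its lease index
}

func evaluate(ba batchRequest) string { return "proposal(" + ba.key + ")" }

func submitToRaft(p string) chan proposalResult {
	ch := make(chan proposalResult, 1)
	ch <- proposalResult{} // pretend the command applied cleanly
	return ch
}

// addWrite sketches the loop from the doc comment: evaluate the request,
// hand the resulting proposal to Raft, wait for the result, and (rarely)
// start over when the proposal failed retryably.
func addWrite(ba batchRequest) error {
	for {
		p := evaluate(ba)     // produces the ProposalData analogue
		ch := submitToRaft(p) // insert into the proposals map and propose
		res := <-ch           // wait for application (or an error)
		if res.retry {
			continue // repropose after a retryable failure
		}
		return res.err
	}
}

func main() {
	if err := addWrite(batchRequest{key: "a"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("write applied")
}
```
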
@@ -1688,44 +1699,58 @@ func (r *Replica) tryAddWriteCmd(
}
}

// TODO(tschottdorf): for proposer-evaluated Raft, need to refactor so that
// this does not happen under Replica.mu.
func (r *Replica) evaluateProposalLocked(
// evaluateProposal generates ProposalData from the given request by evaluating
// it, returning both state which is held only on the proposer and that which
// is to be replicated through Raft. The return value is ready to be inserted
// into Replica's proposal map and subsequently passed to submitProposalLocked.
//
// Replica.mu must not be held.
//
// TODO(tschottdorf): with proposer-evaluated KV, a WriteBatch will be prepared
// in this method.
func (r *Replica) evaluateProposal(
ctx context.Context,
idKey storagebase.CmdIDKey,
replica roachpb.ReplicaDescriptor,
ba roachpb.BatchRequest,
) *ProposalData {
if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex {
r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex
}
if !ba.IsLeaseRequest() {
r.mu.lastAssignedLeaseIndex++
}
if log.V(4) {
log.Infof(ctx, "prepared command %x: maxLeaseIndex=%d leaseAppliedIndex=%d",
idKey, r.mu.lastAssignedLeaseIndex, r.mu.state.LeaseAppliedIndex)
}
// Note that we don't hold any locks at this point. This is important
// since evaluating a proposal is expensive (at least under proposer-
// evaluated KV).
var pd ProposalData
pd.RaftCommand = &storagebase.RaftCommand{
RangeID: r.RangeID,
OriginReplica: replica,
Cmd: ba,
MaxLeaseIndex: r.mu.lastAssignedLeaseIndex,
}
pd.ctx = ctx
pd.idKey = idKey
pd.done = make(chan proposalResult, 1)
return &pd
}

func (r *Replica) insertProposalLocked(pCmd *ProposalData) {
idKey := pCmd.idKey
if _, ok := r.mu.proposals[idKey]; ok {
func (r *Replica) insertProposalLocked(pd *ProposalData) {
// Assign a lease index. Note that we do this as late as possible
// to make sure (to the extent that we can) that we don't assign
// (=predict) the index differently from the order in which commands are
// proposed (and thus likely applied).
if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex {
r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex
}
if !pd.RaftCommand.Cmd.IsLeaseRequest() {
r.mu.lastAssignedLeaseIndex++
}
pd.RaftCommand.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex
if log.V(4) {
log.Infof(pd.ctx, "submitting proposal %x: maxLeaseIndex=%d",
pd.idKey, pd.RaftCommand.MaxLeaseIndex)
}

if _, ok := r.mu.proposals[pd.idKey]; ok {
ctx := r.AnnotateCtx(context.TODO())
log.Fatalf(ctx, "pending command already exists for %s", idKey)
log.Fatalf(ctx, "pending command already exists for %s", pd.idKey)
}
r.mu.proposals[idKey] = pCmd
r.mu.proposals[pd.idKey] = pd
}

func makeIDKey() storagebase.CmdIDKey {
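
insertProposalLocked now assigns MaxLeaseIndex as late as possible so that the predicted index matches the order in which commands are proposed. The toy model below mirrors that bookkeeping and adds an assumed downstream check that rejects commands reordered past their lease index; leaseSeq and its methods are illustrative only, not the Replica fields or the real apply path. The main function shows why late assignment matters: a lower index applied after a higher one is rejected rather than applied out of order.

```go
package main

import "fmt"

// leaseSeq is a toy model of the lease-index bookkeeping; its fields stand
// in for r.mu.lastAssignedLeaseIndex and r.mu.state.LeaseAppliedIndex.
type leaseSeq struct {
	lastAssigned uint64
	applied      uint64
}

// assign mirrors the shape of insertProposalLocked: catch lastAssigned up to
// the applied index, then hand out the next slot (lease requests keep the
// current index).
func (s *leaseSeq) assign(isLeaseRequest bool) uint64 {
	if s.lastAssigned < s.applied {
		s.lastAssigned = s.applied
	}
	if !isLeaseRequest {
		s.lastAssigned++
	}
	return s.lastAssigned
}

// apply sketches the assumed downstream check: a command arriving at or
// below the applied index was reordered and has to be retried by the
// proposer.
func (s *leaseSeq) apply(maxLeaseIndex uint64) bool {
	if maxLeaseIndex <= s.applied {
		return false
	}
	s.applied = maxLeaseIndex
	return true
}

func main() {
	s := &leaseSeq{applied: 7}
	a := s.assign(false) // 8
	b := s.assign(false) // 9
	fmt.Println(s.apply(b), s.apply(a)) // true false: a was reordered behind b
}
```
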
@@ -1748,6 +1773,18 @@ func makeIDKey() storagebase.CmdIDKey {
func (r *Replica) propose(
ctx context.Context, ba roachpb.BatchRequest,
) (chan proposalResult, func() bool, error) {
r.mu.Lock()
if r.mu.destroyed != nil {
r.mu.Unlock()
return nil, nil, r.mu.destroyed
}
repDesc, err := r.getReplicaDescriptorLocked()
if err != nil {
r.mu.Unlock()
return nil, nil, err
}
r.mu.Unlock()

// submitProposalLocked calls withRaftGroupLocked which requires that
// raftMu is held. In order to maintain our lock ordering we need to lock
// Replica.raftMu here before locking Replica.mu.
@@ -1757,18 +1794,17 @@ func (r *Replica) propose(
// optimize for the common case where Replica.mu.internalRaftGroup is
// non-nil, but that doesn't currently seem worth it. Always locking raftMu
// has a tiny (~1%) performance hit for single-node block_writer testing.
//
// TODO(tschottdorf): holding raftMu during evaluation limits concurrency
// at the range level and is something we will eventually need to address.
// See #10084.
r.raftMu.Lock()
defer r.raftMu.Unlock()

pCmd := r.evaluateProposal(ctx, makeIDKey(), repDesc, ba)

r.mu.Lock()
defer r.mu.Unlock()
if r.mu.destroyed != nil {
return nil, nil, r.mu.destroyed
}
repDesc, err := r.getReplicaDescriptorLocked()
if err != nil {
return nil, nil, err
}
pCmd := r.evaluateProposalLocked(ctx, makeIDKey(), repDesc, ba)
r.insertProposalLocked(pCmd)

if err := r.submitProposalLocked(pCmd); err != nil {
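
The hunk above is the heart of the change: propose now evaluates the request while holding only raftMu, and takes Replica.mu just to register and submit the finished proposal. A small sketch of that locking shape, with placeholder evaluate/submit callbacks standing in for evaluateProposal and insertProposalLocked/submitProposalLocked; the replica type here models only the two mutexes.

```go
package main

import (
	"fmt"
	"sync"
)

// replica models only the two mutexes whose ordering matters here; it is not
// the real Replica type.
type replica struct {
	raftMu sync.Mutex // always acquired before mu
	mu     sync.Mutex // held only briefly
}

// propose sketches the locking shape of the new code path: raftMu is held
// for the whole call, evaluation runs without Replica.mu, and Replica.mu is
// taken only to register and submit the finished proposal.
func (r *replica) propose(evaluate func() string, submit func(string)) {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()

	p := evaluate() // potentially expensive; no Replica.mu held

	r.mu.Lock()
	defer r.mu.Unlock()
	submit(p) // stands in for insertProposalLocked + submitProposalLocked
}

func main() {
	r := &replica{}
	r.propose(
		func() string { return "cmd" },
		func(p string) { fmt.Println("submitted", p) },
	)
}
```
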
@@ -1789,6 +1825,7 @@ func (r *Replica) propose(
// The replica lock must be held.
func (r *Replica) submitProposalLocked(p *ProposalData) error {
p.proposedAtTicks = r.mu.ticks

if r.mu.submitProposalFn != nil {
return r.mu.submitProposalFn(p)
}
@@ -1938,7 +1975,7 @@ func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
}

// handleRaftReadyLocked is the same as handleRaftReady but requires that the
// replica be locked for raft processing via r.raftLock.
// replica's raftMu be held.
func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
ctx := r.AnnotateCtx(context.TODO())
var hasReady bool
@@ -2139,11 +2176,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
}
if refreshReason != noReason {
r.mu.Lock()
err := r.refreshPendingCmdsLocked(refreshReason, 0)
r.refreshProposalsLocked(0, refreshReason)
r.mu.Unlock()
if err != nil {
return err
}
}

// TODO(bdarnell): need to check replica id and not Advance if it
Expand All @@ -2163,6 +2197,7 @@ func (r *Replica) tick() (bool, error) {
return r.tickRaftMuLocked()
}

// tickRaftMuLocked requires that raftMu is held, but not replicaMu.
func (r *Replica) tickRaftMuLocked() (bool, error) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -2209,10 +2244,9 @@ func (r *Replica) tickRaftMuLocked() (bool, error) {
// RaftElectionTimeoutTicks to refreshPendingCmdsLocked means that commands
// will be refreshed when they have been pending for 1 to 2 election
// cycles.
if err := r.refreshPendingCmdsLocked(
reasonTicks, r.store.cfg.RaftElectionTimeoutTicks); err != nil {
return true, err
}
r.refreshProposalsLocked(
r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
)
}
return true, nil
}
@@ -2412,11 +2446,16 @@ const (
reasonTicks
)

func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDelta int) error {
if len(r.mu.proposals) == 0 {
return nil
}

// refreshProposalsLocked goes through the pending proposals, notifying
// proposers whose proposals need to be retried, and resubmitting proposals
// which were likely dropped (but may still apply at a legal Lease index).
// mu must be held.
//
// refreshAtDelta specifies how old (in ticks) a command must be for it to be
// inspected; usually this is called with zero (affect everything) or the
// number of ticks of an election timeout (affect only proposals that have had
// ample time to apply but didn't).
func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftReason) {
// Note that we can't use the commit index here (which is typically a
// little ahead), because a pending command is removed only as it applies.
// Thus we'd risk reproposing a command that has been committed but not yet
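
The new doc comment explains that refreshAtDelta gates which proposals are inspected by their age in ticks: zero affects everything, while an election timeout's worth of ticks only picks up proposals that have had ample time to apply but didn't. The sketch below models that filter under the assumption that the age check compares proposedAtTicks against the current tick count minus the delta; the proposal struct and needsRefresh are illustrative, not the real implementation.

```go
package main

import "fmt"

// proposal carries only the field needed for the age check; it is not the
// ProposalData type.
type proposal struct {
	id              string
	proposedAtTicks int
}

// needsRefresh reports whether a proposal is old enough to be inspected.
// With refreshAtDelta == 0 everything qualifies; with the election timeout
// in ticks, only proposals that have had ample time to apply are picked up.
// The exact comparison is an assumption of this sketch.
func needsRefresh(p proposal, currentTicks, refreshAtDelta int) bool {
	return p.proposedAtTicks <= currentTicks-refreshAtDelta
}

func main() {
	const ticks = 10
	for _, p := range []proposal{{"old", 2}, {"fresh", 9}} {
		fmt.Println(p.id, needsRefresh(p, ticks, 3)) // old true, fresh false
	}
}
```
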
@@ -2447,7 +2486,7 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe
if log.V(1) && (numShouldRetry > 0 || len(reproposals) > 0) {
ctx := r.AnnotateCtx(context.TODO())
log.Infof(ctx,
"pending commands: sent %d back to client, reproposing %d (at %d.%d); %s",
"pending commands: sent %d back to client, reproposing %d (at %d.%d)k %s",
numShouldRetry, len(reproposals), r.mu.state.RaftAppliedIndex,
r.mu.state.LeaseAppliedIndex, reason)
}
@@ -2457,14 +2496,17 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe
// the "correct" index). For reproposals, it's generally pretty unlikely
// that they can make it in the right place. Reproposing in order is
// definitely required, however.
//
// TODO(tschottdorf): evaluate whether `proposals` should be a list/slice.
sort.Sort(reproposals)
for _, p := range reproposals {
log.Eventf(p.ctx, "reproposing command %x; %s", p.idKey, reason)
log.Eventf(p.ctx, "re-submitting command %x to Raft: %s", p.idKey, reason)
if err := r.submitProposalLocked(p); err != nil {
return err
delete(r.mu.proposals, p.idKey)
p.done <- proposalResult{Err: roachpb.NewError(err)}
close(p.done)
}
}
return nil
}

func (r *Replica) getReplicaDescriptorByIDLocked(
