storage: evaluate proposals without lock

The main objective of this change is to (later) enable proposer-evaluated KV to
compute the `WriteBatch` when evaluating a proposal (i.e. when turning
a `BatchRequest` into a `*ProposalData`). The previous code assumes making
a proposal is cheap, and thus it is done inside a large critical section.

MaxLeaseIndex is assigned at the latest possible moment before submitting to
Raft. Multiple inbound requests can pass the command queue without interacting,
and it's important to assign the lease index only immediately before the
proposal is submitted to Raft.

Note that the command queue currently doesn't cover all the data accessed by
Raft commands (cockroachdb#10084) and that how precisely command evaluation
will work is still under discussion (cockroachdb#6290). Nevertheless, this
change should be safe and a step in the right direction.
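
To make the ordering concern concrete, here is a self-contained toy
(illustrative only, not CockroachDB code). Replicas apply commands in log
order and, in this simplified model of the lease index check, reject any
command whose MaxLeaseIndex does not exceed the last applied lease index:

package main

import "fmt"

// replica models only the lease index check a replica performs when applying
// a command: indexes must be strictly increasing, or the command is rejected
// and the proposer has to retry.
type replica struct {
	leaseAppliedIndex uint64
}

func (r *replica) apply(maxLeaseIndex uint64) bool {
	if maxLeaseIndex <= r.leaseAppliedIndex {
		return false // rejected; the proposer sees a retryable error
	}
	r.leaseAppliedIndex = maxLeaseIndex
	return true
}

func main() {
	// Indexes assigned early (at evaluation time), but the proposals reach
	// Raft in the opposite order: the second command is rejected.
	r := &replica{}
	fmt.Println(r.apply(2), r.apply(1)) // true false

	// Indexes assigned right before submission, so submission order matches
	// index order and both commands apply.
	r = &replica{}
	fmt.Println(r.apply(1), r.apply(2)) // true true
}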
tbg committed Oct 25, 2016
1 parent 8ea9e31 commit 41526e2
Showing 2 changed files with 173 additions and 104 deletions.
152 changes: 97 additions & 55 deletions pkg/storage/replica.go
@@ -299,7 +299,9 @@ type Replica struct {
// propose has been called, but which have not yet
// applied.
//
// TODO(tschottdorf): evaluate whether this should be a list/slice.
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first.
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
@@ -701,10 +703,8 @@ func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
// If there was a previous replica, repropose its pending commands under
// this new incarnation.
if previousReplicaID != 0 {
// propose pending commands under new replicaID
if err := r.refreshPendingCmdsLocked(reasonReplicaIDChanged, 0); err != nil {
return err
}
// repropose all pending commands under new replicaID.
r.refreshProposalsLocked(0, reasonReplicaIDChanged)
}

return nil
@@ -1568,16 +1568,27 @@ func (r *Replica) assert5725(ba roachpb.BatchRequest) {
// - the timestamp cache is checked to determine if the command's affected keys
// were accessed with a timestamp exceeding that of the command; if so, the
// command's timestamp is incremented accordingly.
// - a ProposalData is created and inserted into the Replica's in-flight
// proposals map, a lease index is assigned to it, and it is submitted to Raft,
// returning a channel.
// - the command is evaluated, resulting in a ProposalData. As proposer-eval'ed
// KV isn't implemented yet, this means that the BatchRequest is added to
// the resulting ProposalData object. With proposer-evaluated KV, a
// WriteBatch containing the raw effects of the command's application is
// added along with some auxiliary data.
// - the ProposalData is inserted into the Replica's in-flight proposals map,
// a lease index is assigned to it, and it is submitted to Raft, returning
// a channel.
// - the result of the Raft proposal is read from the channel and the command
// registered with the timestamp cache, removed from the command queue, and its
// result (which could be an error) returned to the client.
// registered with the timestamp cache, removed from the command queue, and
// its result (which could be an error) returned to the client.
//
// Internally, multiple iterations of the above process may take place due
// to the (rare) event of the Raft proposal failing retryably (usually due to
// proposal reordering).
//
// TODO(tschottdorf): take special care with "special" commands and their
// reorderings. For example, a batch of writes and a split could be in flight
// in parallel without overlap, but if the writes hit the RHS, something must
// prevent them from writing outside of the key range when they apply.
// Similarly, a command proposed under lease A must not apply under lease B.
func (r *Replica) addWriteCmd(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
@@ -1688,44 +1699,58 @@ func (r *Replica) tryAddWriteCmd(
}
}

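The channel contract described in the comment on addWriteCmd above (the
proposal returns a channel from which the caller reads the eventual result)
can be sketched in isolation. Everything below is illustrative: fakePropose,
the Reply field, and the timings stand in for the real machinery.

package main

import (
	"context"
	"fmt"
	"time"
)

// proposalResult stands in for the value sent on the channel returned by
// propose; only its shape matters here.
type proposalResult struct {
	Reply string
	Err   error
}

// fakePropose mimics the contract: a buffered channel receives exactly one
// result once the command has been replicated and applied.
func fakePropose() (chan proposalResult, func() bool) {
	ch := make(chan proposalResult, 1)
	go func() {
		time.Sleep(10 * time.Millisecond) // "replication"
		ch <- proposalResult{Reply: "applied"}
	}()
	tryAbandon := func() bool { return true } // would detach the waiting client
	return ch, tryAbandon
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	ch, tryAbandon := fakePropose()
	select {
	case res := <-ch:
		fmt.Println("result:", res.Reply, res.Err)
	case <-ctx.Done():
		// The caller gives up; the proposal itself may still apply later.
		fmt.Println("abandoned:", tryAbandon())
	}
}
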
// TODO(tschottdorf): for proposer-evaluated Raft, need to refactor so that
// this does not happen under Replica.mu.
func (r *Replica) evaluateProposalLocked(
// evaluateProposal generates ProposalData from the given request by evaluating
// it, returning both state which is held only on the proposer and that which
// is to be replicated through Raft. The return value is ready to be inserted
// into Replica's proposal map and subsequently passed to submitProposalLocked.
//
// Replica.mu must not be held.
//
// TODO(tschottdorf): with proposer-evaluated KV, a WriteBatch will be prepared
// in this method.
func (r *Replica) evaluateProposal(
ctx context.Context,
idKey storagebase.CmdIDKey,
replica roachpb.ReplicaDescriptor,
ba roachpb.BatchRequest,
) *ProposalData {
if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex {
r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex
}
if !ba.IsLeaseRequest() {
r.mu.lastAssignedLeaseIndex++
}
if log.V(4) {
log.Infof(ctx, "prepared command %x: maxLeaseIndex=%d leaseAppliedIndex=%d",
idKey, r.mu.lastAssignedLeaseIndex, r.mu.state.LeaseAppliedIndex)
}
// Note that we don't hold any locks at this point. This is important
// since evaluating a proposal is expensive (at least under proposer-
// evaluated KV).
var pd ProposalData
pd.RaftCommand = &storagebase.RaftCommand{
RangeID: r.RangeID,
OriginReplica: replica,
Cmd: ba,
MaxLeaseIndex: r.mu.lastAssignedLeaseIndex,
}
pd.ctx = ctx
pd.idKey = idKey
pd.done = make(chan proposalResult, 1)
return &pd
}

func (r *Replica) insertProposalLocked(pCmd *ProposalData) {
idKey := pCmd.idKey
if _, ok := r.mu.proposals[idKey]; ok {
func (r *Replica) insertProposalLocked(pd *ProposalData) {
// Assign a lease index. Note that we do this as late as possible
// to make sure (to the extent that we can) that we don't assign
// (=predict) the index differently from the order in which commands are
// proposed (and thus likely applied).
if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex {
r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex
}
if !pd.RaftCommand.Cmd.IsLeaseRequest() {
r.mu.lastAssignedLeaseIndex++
}
pd.RaftCommand.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex
if log.V(4) {
log.Infof(pd.ctx, "submitting proposal %x: maxLeaseIndex=%d",
pd.idKey, pd.RaftCommand.MaxLeaseIndex)
}

if _, ok := r.mu.proposals[pd.idKey]; ok {
ctx := r.AnnotateCtx(context.TODO())
log.Fatalf(ctx, "pending command already exists for %s", idKey)
log.Fatalf(ctx, "pending command already exists for %s", pd.idKey)
}
r.mu.proposals[idKey] = pCmd
r.mu.proposals[pd.idKey] = pd
}

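A stripped-down model of the assignment logic above (names invented; the
lease request special case and the logging are omitted): the index is
predicted under the same lock that orders submissions, catching up first if
the applied index has overtaken the last prediction.

package main

import (
	"fmt"
	"sync"
)

// indexAllocator mirrors insertProposalLocked's prediction in isolation.
type indexAllocator struct {
	mu                sync.Mutex
	lastAssigned      uint64
	leaseAppliedIndex uint64 // what the state machine has applied so far
}

func (a *indexAllocator) assign() uint64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	// Catch up if commands have applied since the last prediction.
	if a.lastAssigned < a.leaseAppliedIndex {
		a.lastAssigned = a.leaseAppliedIndex
	}
	a.lastAssigned++
	return a.lastAssigned
}

func main() {
	a := &indexAllocator{leaseAppliedIndex: 41}
	fmt.Println(a.assign(), a.assign()) // 42 43
}
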
func makeIDKey() storagebase.CmdIDKey {
@@ -1748,6 +1773,18 @@ func makeIDKey() storagebase.CmdIDKey {
func (r *Replica) propose(
ctx context.Context, ba roachpb.BatchRequest,
) (chan proposalResult, func() bool, error) {
r.mu.Lock()
if r.mu.destroyed != nil {
r.mu.Unlock()
return nil, nil, r.mu.destroyed
}
repDesc, err := r.getReplicaDescriptorLocked()
if err != nil {
r.mu.Unlock()
return nil, nil, err
}
r.mu.Unlock()

// submitProposalLocked calls withRaftGroupLocked which requires that
// raftMu is held. In order to maintain our lock ordering we need to lock
// Replica.raftMu here before locking Replica.mu.
@@ -1757,18 +1794,17 @@
// optimize for the common case where Replica.mu.internalRaftGroup is
// non-nil, but that doesn't currently seem worth it. Always locking raftMu
// has a tiny (~1%) performance hit for single-node block_writer testing.
//
// TODO(tschottdorf): holding raftMu during evaluation limits concurrency
// at the range level and is something we will eventually need to address.
// See #10084.
r.raftMu.Lock()
defer r.raftMu.Unlock()

pCmd := r.evaluateProposal(ctx, makeIDKey(), repDesc, ba)

r.mu.Lock()
defer r.mu.Unlock()
if r.mu.destroyed != nil {
return nil, nil, r.mu.destroyed
}
repDesc, err := r.getReplicaDescriptorLocked()
if err != nil {
return nil, nil, err
}
pCmd := r.evaluateProposalLocked(ctx, makeIDKey(), repDesc, ba)
r.insertProposalLocked(pCmd)

if err := r.submitProposalLocked(pCmd); err != nil {
Expand All @@ -1789,6 +1825,7 @@ func (r *Replica) propose(
// The replica lock must be held.
func (r *Replica) submitProposalLocked(p *ProposalData) error {
p.proposedAtTicks = r.mu.ticks

if r.mu.submitProposalFn != nil {
return r.mu.submitProposalFn(p)
}
@@ -1938,7 +1975,7 @@ func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
}

// handleRaftReadyLocked is the same as handleRaftReady but requires that the
// replica be locked for raft processing via r.raftLock.
// replica's raftMu be held.
func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
ctx := r.AnnotateCtx(context.TODO())
var hasReady bool
@@ -2139,11 +2176,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
}
if refreshReason != noReason {
r.mu.Lock()
err := r.refreshPendingCmdsLocked(refreshReason, 0)
r.refreshProposalsLocked(0, refreshReason)
r.mu.Unlock()
if err != nil {
return err
}
}

// TODO(bdarnell): need to check replica id and not Advance if it
@@ -2163,6 +2197,7 @@ func (r *Replica) tick() (bool, error) {
return r.tickRaftMuLocked()
}

// tickRaftMuLocked requires that raftMu is held, but not Replica.mu.
func (r *Replica) tickRaftMuLocked() (bool, error) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -2209,10 +2244,9 @@ func (r *Replica) tickRaftMuLocked() (bool, error) {
// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
// will be refreshed when they have been pending for 1 to 2 election
// cycles.
if err := r.refreshPendingCmdsLocked(
reasonTicks, r.store.cfg.RaftElectionTimeoutTicks); err != nil {
return true, err
}
r.refreshProposalsLocked(
r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
)
}
return true, nil
}
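
The exact tick comparison is not visible in this hunk, so the following is
only a sketch of the intended semantics, with illustrative numbers: with
refreshAtDelta set to the election timeout in ticks, a proposal is inspected
once it has been pending for at least that many ticks.

package main

import "fmt"

// shouldRefresh mirrors the described arithmetic: a proposal is inspected
// only once it has been pending for at least refreshAtDelta ticks (zero
// inspects everything).
func shouldRefresh(nowTicks, proposedAtTicks, refreshAtDelta int) bool {
	return proposedAtTicks <= nowTicks-refreshAtDelta
}

func main() {
	const electionTimeoutTicks = 15 // illustrative value
	now := 100
	for _, proposedAt := range []int{70, 85, 86, 99} {
		fmt.Printf("proposedAt=%d refresh=%t\n",
			proposedAt, shouldRefresh(now, proposedAt, electionTimeoutTicks))
	}
}
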
@@ -2412,11 +2446,16 @@ const (
reasonTicks
)

func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDelta int) error {
if len(r.mu.proposals) == 0 {
return nil
}

// refreshProposalsLocked goes through the pending proposals, notifying
// proposers whose proposals need to be retried, and resubmitting proposals
// which were likely dropped (but may still apply at a legal Lease index).
// mu must be held.
//
// refreshAtDelta specifies how old (in ticks) a command must be for it to be
// inspected; usually this is called with zero (affect everything) or the
// number of ticks of an election timeout (affect only proposals that have had
// ample time to apply but didn't).
func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftReason) {
// Note that we can't use the commit index here (which is typically a
// little ahead), because a pending command is removed only as it applies.
// Thus we'd risk reproposing a command that has been committed but not yet
@@ -2447,7 +2486,7 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDelta int) error {
if log.V(1) && (numShouldRetry > 0 || len(reproposals) > 0) {
ctx := r.AnnotateCtx(context.TODO())
log.Infof(ctx,
"pending commands: sent %d back to client, reproposing %d (at %d.%d); %s",
"pending commands: sent %d back to client, reproposing %d (at %d.%d)k %s",
numShouldRetry, len(reproposals), r.mu.state.RaftAppliedIndex,
r.mu.state.LeaseAppliedIndex, reason)
}
Expand All @@ -2457,14 +2496,17 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe
// the "correct" index). For reproposals, it's generally pretty unlikely
// that they can make it in the right place. Reproposing in order is
// definitely required, however.
//
// TODO(tschottdorf): evaluate whether `proposals` should be a list/slice.
sort.Sort(reproposals)
for _, p := range reproposals {
log.Eventf(p.ctx, "reproposing command %x; %s", p.idKey, reason)
log.Eventf(p.ctx, "re-submitting command %x to Raft: %s", p.idKey, reason)
if err := r.submitProposalLocked(p); err != nil {
return err
delete(r.mu.proposals, p.idKey)
p.done <- proposalResult{Err: roachpb.NewError(err)}
close(p.done)
}
}
return nil
}

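Why the reproposals are sorted can be shown with a toy (illustrative, not
CockroachDB code): the proposals map iterates in arbitrary order, and
resubmitting in MaxLeaseIndex order gives each command a chance to apply at
its predicted index instead of being rejected and retried.

package main

import (
	"fmt"
	"sort"
)

// proposal carries only what matters for ordering here.
type proposal struct {
	id            string
	maxLeaseIndex uint64
}

func main() {
	// Map iteration order is randomized, so pending proposals arrive unsorted.
	pending := []proposal{{"c", 7}, {"a", 5}, {"b", 6}}
	sort.Slice(pending, func(i, j int) bool {
		return pending[i].maxLeaseIndex < pending[j].maxLeaseIndex
	})
	for _, p := range pending {
		fmt.Printf("repropose %s (maxLeaseIndex=%d)\n", p.id, p.maxLeaseIndex)
	}
}
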
func (r *Replica) getReplicaDescriptorByIDLocked(