Skip to content

Commit

Permalink
kvserver: apply admission control in handleRaftReady
Browse files Browse the repository at this point in the history
Wait in handleRaftReady, which isn't great either, but at least allows
us to target individual ranges (mod Go scheduler concurrency).

Release note: None
  • Loading branch information
tbg committed May 16, 2022
1 parent 7bcbd87 commit cfc7c74
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
32 changes: 31 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
Expand Down Expand Up @@ -654,6 +655,35 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
stats := handleRaftReadyStats{
tBegin: timeutil.Now(),
}

var ba roachpb.BatchRequest
ba.AdmissionHeader = roachpb.AdmissionHeader{
Priority: int32(admissionpb.NormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_ROOT_KV,
NoMemoryReservedAtSource: true,
}
// Need to make sure ba.IsWrite() == true.
ba.Add(roachpb.NewDelete(roachpb.Key("foo")))
if !ba.IsWrite() {
panic("not a write")
}
ba.Replica.StoreID = r.store.StoreID()

admissionHandle, err := func() (interface{}, error) {
c := r.store.cfg.KVAdmissionController
if c == nil || !enableRaftFollowerAdmissionControl || r.RangeID < 45 {
return nil, nil
}
return c.AdmitKVWork(ctx, roachpb.SystemTenantID, &ba)
}()
if err != nil {
panic(err)
}
if admissionHandle != nil {
defer r.store.cfg.KVAdmissionController.AdmittedKVWorkDone(admissionHandle)
}

if inSnap.Desc != nil {
stats.snap.offered = true
}
Expand All @@ -666,7 +696,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
raftLogSize := r.mu.raftLogSize
leaderID := r.mu.leaderID
lastLeaderID := leaderID
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
err = r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
if err != nil {
return false, err
Expand Down
35 changes: 9 additions & 26 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ import (
var enableRaftFollowerAdmissionControl = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_FOLLOWER_ADMISSION_CONTROL_ENABLED", true)

type raftRequestInfo struct {
req *kvserverpb.RaftMessageRequest
size int64 // size of req in bytes
admCtrlHandle interface{} // for AdmittedKVWorkDone
respStream RaftMessageResponseStream
req *kvserverpb.RaftMessageRequest
size int64 // size of req in bytes
respStream RaftMessageResponseStream
}

type raftReceiveQueue struct {
Expand Down Expand Up @@ -102,7 +101,7 @@ func (q *raftReceiveQueue) Recycle(processed []raftRequestInfo) {
}

func (q *raftReceiveQueue) Append(
req *kvserverpb.RaftMessageRequest, admCtrlHandle interface{}, s RaftMessageResponseStream,
req *kvserverpb.RaftMessageRequest, s RaftMessageResponseStream,
) (shouldQueue bool, size int64, appended bool) {
size = int64(req.Size())
q.mu.Lock()
Expand All @@ -114,10 +113,9 @@ func (q *raftReceiveQueue) Append(
return false, size, false
}
q.mu.infos = append(q.mu.infos, raftRequestInfo{
req: req,
admCtrlHandle: admCtrlHandle,
respStream: s,
size: size,
req: req,
respStream: s,
size: size,
})
// The operation that enqueues the first message will
// be put in charge of triggering a drain of the queue.
Expand Down Expand Up @@ -331,19 +329,8 @@ func (s *Store) HandleRaftUncoalescedRequest(
}
}()

admissionHandle, err := func() (interface{}, error) {
c := s.cfg.KVAdmissionController
if c == nil || !enableRaftFollowerAdmissionControl || req.Message.Type != raftpb.MsgApp {
return nil, nil
}
return c.AdmitKVWork(ctx, roachpb.SystemTenantID, &ba)
}()
if err != nil {
return 0, false
}

q, _ := s.raftRecvQueues.LoadOrCreate(req.RangeID)
enqueue, size, appended := q.Append(req, admissionHandle, respStream)
enqueue, size, appended := q.Append(req, respStream)
if !appended {
return 0, false
}
Expand Down Expand Up @@ -618,11 +605,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
info := &infos[i]
if pErr := s.withReplicaForRequest(
ctx, info.req, func(_ context.Context, r *Replica) *roachpb.Error {
pErr := s.processRaftRequestWithReplica(r.raftCtx, r, info.req)
if info.admCtrlHandle != nil {
s.cfg.KVAdmissionController.AdmittedKVWorkDone(info.admCtrlHandle)
}
return pErr
return s.processRaftRequestWithReplica(r.raftCtx, r, info.req)
},
); pErr != nil {
hadError = true
Expand Down

0 comments on commit cfc7c74

Please sign in to comment.