Skip to content

Commit

Permalink
Merge #36380
Browse files Browse the repository at this point in the history
36380: roachpb: add canBackpressure request flag r=nvanbenschoten a=nvanbenschoten

This commit adds a new request flag that was discussed in #36363.
Doing so allows us to remove the `backpressurableReqMethods` set.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 2, 2019
2 parents b1cfbef + bfebf4c commit 2851c7d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 44 deletions.
67 changes: 39 additions & 28 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,23 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error {
}

const (
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isPrefix // requests which should be grouped with the next request in a batch
isUnsplittable // range command that must not be split during sending
// Requests for acquiring a lease skip the (proposal-time) check that the
// proposing replica has a valid lease.
skipLeaseCheck
consultsTSCache // mutating commands which write data at a timestamp
updatesReadTSCache // commands which update the read timestamp cache
updatesWriteTSCache // commands which update the write timestamp cache
updatesTSCacheOnError // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isPrefix // requests which should be grouped with the next request in a batch
isUnsplittable // range command that must not be split during sending
skipLeaseCheck // commands which skip the check that the evaluting replica has a valid lease
consultsTSCache // mutating commands which write data at a timestamp
updatesReadTSCache // commands which update the read timestamp cache
updatesWriteTSCache // commands which update the write timestamp cache
updatesTSCacheOnErr // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
canBackpressure // commands which deserve backpressure when a Range grows too large
)

// IsReadOnly returns true iff the request is read-only.
Expand Down Expand Up @@ -157,7 +156,7 @@ func UpdatesWriteTimestampCache(args Request) bool {
// update the timestamp cache even on error, as in some cases the data
// which was read is returned (e.g. ConditionalPut ConditionFailedError).
func UpdatesTimestampCacheOnError(args Request) bool {
return (args.flags() & updatesTSCacheOnError) != 0
return (args.flags() & updatesTSCacheOnErr) != 0
}

// NeedsRefresh returns whether the command must be refreshed in
Expand All @@ -166,6 +165,12 @@ func NeedsRefresh(args Request) bool {
return (args.flags() & needsRefresh) != 0
}

// CanBackpressure returns whether the command can be backpressured
// when waiting for a Range to split after it has grown too large.
func CanBackpressure(args Request) bool {
return (args.flags() & canBackpressure) != 0
}

// Request is an interface for RPC requests.
type Request interface {
protoutil.Message
Expand Down Expand Up @@ -948,7 +953,9 @@ func NewReverseScan(key, endKey Key) Request {
}

func (*GetRequest) flags() int { return isRead | isTxn | updatesReadTSCache | needsRefresh }
func (*PutRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsTSCache }
func (*PutRequest) flags() int {
return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
}

// ConditionalPut effectively reads and may not write, so must update
// the timestamp cache. Note that on ConditionFailedErrors
Expand All @@ -957,7 +964,7 @@ func (*PutRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsT
// errors, they return an error immediately instead of continuing a
// serializable transaction to be retried at end transaction.
func (*ConditionalPutRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesReadTSCache | updatesTSCacheOnErr | canBackpressure
}

// InitPut, like ConditionalPut, effectively reads and may not write.
Expand All @@ -967,7 +974,7 @@ func (*ConditionalPutRequest) flags() int {
// immediately instead of continuing a serializable transaction to be
// retried at end transaction.
func (*InitPutRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesReadTSCache | updatesTSCacheOnErr | canBackpressure
}

// Increment reads the existing value, but always leaves an intent so
Expand All @@ -976,10 +983,12 @@ func (*InitPutRequest) flags() int {
// error immediately instead of continuing a serializable transaction
// to be retried at end transaction.
func (*IncrementRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
}

func (*DeleteRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsTSCache }
func (*DeleteRequest) flags() int {
return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
}
func (drr *DeleteRangeRequest) flags() int {
// DeleteRangeRequest has different properties if the "inline" flag is set.
// This flag indicates that the request is deleting inline MVCC values,
Expand All @@ -1002,7 +1011,7 @@ func (drr *DeleteRangeRequest) flags() int {
// intents or tombstones for keys which don't yet exist. By updating
// the write timestamp cache, it forces subsequent writes to get a
// write-too-old error and avoids the phantom delete anomaly.
return isWrite | isTxn | isTxnWrite | isRange | updatesWriteTSCache | needsRefresh | consultsTSCache
return isWrite | isTxn | isTxnWrite | isRange | consultsTSCache | updatesWriteTSCache | needsRefresh | canBackpressure
}

// Note that ClearRange commands cannot be part of a transaction as
Expand Down Expand Up @@ -1041,7 +1050,7 @@ func (*QueryIntentRequest) flags() int { return isRead | isPrefix | updat
func (*ResolveIntentRequest) flags() int { return isWrite }
func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*TruncateLogRequest) flags() int { return isWrite }
func (*MergeRequest) flags() int { return isWrite }
func (*MergeRequest) flags() int { return isWrite | canBackpressure }
func (*RequestLeaseRequest) flags() int { return isWrite | isAlone | skipLeaseCheck }

// LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the
Expand All @@ -1068,8 +1077,10 @@ func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange }
func (*WriteBatchRequest) flags() int { return isWrite | isRange }
func (*ExportRequest) flags() int { return isRead | isRange | updatesReadTSCache }
func (*ImportRequest) flags() int { return isAdmin | isAlone }
func (*AdminScatterRequest) flags() int { return isAdmin | isAlone | isRange }
func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange | isUnsplittable }
func (*AdminScatterRequest) flags() int { return isAdmin | isRange | isAlone }
func (*AddSSTableRequest) flags() int {
return isWrite | isRange | isAlone | isUnsplittable | canBackpressure
}

// RefreshRequest and RefreshRangeRequest both determine which timestamp cache
// they update based on their Write parameter.
Expand Down
17 changes: 1 addition & 16 deletions pkg/storage/replica_backpressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)
Expand All @@ -44,20 +43,6 @@ var backpressureRangeSizeMultiplier = settings.RegisterValidatedFloatSetting(
},
)

// backpressurableReqMethods is the set of all request methods that can
// be backpressured. If a batch contains any method in this set, it may
// be backpressured.
var backpressurableReqMethods = util.MakeFastIntSet(
int(roachpb.Put),
int(roachpb.InitPut),
int(roachpb.ConditionalPut),
int(roachpb.Merge),
int(roachpb.Increment),
int(roachpb.Delete),
int(roachpb.DeleteRange),
int(roachpb.AddSSTable),
)

// backpressurableSpans contains spans of keys where write backpressuring
// is permitted. Writes to any keys within these spans may cause a batch
// to be backpressured.
Expand All @@ -78,7 +63,7 @@ func canBackpressureBatch(ba roachpb.BatchRequest) bool {
// method that is within a "backpressurable" key span.
for _, ru := range ba.Requests {
req := ru.GetInner()
if !backpressurableReqMethods.Contains(int(req.Method())) {
if !roachpb.CanBackpressure(req) {
continue
}

Expand Down

0 comments on commit 2851c7d

Please sign in to comment.