diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 2ae22bc3afb1..c646580cb38d 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -87,24 +87,23 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error { } const ( - isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder - isRead // read-only cmds don't go through raft, but may run on lease holder - isWrite // write cmds go through raft and must be proposed on lease holder - isTxn // txn commands may be part of a transaction - isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution - isRange // range commands may span multiple keys - isReverse // reverse commands traverse ranges in descending direction - isAlone // requests which must be alone in a batch - isPrefix // requests which should be grouped with the next request in a batch - isUnsplittable // range command that must not be split during sending - // Requests for acquiring a lease skip the (proposal-time) check that the - // proposing replica has a valid lease. - skipLeaseCheck - consultsTSCache // mutating commands which write data at a timestamp - updatesReadTSCache // commands which update the read timestamp cache - updatesWriteTSCache // commands which update the write timestamp cache - updatesTSCacheOnError // commands which make read data available on errors - needsRefresh // commands which require refreshes to avoid serializable retries + isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder + isRead // read-only cmds don't go through raft, but may run on lease holder + isWrite // write cmds go through raft and must be proposed on lease holder + isTxn // txn commands may be part of a transaction + isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution + isRange // range commands may span multiple keys + isReverse // reverse commands traverse ranges in descending direction + isAlone // requests which must be alone in a batch + isPrefix // requests which should be grouped with the next request in a batch + isUnsplittable // range command that must not be split during sending + skipLeaseCheck // commands which skip the check that the evaluting replica has a valid lease + consultsTSCache // mutating commands which write data at a timestamp + updatesReadTSCache // commands which update the read timestamp cache + updatesWriteTSCache // commands which update the write timestamp cache + updatesTSCacheOnErr // commands which make read data available on errors + needsRefresh // commands which require refreshes to avoid serializable retries + canBackpressure // commands which deserve backpressure when a Range grows too large ) // IsReadOnly returns true iff the request is read-only. @@ -157,7 +156,7 @@ func UpdatesWriteTimestampCache(args Request) bool { // update the timestamp cache even on error, as in some cases the data // which was read is returned (e.g. ConditionalPut ConditionFailedError). func UpdatesTimestampCacheOnError(args Request) bool { - return (args.flags() & updatesTSCacheOnError) != 0 + return (args.flags() & updatesTSCacheOnErr) != 0 } // NeedsRefresh returns whether the command must be refreshed in @@ -166,6 +165,12 @@ func NeedsRefresh(args Request) bool { return (args.flags() & needsRefresh) != 0 } +// CanBackpressure returns whether the command can be backpressured +// when waiting for a Range to split after it has grown too large. +func CanBackpressure(args Request) bool { + return (args.flags() & canBackpressure) != 0 +} + // Request is an interface for RPC requests. type Request interface { protoutil.Message @@ -948,7 +953,9 @@ func NewReverseScan(key, endKey Key) Request { } func (*GetRequest) flags() int { return isRead | isTxn | updatesReadTSCache | needsRefresh } -func (*PutRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsTSCache } +func (*PutRequest) flags() int { + return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure +} // ConditionalPut effectively reads and may not write, so must update // the timestamp cache. Note that on ConditionFailedErrors @@ -957,7 +964,7 @@ func (*PutRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsT // errors, they return an error immediately instead of continuing a // serializable transaction to be retried at end transaction. func (*ConditionalPutRequest) flags() int { - return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache + return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesReadTSCache | updatesTSCacheOnErr | canBackpressure } // InitPut, like ConditionalPut, effectively reads and may not write. @@ -967,7 +974,7 @@ func (*ConditionalPutRequest) flags() int { // immediately instead of continuing a serializable transaction to be // retried at end transaction. func (*InitPutRequest) flags() int { - return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache + return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesReadTSCache | updatesTSCacheOnErr | canBackpressure } // Increment reads the existing value, but always leaves an intent so @@ -976,10 +983,12 @@ func (*InitPutRequest) flags() int { // error immediately instead of continuing a serializable transaction // to be retried at end transaction. func (*IncrementRequest) flags() int { - return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache + return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure } -func (*DeleteRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsTSCache } +func (*DeleteRequest) flags() int { + return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure +} func (drr *DeleteRangeRequest) flags() int { // DeleteRangeRequest has different properties if the "inline" flag is set. // This flag indicates that the request is deleting inline MVCC values, @@ -1002,7 +1011,7 @@ func (drr *DeleteRangeRequest) flags() int { // intents or tombstones for keys which don't yet exist. By updating // the write timestamp cache, it forces subsequent writes to get a // write-too-old error and avoids the phantom delete anomaly. - return isWrite | isTxn | isTxnWrite | isRange | updatesWriteTSCache | needsRefresh | consultsTSCache + return isWrite | isTxn | isTxnWrite | isRange | consultsTSCache | updatesWriteTSCache | needsRefresh | canBackpressure } // Note that ClearRange commands cannot be part of a transaction as @@ -1041,7 +1050,7 @@ func (*QueryIntentRequest) flags() int { return isRead | isPrefix | updat func (*ResolveIntentRequest) flags() int { return isWrite } func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange } func (*TruncateLogRequest) flags() int { return isWrite } -func (*MergeRequest) flags() int { return isWrite } +func (*MergeRequest) flags() int { return isWrite | canBackpressure } func (*RequestLeaseRequest) flags() int { return isWrite | isAlone | skipLeaseCheck } // LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the @@ -1068,8 +1077,10 @@ func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange } func (*WriteBatchRequest) flags() int { return isWrite | isRange } func (*ExportRequest) flags() int { return isRead | isRange | updatesReadTSCache } func (*ImportRequest) flags() int { return isAdmin | isAlone } -func (*AdminScatterRequest) flags() int { return isAdmin | isAlone | isRange } -func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange | isUnsplittable } +func (*AdminScatterRequest) flags() int { return isAdmin | isRange | isAlone } +func (*AddSSTableRequest) flags() int { + return isWrite | isRange | isAlone | isUnsplittable | canBackpressure +} // RefreshRequest and RefreshRangeRequest both determine which timestamp cache // they update based on their Write parameter. diff --git a/pkg/storage/replica_backpressure.go b/pkg/storage/replica_backpressure.go index cc79e3f7963d..2e44e5ce49f0 100644 --- a/pkg/storage/replica_backpressure.go +++ b/pkg/storage/replica_backpressure.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) @@ -44,20 +43,6 @@ var backpressureRangeSizeMultiplier = settings.RegisterValidatedFloatSetting( }, ) -// backpressurableReqMethods is the set of all request methods that can -// be backpressured. If a batch contains any method in this set, it may -// be backpressured. -var backpressurableReqMethods = util.MakeFastIntSet( - int(roachpb.Put), - int(roachpb.InitPut), - int(roachpb.ConditionalPut), - int(roachpb.Merge), - int(roachpb.Increment), - int(roachpb.Delete), - int(roachpb.DeleteRange), - int(roachpb.AddSSTable), -) - // backpressurableSpans contains spans of keys where write backpressuring // is permitted. Writes to any keys within these spans may cause a batch // to be backpressured. @@ -78,7 +63,7 @@ func canBackpressureBatch(ba roachpb.BatchRequest) bool { // method that is within a "backpressurable" key span. for _, ru := range ba.Requests { req := ru.GetInner() - if !backpressurableReqMethods.Contains(int(req.Method())) { + if !roachpb.CanBackpressure(req) { continue }