Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachpb: add canBackpressure request flag #36380

Merged
merged 3 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,23 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error {
}

const (
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isPrefix // requests which should be grouped with the next request in a batch
isUnsplittable // range command that must not be split during sending
// Requests for acquiring a lease skip the (proposal-time) check that the
// proposing replica has a valid lease.
skipLeaseCheck
consultsTSCache // mutating commands which write data at a timestamp
updatesReadTSCache // commands which update the read timestamp cache
updatesWriteTSCache // commands which update the write timestamp cache
updatesTSCacheOnError // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isTxnWrite // txn write cmds start heartbeat and are marked for intent resolution
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isPrefix // requests which should be grouped with the next request in a batch
isUnsplittable // range command that must not be split during sending
skipLeaseCheck // commands which skip the check that the evaluting replica has a valid lease
consultsTSCache // mutating commands which write data at a timestamp
updatesReadTSCache // commands which update the read timestamp cache
updatesWriteTSCache // commands which update the write timestamp cache
updatesTSCacheOnErr // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
canBackpressure // commands which deserve backpressure when a Range grows too large
)

// IsReadOnly returns true iff the request is read-only.
Expand Down Expand Up @@ -157,7 +156,7 @@ func UpdatesWriteTimestampCache(args Request) bool {
// update the timestamp cache even on error, as in some cases the data
// which was read is returned (e.g. ConditionalPut ConditionFailedError).
func UpdatesTimestampCacheOnError(args Request) bool {
return (args.flags() & updatesTSCacheOnError) != 0
return (args.flags() & updatesTSCacheOnErr) != 0
}

// NeedsRefresh returns whether the command must be refreshed in
Expand All @@ -166,6 +165,12 @@ func NeedsRefresh(args Request) bool {
return (args.flags() & needsRefresh) != 0
}

// CanBackpressure returns whether the command can be backpressured
// when waiting for a Range to split after it has grown too large.
func CanBackpressure(args Request) bool {
return (args.flags() & canBackpressure) != 0
}

// Request is an interface for RPC requests.
type Request interface {
protoutil.Message
Expand Down Expand Up @@ -948,7 +953,9 @@ func NewReverseScan(key, endKey Key) Request {
}

func (*GetRequest) flags() int { return isRead | isTxn | updatesReadTSCache | needsRefresh }
func (*PutRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsTSCache }
func (*PutRequest) flags() int {
return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
}

// ConditionalPut effectively reads and may not write, so must update
// the timestamp cache. Note that on ConditionFailedErrors
Expand All @@ -957,7 +964,7 @@ func (*PutRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsT
// errors, they return an error immediately instead of continuing a
// serializable transaction to be retried at end transaction.
func (*ConditionalPutRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesReadTSCache | updatesTSCacheOnErr | canBackpressure
}

// InitPut, like ConditionalPut, effectively reads and may not write.
Expand All @@ -967,7 +974,7 @@ func (*ConditionalPutRequest) flags() int {
// immediately instead of continuing a serializable transaction to be
// retried at end transaction.
func (*InitPutRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | updatesReadTSCache | updatesTSCacheOnError | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | updatesReadTSCache | updatesTSCacheOnErr | canBackpressure
}

// Increment reads the existing value, but always leaves an intent so
Expand All @@ -976,10 +983,12 @@ func (*InitPutRequest) flags() int {
// error immediately instead of continuing a serializable transaction
// to be retried at end transaction.
func (*IncrementRequest) flags() int {
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache
return isRead | isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
}

func (*DeleteRequest) flags() int { return isWrite | isTxn | isTxnWrite | consultsTSCache }
func (*DeleteRequest) flags() int {
return isWrite | isTxn | isTxnWrite | consultsTSCache | canBackpressure
}
func (drr *DeleteRangeRequest) flags() int {
// DeleteRangeRequest has different properties if the "inline" flag is set.
// This flag indicates that the request is deleting inline MVCC values,
Expand All @@ -1002,7 +1011,7 @@ func (drr *DeleteRangeRequest) flags() int {
// intents or tombstones for keys which don't yet exist. By updating
// the write timestamp cache, it forces subsequent writes to get a
// write-too-old error and avoids the phantom delete anomaly.
return isWrite | isTxn | isTxnWrite | isRange | updatesWriteTSCache | needsRefresh | consultsTSCache
return isWrite | isTxn | isTxnWrite | isRange | consultsTSCache | updatesWriteTSCache | needsRefresh | canBackpressure
}

// Note that ClearRange commands cannot be part of a transaction as
Expand Down Expand Up @@ -1041,7 +1050,7 @@ func (*QueryIntentRequest) flags() int { return isRead | isPrefix | updat
func (*ResolveIntentRequest) flags() int { return isWrite }
func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*TruncateLogRequest) flags() int { return isWrite }
func (*MergeRequest) flags() int { return isWrite }
func (*MergeRequest) flags() int { return isWrite | canBackpressure }
func (*RequestLeaseRequest) flags() int { return isWrite | isAlone | skipLeaseCheck }

// LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the
Expand All @@ -1068,8 +1077,10 @@ func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange }
func (*WriteBatchRequest) flags() int { return isWrite | isRange }
func (*ExportRequest) flags() int { return isRead | isRange | updatesReadTSCache }
func (*ImportRequest) flags() int { return isAdmin | isAlone }
func (*AdminScatterRequest) flags() int { return isAdmin | isAlone | isRange }
func (*AddSSTableRequest) flags() int { return isWrite | isAlone | isRange | isUnsplittable }
func (*AdminScatterRequest) flags() int { return isAdmin | isRange | isAlone }
func (*AddSSTableRequest) flags() int {
return isWrite | isRange | isAlone | isUnsplittable | canBackpressure
}

// RefreshRequest and RefreshRangeRequest both determine which timestamp cache
// they update based on their Write parameter.
Expand Down
17 changes: 1 addition & 16 deletions pkg/storage/replica_backpressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)
Expand All @@ -44,20 +43,6 @@ var backpressureRangeSizeMultiplier = settings.RegisterValidatedFloatSetting(
},
)

// backpressurableReqMethods is the set of all request methods that can
// be backpressured. If a batch contains any method in this set, it may
// be backpressured.
var backpressurableReqMethods = util.MakeFastIntSet(
int(roachpb.Put),
int(roachpb.InitPut),
int(roachpb.ConditionalPut),
int(roachpb.Merge),
int(roachpb.Increment),
int(roachpb.Delete),
int(roachpb.DeleteRange),
int(roachpb.AddSSTable),
)

// backpressurableSpans contains spans of keys where write backpressuring
// is permitted. Writes to any keys within these spans may cause a batch
// to be backpressured.
Expand All @@ -78,7 +63,7 @@ func canBackpressureBatch(ba roachpb.BatchRequest) bool {
// method that is within a "backpressurable" key span.
for _, ru := range ba.Requests {
req := ru.GetInner()
if !backpressurableReqMethods.Contains(int(req.Method())) {
if !roachpb.CanBackpressure(req) {
continue
}

Expand Down