intentresolver: batch intent resolution across transactions by range
This PR modifies the IntentResolver to batch intent resolution across different
transactions on a (best effort) per-range basis. It achieves this batching by
plumbing RangeDescriptorCache into the IntentResolver so that an intent-range
mapping can be determined and then using the RequestBatcher introduced in
concurrently without requiring additional goroutines this change extends the
interface to allow the client to provide the response channel.

**Performance Wins**

The change yields significant throughput wins in targeted benchmarks. We expect
write-heavy workloads which generate a large number of intents scattered
over ranges to benefit most from this change. The following benchmarks were run
using kv0 with a batch size of 10 and secondary indices enabled (see command).
On both 8- and 32-core nodes a ~30% throughput increase is observed.

```
./workload run kv '{pgurl:1-3}' --init --splits=256 --duration 90s --batch 10 --secondary-index --read-percent=0 --concurrency=512
```

8-Core
```
name       old ops/s  new ops/s  delta
Cockroach   510 ± 3%   651 ± 7%  +27.58%  (p=0.008 n=5+5)
```

32-Core
```
name       old ops/s   new ops/s   delta
KV0        1.01k ± 3%  1.32k ± 1%  +30.78%  (p=0.008 n=5+5)
```

No change in performance for TPCC was observed:

8-Core (400 warehouses)
```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  300.0s     6073.2  94.5%    180.0    167.8    268.4    318.8    402.7   2013.3
name       old ops/s   new ops/s   delta
tpmc       6.09k ± 1%  6.08k ± 0%   ~     (p=0.886 n=4+4)
```

32-Core (1500 warehouses)
```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  300.0s    19303.0  93.8%    342.5    335.5    453.0    520.1    704.6   3623.9
name       old ops/s   new ops/s   delta
tpmc      19.3k ± 0%  19.3k ± 0%   ~     (p=1.000 n=4+4)
```

Release note (performance improvement): Increase write throughput for workloads
which write large numbers of intents by coalescing intent resolution requests
across transactions.
ajwerner committed Feb 19, 2019
1 parent fc31766 commit f0f9db6
Showing 6 changed files with 183 additions and 87 deletions.
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -446,6 +446,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
StorePool: s.storePool,
SQLExecutor: internalExecutor,
LogRangeEvents: s.cfg.EventLogEnabled,
RangeDescriptorCache: s.distSender.RangeDescriptorCache(),
TimeSeriesDataStore: s.tsDB,

// Initialize the closed timestamp subsystem. Note that it won't
2 changes: 1 addition & 1 deletion pkg/storage/gc_queue_test.go
@@ -674,7 +674,7 @@ func TestGCQueueTransactionTable(t *testing.T) {
}

var resolved syncmap.Map

tsc.TestingKnobs.IntentResolverKnobs.MaxIntentResolutionBatchSize = 1
tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
func(filterArgs storagebase.FilterArgs) *roachpb.Error {
if resArgs, ok := filterArgs.Req.(*roachpb.ResolveIntentRequest); ok {
158 changes: 109 additions & 49 deletions pkg/storage/intentresolver/intent_resolver.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/internal/client/requestbatcher"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
@@ -81,15 +82,29 @@ const (
// defaultGCBatchWait is the default duration which the gc request batcher
// will wait between requests for a range before sending it.
defaultGCBatchWait = time.Second

// intentResolutionBatchWait is used to configure the RequestBatcher which
// batches intent resolution requests across transactions. Intent resolution
// needs to occur in a relatively short period of time after the completion
// of a transaction in order to minimize the contention footprint of the write
// for other contending writes. The chosen value was selected based on some
// light experimentation to ensure that performance does not degrade in the
// face of highly contended workloads.
intentResolutionBatchWait = 10 * time.Millisecond

// intentResolutionBatchIdle is similar to the above setting but is used
// when no additional traffic hits the batch.
intentResolutionBatchIdle = 5 * time.Millisecond
)

// Config contains the dependencies to construct an IntentResolver.
type Config struct {
Clock *hlc.Clock
DB *client.DB
Stopper *stop.Stopper
AmbientCtx log.AmbientContext
TestingKnobs storagebase.IntentResolverTestingKnobs
Clock *hlc.Clock
DB *client.DB
Stopper *stop.Stopper
AmbientCtx log.AmbientContext
TestingKnobs storagebase.IntentResolverTestingKnobs
RangeDescriptorCache kvbase.RangeDescriptorCache

TaskLimit int
MaxGCBatchWait time.Duration
@@ -109,7 +124,10 @@ type IntentResolver struct {
sem chan struct{} // Semaphore to limit async goroutines.
contentionQ *contentionQueue // manages contention on individual keys

batcher *requestbatcher.RequestBatcher
rdc kvbase.RangeDescriptorCache

gcBatcher *requestbatcher.RequestBatcher
irBatcher *requestbatcher.RequestBatcher

mu struct {
syncutil.Mutex
@@ -136,6 +154,19 @@ func setConfigDefaults(c *Config) {
if c.MaxGCBatchWait == 0 {
c.MaxGCBatchWait = defaultGCBatchWait
}
if c.RangeDescriptorCache == nil {
c.RangeDescriptorCache = nopRangeDescriptorCache{}
}
}

type nopRangeDescriptorCache struct{}

var zeroRangeDescriptor = &roachpb.RangeDescriptor{}

func (nrdc nopRangeDescriptorCache) LookupRangeDescriptor(
ctx context.Context, key roachpb.RKey,
) (*roachpb.RangeDescriptor, error) {
return zeroRangeDescriptor, nil
}

// New creates a new IntentResolver.
@@ -149,19 +180,31 @@ func New(c Config) *IntentResolver {
contentionQ: newContentionQueue(c.Clock, c.DB),
every: log.Every(time.Minute),
Metrics: makeMetrics(),
rdc: c.RangeDescriptorCache,
testingKnobs: c.TestingKnobs,
}
ir.mu.inFlightPushes = map[uuid.UUID]int{}
ir.mu.inFlightTxnCleanups = map[uuid.UUID]struct{}{}
ir.batcher = requestbatcher.New(requestbatcher.Config{
Name: "intent_resolver_batcher",
ir.gcBatcher = requestbatcher.New(requestbatcher.Config{
Name: "intent_resolver_gc_batcher",
MaxMsgsPerBatch: 1024,
MaxWait: c.MaxGCBatchWait,
MaxIdle: c.MaxGCBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})

batchSize := intentResolverBatchSize
if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 {
batchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
}
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: batchSize,
MaxWait: intentResolutionBatchWait,
MaxIdle: intentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
return ir
}

@@ -724,7 +767,12 @@ func (ir *IntentResolver) gcTxnRecord(
gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{
Key: txnKey,
})
_, err := ir.batcher.Send(ctx, rangeID, &gcArgs)
// Although the IntentResolver has a RangeDescriptorCache it could consult
// to determine the range to which this request corresponds, GCRequests are
// always issued on behalf of the range on which this record resides which is
// a strong signal that it is the range which will contain the transaction
// record now.
_, err := ir.gcBatcher.Send(ctx, rangeID, &gcArgs)
if err != nil {
return errors.Wrapf(err, "could not GC completed transaction anchored at %s",
roachpb.Key(txn.Key))
@@ -795,6 +843,26 @@ type ResolveOptions struct {
MinTimestamp hlc.Timestamp
}

// lookupRangeID maps a key to a RangeID for best effort batching of intent
// resolution requests.
func (ir *IntentResolver) lookupRangeID(ctx context.Context, key roachpb.Key) roachpb.RangeID {
rKey, err := keys.Addr(key)
if err != nil {
if ir.every.ShouldLog() {
log.Warningf(ctx, "failed to resolve addr for key %q: %v", key, err)
}
return 0
}
rDesc, err := ir.rdc.LookupRangeDescriptor(ctx, rKey)
if err != nil {
if ir.every.ShouldLog() {
log.Warningf(ctx, "failed to look up range descriptor for key %q: %v", key, err)
}
return 0
}
return rDesc.RangeID
}

// ResolveIntents synchronously resolves intents according to opts.
func (ir *IntentResolver) ResolveIntents(
ctx context.Context, intents []roachpb.Intent, opts ResolveOptions,
@@ -808,18 +876,27 @@ func (ir *IntentResolver) ResolveIntents(
return errors.Wrap(err, "aborted resolving intents")
}
log.Eventf(ctx, "resolving intents [wait=%t]", opts.Wait)

var resolveReqs []roachpb.Request
ctx, cancel := context.WithCancel(ctx)
defer cancel()
type resolveReq struct {
rangeID roachpb.RangeID
req roachpb.Request
}
var resolveReqs []resolveReq
var resolveRangeReqs []roachpb.Request
for i := range intents {
intent := intents[i] // avoids a race in `i, intent := range ...`
if len(intent.EndKey) == 0 {
resolveReqs = append(resolveReqs, &roachpb.ResolveIntentRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: opts.Poison,
})
resolveReqs = append(resolveReqs,
resolveReq{
rangeID: ir.lookupRangeID(ctx, intent.Key),
req: &roachpb.ResolveIntentRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: opts.Poison,
},
})
} else {
resolveRangeReqs = append(resolveRangeReqs, &roachpb.ResolveIntentRangeRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
@@ -831,40 +908,23 @@ func (ir *IntentResolver) ResolveIntents(
}
}

// Sort the intents to maximize batching by range.
sort.Slice(resolveReqs, func(i, j int) bool {
return resolveReqs[i].Header().Key.Compare(resolveReqs[j].Header().Key) < 0
})

// Resolve all of the intents in batches of size intentResolverBatchSize.
// The maximum timeout is intentResolverTimeout, and this is applied to
// each batch to ensure forward progress is made. A large set of intents
// might require more time than a single timeout allows.
for len(resolveReqs) > 0 {
b := &client.Batch{}
if len(resolveReqs) > intentResolverBatchSize {
b.AddRawRequest(resolveReqs[:intentResolverBatchSize]...)
resolveReqs = resolveReqs[intentResolverBatchSize:]
} else {
b.AddRawRequest(resolveReqs...)
resolveReqs = nil
}
// Everything here is best effort; so give the context a timeout
// to avoid waiting too long. This may be a larger timeout than
// the context already has, in which case we'll respect the
// existing timeout. A single txn can have more intents than we
// can handle in the normal timeout, which would prevent us from
// ever cleaning up all of its intents in time to then delete the
// txn record, causing an infinite loop on that txn record, where
// the same initial set of intents is endlessly re-resolved.
if err := contextutil.RunWithTimeout(ctx, "resolve intents", intentResolverTimeout,
func(ctx context.Context) error {
return ir.db.Run(ctx, b)
}); err != nil {
// Bail out on the first error.
respChan := make(chan requestbatcher.Response, len(resolveReqs))
for _, req := range resolveReqs {
if err := ir.irBatcher.SendWithChan(ctx, respChan, req.rangeID, req.req); err != nil {
return err
}
}
for seen := 0; seen < len(resolveReqs); seen++ {
select {
case resp := <-respChan:
if resp.Err != nil {
return resp.Err
}
_ = resp.Resp // ignore the response
case <-ctx.Done():
return ctx.Err()
}
}

// Resolve spans differently. We don't know how many intents will be
// swept up with each request, so we limit the spanning resolve