From f0f9db6029680351424319c02b2513bfef5587e3 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Mon, 11 Feb 2019 15:24:15 -0500
Subject: [PATCH] intentresolver: batch intent resolution across transactions
 by range
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR modifies the IntentResolver to batch intent resolution across
different transactions on a (best effort) per-range basis. It achieves this
batching by plumbing the RangeDescriptorCache into the IntentResolver so that
an intent-to-range mapping can be determined, and by using the previously
introduced RequestBatcher. To send batches concurrently without requiring
additional goroutines, this change extends the RequestBatcher interface to
allow the client to provide the response channel.

**Performance Wins**

The change yields significant throughput wins in targeted benchmarks. We
expect write-heavy workloads which generate a large number of intents
scattered over many ranges to benefit most from this change. The following
benchmarks were run using kv0 with a batch size of 10 and secondary indices
enabled (see command). On both 8- and 32-core nodes a ~30% throughput
increase is observed.

```
./workload run kv '{pgurl:1-3}' --init --splits=256 --duration 90s --batch 10 --secondary-index --read-percent=0 --concurrency=512
```

8-Core

```
name       old ops/s   new ops/s   delta
Cockroach  510 ± 3%    651 ± 7%    +27.58%  (p=0.008 n=5+5)
```

32-Core

```
name  old ops/s    new ops/s    delta
KV0   1.01k ± 3%   1.32k ± 1%   +30.78%  (p=0.008 n=5+5)
```

No change in performance for TPCC was observed:

8-Core (400 warehouses)

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  300.0s     6073.2  94.5%    180.0    167.8    268.4    318.8    402.7   2013.3

name  old ops/s   new ops/s   delta
tpmc  6.09k ± 1%  6.08k ± 0%  ~  (p=0.886 n=4+4)
```

32-Core (1500 warehouses)

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  300.0s    19303.0  93.8%    342.5    335.5    453.0    520.1    704.6   3623.9

name  old ops/s   new ops/s   delta
tpmc  19.3k ± 0%  19.3k ± 0%  ~  (p=1.000 n=4+4)
```

Release note (performance improvement): Increase write throughput for
workloads which write large numbers of intents by coalescing intent
resolution requests across transactions.
---
 pkg/server/server.go                          |   1 +
 pkg/storage/gc_queue_test.go                  |   2 +-
 pkg/storage/intentresolver/intent_resolver.go | 158 ++++++++++++------
 .../intentresolver/intent_resolver_test.go    |  72 +++++---
 pkg/storage/storagebase/knobs.go              |   4 +
 pkg/storage/store.go                          |  33 ++--
 6 files changed, 183 insertions(+), 87 deletions(-)

diff --git a/pkg/server/server.go b/pkg/server/server.go
index 4a0dc8cefc85..8f4942ad5712 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -446,6 +446,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
         StorePool:            s.storePool,
         SQLExecutor:          internalExecutor,
         LogRangeEvents:       s.cfg.EventLogEnabled,
+        RangeDescriptorCache: s.distSender.RangeDescriptorCache(),
         TimeSeriesDataStore:  s.tsDB,

         // Initialize the closed timestamp subsystem. Note that it won't
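The per-range mapping that makes this batching possible is only advisory: each
intent key is translated to a RangeID through the descriptor cache plumbed in
above, and a failed lookup falls back to RangeID 0 rather than failing
resolution. A rough sketch of that grouping (the `groupKeysByRange` helper
below is illustrative only and is not part of this patch):

```go
package intentbatching

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/keys"
    "github.com/cockroachdb/cockroach/pkg/kv/kvbase"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// groupKeysByRange buckets keys by the range the descriptor cache currently
// believes owns them. The grouping is best effort: a stale or failed lookup
// only costs batching efficiency (the key lands in the zero bucket), never
// correctness, since requests are still routed by key when they are sent.
func groupKeysByRange(
    ctx context.Context, rdc kvbase.RangeDescriptorCache, ks []roachpb.Key,
) map[roachpb.RangeID][]roachpb.Key {
    out := make(map[roachpb.RangeID][]roachpb.Key)
    for _, k := range ks {
        var rangeID roachpb.RangeID // stays 0 if the lookup fails
        if rKey, err := keys.Addr(k); err == nil {
            if desc, err := rdc.LookupRangeDescriptor(ctx, rKey); err == nil {
                rangeID = desc.RangeID
            }
        }
        out[rangeID] = append(out[rangeID], k)
    }
    return out
}
```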
diff --git a/pkg/storage/gc_queue_test.go b/pkg/storage/gc_queue_test.go
index 8c5ec3a77db2..0778168dc3ab 100644
--- a/pkg/storage/gc_queue_test.go
+++ b/pkg/storage/gc_queue_test.go
@@ -674,7 +674,7 @@ func TestGCQueueTransactionTable(t *testing.T) {
     }

     var resolved syncmap.Map
-
+    tsc.TestingKnobs.IntentResolverKnobs.MaxIntentResolutionBatchSize = 1
     tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
         func(filterArgs storagebase.FilterArgs) *roachpb.Error {
             if resArgs, ok := filterArgs.Req.(*roachpb.ResolveIntentRequest); ok {
diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go
index 28344fabcdcf..4cc863384dbf 100644
--- a/pkg/storage/intentresolver/intent_resolver.go
+++ b/pkg/storage/intentresolver/intent_resolver.go
@@ -24,6 +24,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/internal/client"
     "github.com/cockroachdb/cockroach/pkg/internal/client/requestbatcher"
     "github.com/cockroachdb/cockroach/pkg/keys"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvbase"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
     "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
@@ -81,15 +82,29 @@ const (
     // defaultGCBatchWait is the default duration which the gc request batcher
     // will wait between requests for a range before sending it.
     defaultGCBatchWait = time.Second
+
+    // intentResolutionBatchWait is used to configure the RequestBatcher which
+    // batches intent resolution requests across transactions. Intent resolution
+    // needs to occur in a relatively short period of time after the completion
+    // of a transaction in order to minimize the contention footprint of the write
+    // for other contending writes. The chosen value was selected based on some
+    // light experimentation to ensure that performance does not degrade in the
+    // face of highly contended workloads.
+    intentResolutionBatchWait = 10 * time.Millisecond
+
+    // intentResolutionBatchIdle is similar to the above setting but is used
+    // when no additional traffic hits the batch.
+    intentResolutionBatchIdle = 5 * time.Millisecond
 )

 // Config contains the dependencies to construct an IntentResolver.
 type Config struct {
-    Clock        *hlc.Clock
-    DB           *client.DB
-    Stopper      *stop.Stopper
-    AmbientCtx   log.AmbientContext
-    TestingKnobs storagebase.IntentResolverTestingKnobs
+    Clock                *hlc.Clock
+    DB                   *client.DB
+    Stopper              *stop.Stopper
+    AmbientCtx           log.AmbientContext
+    TestingKnobs         storagebase.IntentResolverTestingKnobs
+    RangeDescriptorCache kvbase.RangeDescriptorCache

     TaskLimit      int
     MaxGCBatchWait time.Duration
@@ -109,7 +124,10 @@ type IntentResolver struct {
     sem         chan struct{}     // Semaphore to limit async goroutines.
     contentionQ *contentionQueue // manages contention on individual keys
-    batcher     *requestbatcher.RequestBatcher
+    rdc         kvbase.RangeDescriptorCache
+
+    gcBatcher *requestbatcher.RequestBatcher
+    irBatcher *requestbatcher.RequestBatcher

     mu struct {
         syncutil.Mutex
@@ -136,6 +154,19 @@ func setConfigDefaults(c *Config) {
     if c.MaxGCBatchWait == 0 {
         c.MaxGCBatchWait = defaultGCBatchWait
     }
+    if c.RangeDescriptorCache == nil {
+        c.RangeDescriptorCache = nopRangeDescriptorCache{}
+    }
+}
+
+type nopRangeDescriptorCache struct{}
+
+var zeroRangeDescriptor = &roachpb.RangeDescriptor{}
+
+func (nrdc nopRangeDescriptorCache) LookupRangeDescriptor(
+    ctx context.Context, key roachpb.RKey,
+) (*roachpb.RangeDescriptor, error) {
+    return zeroRangeDescriptor, nil
 }

 // New creates an new IntentResolver.
@@ -149,19 +180,31 @@ func New(c Config) *IntentResolver {
         contentionQ: newContentionQueue(c.Clock, c.DB),
         every:       log.Every(time.Minute),
         Metrics:     makeMetrics(),
+        rdc:         c.RangeDescriptorCache,
         testingKnobs: c.TestingKnobs,
     }
     ir.mu.inFlightPushes = map[uuid.UUID]int{}
     ir.mu.inFlightTxnCleanups = map[uuid.UUID]struct{}{}
-    ir.batcher = requestbatcher.New(requestbatcher.Config{
-        Name:            "intent_resolver_batcher",
+    ir.gcBatcher = requestbatcher.New(requestbatcher.Config{
+        Name:            "intent_resolver_gc_batcher",
         MaxMsgsPerBatch: 1024,
         MaxWait:         c.MaxGCBatchWait,
         MaxIdle:         c.MaxGCBatchIdle,
         Stopper:         c.Stopper,
         Sender:          c.DB.NonTransactionalSender(),
     })
-
+    batchSize := intentResolverBatchSize
+    if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 {
+        batchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
+    }
+    ir.irBatcher = requestbatcher.New(requestbatcher.Config{
+        Name:            "intent_resolver_ir_batcher",
+        MaxMsgsPerBatch: batchSize,
+        MaxWait:         intentResolutionBatchWait,
+        MaxIdle:         intentResolutionBatchIdle,
+        Stopper:         c.Stopper,
+        Sender:          c.DB.NonTransactionalSender(),
+    })
     return ir
 }
@@ -724,7 +767,12 @@ func (ir *IntentResolver) gcTxnRecord(
     gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{
         Key: txnKey,
     })
-    _, err := ir.batcher.Send(ctx, rangeID, &gcArgs)
+    // Although the IntentResolver has a RangeDescriptorCache it could consult
+    // to determine the range to which this request corresponds, GCRequests are
+    // always issued on behalf of the range on which this record resides, which
+    // is a strong signal that it is the range which will contain the
+    // transaction record now.
+    _, err := ir.gcBatcher.Send(ctx, rangeID, &gcArgs)
     if err != nil {
         return errors.Wrapf(err, "could not GC completed transaction anchored at %s",
             roachpb.Key(txn.Key))
@@ -795,6 +843,26 @@ type ResolveOptions struct {
     MinTimestamp hlc.Timestamp
 }

+// lookupRangeID maps a key to a RangeID for best effort batching of intent
+// resolution requests.
+func (ir *IntentResolver) lookupRangeID(ctx context.Context, key roachpb.Key) roachpb.RangeID {
+    rKey, err := keys.Addr(key)
+    if err != nil {
+        if ir.every.ShouldLog() {
+            log.Warningf(ctx, "failed to resolve addr for key %q: %v", key, err)
+        }
+        return 0
+    }
+    rDesc, err := ir.rdc.LookupRangeDescriptor(ctx, rKey)
+    if err != nil {
+        if ir.every.ShouldLog() {
+            log.Warningf(ctx, "failed to look up range descriptor for key %q: %v", key, err)
+        }
+        return 0
+    }
+    return rDesc.RangeID
+}
+
 // ResolveIntents synchronously resolves intents according to opts.
 func (ir *IntentResolver) ResolveIntents(
     ctx context.Context, intents []roachpb.Intent, opts ResolveOptions,
@@ -808,18 +876,27 @@ func (ir *IntentResolver) ResolveIntents(
         return errors.Wrap(err, "aborted resolving intents")
     }
     log.Eventf(ctx, "resolving intents [wait=%t]", opts.Wait)
-
-    var resolveReqs []roachpb.Request
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+    type resolveReq struct {
+        rangeID roachpb.RangeID
+        req     roachpb.Request
+    }
+    var resolveReqs []resolveReq
     var resolveRangeReqs []roachpb.Request
     for i := range intents {
         intent := intents[i] // avoids a race in `i, intent := range ...`
         if len(intent.EndKey) == 0 {
-            resolveReqs = append(resolveReqs, &roachpb.ResolveIntentRequest{
-                RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
-                IntentTxn:     intent.Txn,
-                Status:        intent.Status,
-                Poison:        opts.Poison,
-            })
+            resolveReqs = append(resolveReqs,
+                resolveReq{
+                    rangeID: ir.lookupRangeID(ctx, intent.Key),
+                    req: &roachpb.ResolveIntentRequest{
+                        RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
+                        IntentTxn:     intent.Txn,
+                        Status:        intent.Status,
+                        Poison:        opts.Poison,
+                    },
+                })
         } else {
             resolveRangeReqs = append(resolveRangeReqs, &roachpb.ResolveIntentRangeRequest{
                 RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
@@ -831,40 +908,23 @@ func (ir *IntentResolver) ResolveIntents(
         }
     }

-    // Sort the intents to maximize batching by range.
-    sort.Slice(resolveReqs, func(i, j int) bool {
-        return resolveReqs[i].Header().Key.Compare(resolveReqs[j].Header().Key) < 0
-    })
-
-    // Resolve all of the intents in batches of size intentResolverBatchSize.
-    // The maximum timeout is intentResolverTimeout, and this is applied to
-    // each batch to ensure forward progress is made. A large set of intents
-    // might require more time than a single timeout allows.
-    for len(resolveReqs) > 0 {
-        b := &client.Batch{}
-        if len(resolveReqs) > intentResolverBatchSize {
-            b.AddRawRequest(resolveReqs[:intentResolverBatchSize]...)
-            resolveReqs = resolveReqs[intentResolverBatchSize:]
-        } else {
-            b.AddRawRequest(resolveReqs...)
-            resolveReqs = nil
-        }
-        // Everything here is best effort; so give the context a timeout
-        // to avoid waiting too long. This may be a larger timeout than
-        // the context already has, in which case we'll respect the
-        // existing timeout. A single txn can have more intents than we
-        // can handle in the normal timeout, which would prevent us from
-        // ever cleaning up all of its intents in time to then delete the
-        // txn record, causing an infinite loop on that txn record, where
-        // the same initial set of intents is endlessly re-resolved.
-        if err := contextutil.RunWithTimeout(ctx, "resolve intents", intentResolverTimeout,
-            func(ctx context.Context) error {
-                return ir.db.Run(ctx, b)
-            }); err != nil {
-            // Bail out on the first error.
+    respChan := make(chan requestbatcher.Response, len(resolveReqs))
+    for _, req := range resolveReqs {
+        if err := ir.irBatcher.SendWithChan(ctx, respChan, req.rangeID, req.req); err != nil {
             return err
         }
     }
+    for seen := 0; seen < len(resolveReqs); seen++ {
+        select {
+        case resp := <-respChan:
+            if resp.Err != nil {
+                return resp.Err
+            }
+            _ = resp.Resp // ignore the response
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }

     // Resolve spans differently. We don't know how many intents will be
     // swept up with each request, so we limit the spanning resolve
diff --git a/pkg/storage/intentresolver/intent_resolver_test.go b/pkg/storage/intentresolver/intent_resolver_test.go
index f0cea93650c9..4a414e7cbc3b 100644
--- a/pkg/storage/intentresolver/intent_resolver_test.go
+++ b/pkg/storage/intentresolver/intent_resolver_test.go
@@ -32,6 +32,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
+    "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/tracing"
     "github.com/cockroachdb/cockroach/pkg/util/uuid"
    "github.com/pkg/errors"
@@ -182,7 +183,8 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
     for _, c := range cases {
         t.Run("", func(t *testing.T) {
-            ir := newIntentResolverWithSendFuncs(stopper, clock, &c.sendFuncs)
+            sf := &sendFuncs{sendFuncs: c.sendFuncs}
+            ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
             var didPush, didSucceed bool
             done := make(chan struct{})
             onComplete := func(pushed, succeeded bool) {
@@ -194,7 +196,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
                 t.Fatalf("unexpected error sending async transaction")
             }
             <-done
-            if len(c.sendFuncs) != 0 {
+            if sf.len() != 0 {
                 t.Errorf("Not all send funcs called")
             }
             if didSucceed != c.expectSucceed {
@@ -427,12 +429,12 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) {
     clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
     stopper := stop.NewStopper()
     defer stopper.Stop(context.Background())
-    sendFuncs := []sendFunc{
-        singlePushTxnSendFunc,
-        resolveIntentsSendFunc,
-    }
     txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */)
-    ir := newIntentResolverWithSendFuncs(stopper, clock, &sendFuncs)
+    sf := newSendFuncs(
+        pushTxnSendFunc(1),
+        resolveIntentsSendFunc,
+    )
+    ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
     // Run defaultTaskLimit tasks which will block until blocker is closed.
     blocker := make(chan struct{})
     defer close(blocker)
@@ -461,7 +463,7 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) {
     // sendFuncs.
     err = ir.CleanupIntentsAsync(context.Background(), testIntentsWithArg, true)
     assert.Nil(t, err)
-    assert.Len(t, sendFuncs, 0)
+    assert.Equal(t, sf.len(), 0)
 }

 // TestCleanupIntentsAsync verifies that CleanupIntentsAsync sends the expected
@@ -504,16 +506,47 @@ func TestCleanupIntentsAsync(t *testing.T) {
     for _, c := range cases {
         t.Run("", func(t *testing.T) {
             stopper := stop.NewStopper()
-            sendFuncs := c.sendFuncs
-            ir := newIntentResolverWithSendFuncs(stopper, clock, &sendFuncs)
+            sf := newSendFuncs(c.sendFuncs...)
+            ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
             err := ir.CleanupIntentsAsync(context.Background(), c.intents, true)
+            testutils.SucceedsSoon(t, func() error {
+                if l := sf.len(); l > 0 {
+                    return fmt.Errorf("Still have %d funcs to send", l)
+                }
+                return nil
+            })
             stopper.Stop(context.Background())
             assert.Nil(t, err, "error from CleanupIntentsAsync")
-            assert.Len(t, sendFuncs, 0, "did not use all sendFuncs")
         })
     }
 }

+func newSendFuncs(sf ...sendFunc) *sendFuncs {
+    return &sendFuncs{sendFuncs: sf}
+}
+
+type sendFuncs struct {
+    mu        syncutil.Mutex
+    sendFuncs []sendFunc
+}
+
+func (sf *sendFuncs) len() int {
+    sf.mu.Lock()
+    defer sf.mu.Unlock()
+    return len(sf.sendFuncs)
+}
+
+func (sf *sendFuncs) pop() sendFunc {
+    sf.mu.Lock()
+    defer sf.mu.Unlock()
+    if len(sf.sendFuncs) == 0 {
+        panic("no send funcs left!")
+    }
+    ret := sf.sendFuncs[0]
+    sf.sendFuncs = sf.sendFuncs[1:]
+    return ret
+}
+
 // TestCleanupTxnIntentsAsync verifies that CleanupTxnIntentsAsync sends the
 // expected requests.
 func TestCleanupTxnIntentsAsync(t *testing.T) {
@@ -560,8 +593,8 @@
     clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
     var sendFuncCalled int64
     numSendFuncs := int64(len(c.sendFuncs))
-    sendFuncs := counterSendFuncs(&sendFuncCalled, c.sendFuncs)
-    ir := newIntentResolverWithSendFuncs(stopper, clock, &sendFuncs)
+    sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...)
+    ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
     if c.before != nil {
         defer c.before(&c, ir)()
     }
@@ -574,7 +607,7 @@
     })
     stopper.Stop(context.Background())
     assert.Nil(t, err)
-    assert.Len(t, sendFuncs, 0)
+    assert.Equal(t, sf.len(), 0)
     })
     }
 }
@@ -650,8 +683,7 @@ func TestCleanupIntents(t *testing.T) {
     defer stopper.Stop(context.Background())
     for _, c := range cases {
         t.Run("", func(t *testing.T) {
-            sendFuncs := c.sendFuncs
-            ir := newIntentResolverWithSendFuncs(stopper, clock, &sendFuncs)
+            ir := newIntentResolverWithSendFuncs(stopper, clock, newSendFuncs(c.sendFuncs...))
             num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT)
             assert.Equal(t, num, c.expectedNum, "number of resolved intents")
             assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err)
@@ -739,15 +771,11 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In
 type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)

 func newIntentResolverWithSendFuncs(
-    stopper *stop.Stopper, clock *hlc.Clock, sendFuncs *[]sendFunc,
+    stopper *stop.Stopper, clock *hlc.Clock, sf *sendFuncs,
 ) *IntentResolver {
     txnSenderFactory := client.NonTransactionalFactoryFunc(
         func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
-            if len(*sendFuncs) == 0 {
-                panic("no send funcs left!")
-            }
-            f := (*sendFuncs)[0]
-            *sendFuncs = (*sendFuncs)[1:]
+            f := sf.pop()
             return f(ba)
         })
     db := client.NewDB(log.AmbientContext{
diff --git a/pkg/storage/storagebase/knobs.go b/pkg/storage/storagebase/knobs.go
index 39b6dac4dcab..92717f9d6780 100644
--- a/pkg/storage/storagebase/knobs.go
+++ b/pkg/storage/storagebase/knobs.go
@@ -48,4 +48,8 @@ type IntentResolverTestingKnobs struct {
     // performed synchronously. It is equivalent to setting IntentResolverTaskLimit
     // to -1.
     ForceSyncIntentResolution bool
+
+    // MaxIntentResolutionBatchSize overrides the maximum number of intent
+    // resolution requests which can be sent in a single batch.
+    MaxIntentResolutionBatchSize int
 }
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 66e1c2ff870c..f0d71aad3f1f 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -32,6 +32,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/gossip"
     "github.com/cockroachdb/cockroach/pkg/internal/client"
     "github.com/cockroachdb/cockroach/pkg/keys"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvbase"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/rpc"
     "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -594,15 +595,16 @@ type StoreConfig struct {
     AmbientCtx log.AmbientContext
     base.RaftConfig

-    Settings     *cluster.Settings
-    Clock        *hlc.Clock
-    DB           *client.DB
-    Gossip       *gossip.Gossip
-    NodeLiveness *NodeLiveness
-    StorePool    *StorePool
-    Transport    *RaftTransport
-    NodeDialer   *nodedialer.Dialer
-    RPCContext   *rpc.Context
+    Settings             *cluster.Settings
+    Clock                *hlc.Clock
+    DB                   *client.DB
+    Gossip               *gossip.Gossip
+    NodeLiveness         *NodeLiveness
+    StorePool            *StorePool
+    Transport            *RaftTransport
+    NodeDialer           *nodedialer.Dialer
+    RPCContext           *rpc.Context
+    RangeDescriptorCache kvbase.RangeDescriptorCache

     ClosedTimestamp *container.Container

@@ -1262,12 +1264,13 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {

     // create the intent resolver
     s.intentResolver = intentresolver.New(intentresolver.Config{
-        Clock:        s.cfg.Clock,
-        DB:           s.db,
-        Stopper:      stopper,
-        TaskLimit:    s.cfg.IntentResolverTaskLimit,
-        AmbientCtx:   s.cfg.AmbientCtx,
-        TestingKnobs: s.cfg.TestingKnobs.IntentResolverKnobs,
+        Clock:                s.cfg.Clock,
+        DB:                   s.db,
+        Stopper:              stopper,
+        TaskLimit:            s.cfg.IntentResolverTaskLimit,
+        AmbientCtx:           s.cfg.AmbientCtx,
+        TestingKnobs:         s.cfg.TestingKnobs.IntentResolverKnobs,
+        RangeDescriptorCache: s.cfg.RangeDescriptorCache,
     })
     s.metrics.registry.AddMetricStruct(s.intentResolver.Metrics)
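For callers outside this patch, the client-provided response channel added to
the RequestBatcher can be driven with the same fan-in pattern ResolveIntents
uses above. A minimal sketch, assuming only the SendWithChan/Response API
visible in this diff (the `sendAndWait` helper itself is hypothetical):

```go
package intentbatching

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/internal/client/requestbatcher"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// sendAndWait enqueues one request per range and then waits for every
// response on a single channel. Buffering the channel to len(reqs) lets the
// batcher deliver without blocking, so no extra goroutine is needed on the
// caller side.
func sendAndWait(
    ctx context.Context, b *requestbatcher.RequestBatcher, reqs map[roachpb.RangeID]roachpb.Request,
) error {
    respChan := make(chan requestbatcher.Response, len(reqs))
    for rangeID, req := range reqs {
        if err := b.SendWithChan(ctx, respChan, rangeID, req); err != nil {
            return err
        }
    }
    for seen := 0; seen < len(reqs); seen++ {
        select {
        case resp := <-respChan:
            if resp.Err != nil {
                return resp.Err
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
```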