-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
dist_sender.go
1727 lines (1608 loc) · 64.8 KB
/
dist_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package kv
import (
"context"
"fmt"
"sync/atomic"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
)
const (
// defaultSenderConcurrency is the default limit for asynchronous senders.
// NewDistSender uses it as the capacity of the DistSender's asyncSenderSem
// channel.
defaultSenderConcurrency = 500
// rangeLookupPrefetchCount is the maximum number of range descriptors to
// prefetch during range lookups. DistSender.RangeLookup passes it to
// client.RangeLookup.
rangeLookupPrefetchCount = 8
)
// Metadata describing the counters exported by the DistSender. Each entry is
// turned into a metric.Counter by makeDistSenderMetrics and stored in the
// DistSenderMetrics field named in its comment.
var (
// metaDistSenderBatchCount backs DistSenderMetrics.BatchCount.
metaDistSenderBatchCount = metric.Metadata{
Name: "distsender.batches",
Help: "Number of batches processed",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
// metaDistSenderPartialBatchCount backs DistSenderMetrics.PartialBatchCount.
metaDistSenderPartialBatchCount = metric.Metadata{
Name: "distsender.batches.partial",
Help: "Number of partial batches processed after being divided on range boundaries",
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
// metaDistSenderAsyncSentCount backs DistSenderMetrics.AsyncSentCount.
metaDistSenderAsyncSentCount = metric.Metadata{
Name: "distsender.batches.async.sent",
Help: "Number of partial batches sent asynchronously",
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
// metaDistSenderAsyncThrottledCount backs
// DistSenderMetrics.AsyncThrottledCount.
metaDistSenderAsyncThrottledCount = metric.Metadata{
Name: "distsender.batches.async.throttled",
Help: "Number of partial batches not sent asynchronously due to throttling",
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
// metaTransportSentCount backs DistSenderMetrics.SentCount.
metaTransportSentCount = metric.Metadata{
Name: "distsender.rpc.sent",
Help: "Number of RPCs sent",
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
// metaTransportLocalSentCount backs DistSenderMetrics.LocalSentCount.
metaTransportLocalSentCount = metric.Metadata{
Name: "distsender.rpc.sent.local",
Help: "Number of local RPCs sent",
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
// metaTransportSenderNextReplicaErrCount backs
// DistSenderMetrics.NextReplicaErrCount.
metaTransportSenderNextReplicaErrCount = metric.Metadata{
Name: "distsender.rpc.sent.nextreplicaerror",
Help: "Number of RPCs sent due to per-replica errors",
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
// metaDistSenderNotLeaseHolderErrCount backs
// DistSenderMetrics.NotLeaseHolderErrCount.
metaDistSenderNotLeaseHolderErrCount = metric.Metadata{
Name: "distsender.errors.notleaseholder",
Help: "Number of NotLeaseHolderErrors encountered",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
// metaDistSenderInLeaseTransferBackoffsCount backs
// DistSenderMetrics.InLeaseTransferBackoffs.
metaDistSenderInLeaseTransferBackoffsCount = metric.Metadata{
Name: "distsender.errors.inleasetransferbackoffs",
Help: "Number of times backed off due to NotLeaseHolderErrors during lease transfer.",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
)
// CanSendToFollower reports whether a batch may be served by a follower
// replica, in which case the DistSender does not need to look up the current
// lease holder for it. It is a package-level hook so that the
// followerreadsccl code can install logic that checks whether follower reads
// are enabled.
//
// The hook receives the cluster ID, the cluster settings and the batch
// request, in that order. The default implementation, used when no CCL code
// is linked in, ignores all three and always returns false.
var CanSendToFollower = func(
	_ uuid.UUID, _ *cluster.Settings, _ roachpb.BatchRequest,
) bool {
	return false
}
// rangeDescriptorCacheSize is the cluster setting that bounds the number of
// entries kept in the range descriptor cache and in the leaseholder cache.
// NewDistSender hands both caches a closure that reads the current value from
// the DistSender's settings.
// NOTE(review): 1e6 is presumably the setting's default value — confirm
// against settings.RegisterIntSetting.
var rangeDescriptorCacheSize = settings.RegisterIntSetting(
"kv.range_descriptor_cache.size",
"maximum number of entries in the range descriptor and leaseholder caches",
1e6,
)
// DistSenderMetrics is the set of metrics for a given distributed sender.
// Every field is a pointer to a counter, so copies of the struct (such as the
// value returned by DistSender.Metrics) refer to the same underlying counters.
type DistSenderMetrics struct {
// BatchCount is the number of batches processed; DistSender.Send increments
// it once per call.
BatchCount *metric.Counter
// PartialBatchCount is the number of partial batches processed after being
// divided on range boundaries.
PartialBatchCount *metric.Counter
// AsyncSentCount is the number of partial batches sent asynchronously.
AsyncSentCount *metric.Counter
// AsyncThrottledCount is the number of partial batches not sent
// asynchronously due to throttling.
AsyncThrottledCount *metric.Counter
// SentCount is the number of RPCs sent.
SentCount *metric.Counter
// LocalSentCount is the number of local RPCs sent.
LocalSentCount *metric.Counter
// NextReplicaErrCount is the number of RPCs sent due to per-replica errors.
NextReplicaErrCount *metric.Counter
// NotLeaseHolderErrCount is the number of NotLeaseHolderErrors encountered.
NotLeaseHolderErrCount *metric.Counter
// InLeaseTransferBackoffs is the number of times the sender backed off due
// to NotLeaseHolderErrors during a lease transfer.
InLeaseTransferBackoffs *metric.Counter
}
// makeDistSenderMetrics returns a DistSenderMetrics in which every field is a
// freshly created counter built from its corresponding metadata descriptor.
func makeDistSenderMetrics() DistSenderMetrics {
	var m DistSenderMetrics

	// Batch-level counters.
	m.BatchCount = metric.NewCounter(metaDistSenderBatchCount)
	m.PartialBatchCount = metric.NewCounter(metaDistSenderPartialBatchCount)
	m.AsyncSentCount = metric.NewCounter(metaDistSenderAsyncSentCount)
	m.AsyncThrottledCount = metric.NewCounter(metaDistSenderAsyncThrottledCount)

	// RPC-level counters.
	m.SentCount = metric.NewCounter(metaTransportSentCount)
	m.LocalSentCount = metric.NewCounter(metaTransportLocalSentCount)
	m.NextReplicaErrCount = metric.NewCounter(metaTransportSenderNextReplicaErrCount)

	// Error counters.
	m.NotLeaseHolderErrCount = metric.NewCounter(metaDistSenderNotLeaseHolderErrCount)
	m.InLeaseTransferBackoffs = metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount)

	return m
}
// firstRangeMissingError is the error reported when the descriptor of the
// first range has not yet been gossiped. This is the case for a node which
// hasn't yet joined the gossip network.
type firstRangeMissingError struct{}

// firstRangeMissingError must satisfy the error interface.
var _ error = firstRangeMissingError{}

// Error implements the error interface. The message is constant; the type
// carries no state.
func (firstRangeMissingError) Error() string {
	return "the descriptor for the first range is not available via gossip"
}
// A DistSender provides methods to access Cockroach's monolithic,
// distributed key value store. Each method invocation triggers a
// lookup or lookups to find replica metadata for implicated key
// ranges. RPCs are sent to one or more of the replicas to satisfy
// the method invocation.
//
// Instances are built by NewDistSender, which fills in defaults for the
// optional pieces of configuration.
type DistSender struct {
// AmbientContext is used to annotate contexts (see AnnotateCtx calls) and
// supplies the Tracer used by Send. NewDistSender panics if the Tracer is
// nil.
log.AmbientContext
// st holds the cluster settings. NewDistSender substitutes
// cluster.MakeTestingClusterSettings() when none are configured.
st *cluster.Settings
// nodeDescriptor, if set, holds the descriptor of the node the
// DistSender lives on. It should be accessed via getNodeDescriptor(),
// which tries to obtain the value from the Gossip network if the
// descriptor is unknown. The pointer refers to a roachpb.NodeDescriptor
// and is read and written with atomic.LoadPointer/StorePointer.
nodeDescriptor unsafe.Pointer
// clock is used to set time for some calls. E.g. read-only ops
// which span ranges and don't require read consistency.
clock *hlc.Clock
// gossip provides up-to-date information about the start of the
// key range, used to find the replica metadata for arbitrary key
// ranges. It may be nil, in which case the DistSender must not be used as
// its own RangeDescriptorDB (FirstRange panics).
gossip *gossip.Gossip
// metrics holds the counters maintained by this DistSender; see Metrics().
metrics DistSenderMetrics
// rangeCache caches replica metadata for key ranges.
rangeCache *RangeDescriptorCache
// leaseHolderCache caches range lease holders by range ID.
leaseHolderCache *LeaseHolderCache
// transportFactory is the TestingKnobs.TransportFactory from the config if
// one was supplied, and GRPCTransportFactory otherwise.
transportFactory TransportFactory
// rpcContext is the RPC context from the config; it is mandatory. Its
// RemoteClocks latency function is used to order replicas and its Stopper
// provides the default retry Closer.
rpcContext *rpc.Context
// nodeDialer is handed to sendToReplicas when issuing RPCs.
nodeDialer *nodedialer.Dialer
// rpcRetryOptions defaults to base.DefaultRetryOptions() unless overridden
// by the config; a nil Closer is replaced with the stopper's ShouldQuiesce
// channel.
rpcRetryOptions retry.Options
// asyncSenderSem is a buffered channel with capacity
// defaultSenderConcurrency.
// NOTE(review): presumably used as a semaphore limiting asynchronous
// senders; its use is outside this part of the file — confirm.
asyncSenderSem chan struct{}
// clusterID is used to verify access to enterprise features.
// It points at the ClusterID container of the rpc.Context supplied at
// construction time and is used in testing.
clusterID *base.ClusterIDContainer
// disableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the
// range cache. It is accessed atomically; the value 1 means disabled.
disableFirstRangeUpdates int32
// disableParallelBatches instructs DistSender to never parallelize
// the transmission of partial batch requests across ranges.
disableParallelBatches bool
}
// Compile-time check that *DistSender implements client.Sender.
var _ client.Sender = (*DistSender)(nil)
// DistSenderConfig holds configuration and auxiliary objects that can be passed
// to NewDistSender. AmbientCtx (with a non-nil Tracer) and RPCContext are
// mandatory; NewDistSender panics without them. The other fields are optional.
type DistSenderConfig struct {
// AmbientCtx becomes the DistSender's embedded AmbientContext. Its Tracer
// must be set.
AmbientCtx log.AmbientContext
// Settings are the cluster settings; when nil, testing cluster settings
// are used.
Settings *cluster.Settings
// Clock is the HLC clock the DistSender updates from responses and reads
// when assigning timestamps.
Clock *hlc.Clock
// RPCRetryOptions, when non-nil, replaces base.DefaultRetryOptions().
RPCRetryOptions *retry.Options
// nodeDescriptor, if provided, is used to describe which node the DistSender
// lives on, for instance when deciding where to send RPCs.
// Usually it is filled in from the Gossip network on demand.
// The field is unexported, so only code in this package can set it.
nodeDescriptor *roachpb.NodeDescriptor
// RPCContext is required. It supplies the cluster ID container, the
// stopper used as the default retry Closer, and the remote clock latencies.
RPCContext *rpc.Context
// RangeDescriptorDB is the source of range descriptors for the range
// cache; when nil, the DistSender itself is used.
RangeDescriptorDB RangeDescriptorDB
// NodeDialer is the dialer handed to sendToReplicas.
NodeDialer *nodedialer.Dialer
// TestingKnobs allows tests to override behavior; NewDistSender consults
// its TransportFactory.
TestingKnobs ClientTestingKnobs
}
// NewDistSender returns a DistSender (a client.Sender) which connects to the
// Cockroach cluster via the supplied gossip instance.
//
// cfg.AmbientCtx must carry a Tracer and cfg.RPCContext must be set;
// NewDistSender panics otherwise. The remaining DistSenderConfig fields are
// optional and default as follows:
//   - Settings: cluster.MakeTestingClusterSettings().
//   - RangeDescriptorDB: the DistSender itself.
//   - TestingKnobs.TransportFactory: GRPCTransportFactory.
//   - RPCRetryOptions: base.DefaultRetryOptions(). In either case a nil Closer
//     is replaced with the RPC context stopper's ShouldQuiesce channel.
//
// g may be nil. In that case no gossip callback is registered, and the caller
// must supply a RangeDescriptorDB because DistSender.FirstRange panics
// without gossip.
func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
	ds := &DistSender{
		st:         cfg.Settings,
		clock:      cfg.Clock,
		gossip:     g,
		metrics:    makeDistSenderMetrics(),
		nodeDialer: cfg.NodeDialer,
	}
	if ds.st == nil {
		ds.st = cluster.MakeTestingClusterSettings()
	}

	ds.AmbientContext = cfg.AmbientCtx
	if ds.AmbientContext.Tracer == nil {
		panic("no tracer set in AmbientCtx")
	}

	if cfg.nodeDescriptor != nil {
		atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(cfg.nodeDescriptor))
	}

	// Without an explicit descriptor DB, the DistSender resolves range
	// descriptors through itself (see RangeLookup and FirstRange).
	rdb := cfg.RangeDescriptorDB
	if rdb == nil {
		rdb = ds
	}
	// Both caches read their size limit from the cluster setting each time the
	// closure is invoked rather than capturing a value once.
	getRangeDescCacheSize := func() int64 {
		return rangeDescriptorCacheSize.Get(&ds.st.SV)
	}
	ds.rangeCache = NewRangeDescriptorCache(ds.st, rdb, getRangeDescCacheSize)
	ds.leaseHolderCache = NewLeaseHolderCache(getRangeDescCacheSize)

	if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
		ds.transportFactory = tf
	} else {
		ds.transportFactory = GRPCTransportFactory
	}

	ds.rpcRetryOptions = base.DefaultRetryOptions()
	if cfg.RPCRetryOptions != nil {
		ds.rpcRetryOptions = *cfg.RPCRetryOptions
	}
	if cfg.RPCContext == nil {
		panic("no RPCContext set in DistSenderConfig")
	}
	ds.rpcContext = cfg.RPCContext
	if ds.rpcRetryOptions.Closer == nil {
		ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce()
	}
	ds.clusterID = &cfg.RPCContext.ClusterID
	// nodeDialer was already set in the struct literal above, so it is not
	// assigned a second time here.
	ds.asyncSenderSem = make(chan struct{}, defaultSenderConcurrency)

	if g != nil {
		ctx := ds.AnnotateCtx(context.Background())
		// Evict the cached first-range descriptor whenever a new one is
		// gossiped, unless tests have disabled these updates.
		g.RegisterCallback(gossip.KeyFirstRangeDescriptor,
			func(_ string, value roachpb.Value) {
				if atomic.LoadInt32(&ds.disableFirstRangeUpdates) == 1 {
					return
				}
				if log.V(1) {
					var desc roachpb.RangeDescriptor
					if err := value.GetProto(&desc); err != nil {
						log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err)
					} else {
						log.Infof(ctx, "gossiped first range descriptor: %+v", desc.Replicas())
					}
				}
				err := ds.rangeCache.EvictCachedRangeDescriptor(ctx, roachpb.RKeyMin, nil, false)
				if err != nil {
					log.Warningf(ctx, "failed to evict first range descriptor: %s", err)
				}
			})
	}
	return ds
}
// DisableFirstRangeUpdates turns off updates of the first range via gossip by
// atomically setting the disableFirstRangeUpdates flag, which the gossip
// callback registered in NewDistSender checks. It is meant for tests which
// want finer control of the contents of the range cache.
func (ds *DistSender) DisableFirstRangeUpdates() {
	const disabled int32 = 1
	atomic.StoreInt32(&ds.disableFirstRangeUpdates, disabled)
}
// DisableParallelBatches tells the DistSender to never transmit partial batch
// requests to different ranges in parallel. The flag is a plain (non-atomic)
// bool.
func (ds *DistSender) DisableParallelBatches() {
	ds.disableParallelBatches = true
}
// Metrics returns the metrics tracking the distributed sender's activity.
// The struct is returned by value, but its fields are pointers, so the
// counters are shared with the DistSender.
func (ds *DistSender) Metrics() DistSenderMetrics {
	snapshot := ds.metrics
	return snapshot
}
// RangeDescriptorCache exposes the range descriptor cache used by the
// DistSender.
func (ds *DistSender) RangeDescriptorCache() *RangeDescriptorCache {
	cache := ds.rangeCache
	return cache
}
// LeaseHolderCache exposes the lease holder cache used by the DistSender.
func (ds *DistSender) LeaseHolderCache() *LeaseHolderCache {
	cache := ds.leaseHolderCache
	return cache
}
// RangeLookup implements the RangeDescriptorDB interface by running
// client.RangeLookup for the provided key with the DistSender itself acting
// as the client.Sender. The lookup scan therefore recurses into the
// DistSender, which in turn consults the RangeDescriptorCache again to find
// the RangeDescriptor needed to perform the scan.
func (ds *DistSender) RangeLookup(
	ctx context.Context, key roachpb.RKey, useReverseScan bool,
) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) {
	rawKey := key.AsRawKey()

	// The scan uses READ_UNCOMMITTED consistency so that it returns intents
	// as well as committed values. It is not clear whether the intent or the
	// previous value points to the correct location of the Range, and things
	// get more complicated still when there are split-related intents or a
	// txn record co-located with a replica involved in the split. Since the
	// correct answer cannot be known, both the pre- and post-transaction
	// values are looked up.
	//
	// Sending through the DistSender guarantees that even if the desired
	// RangeDescriptor is not on the first range the lookup is sent to, it is
	// still found when the scan moves on to the next range. This addresses
	// the issue described in #18032 and #16266 and allows meta2 splits to be
	// supported.
	return client.RangeLookup(
		ctx, ds, rawKey, roachpb.READ_UNCOMMITTED, rangeLookupPrefetchCount, useReverseScan,
	)
}
// FirstRange implements the RangeDescriptorDB interface. It returns the
// RangeDescriptor of the cluster's first range, read from gossip rather than
// from the datastore.
//
// It panics when the DistSender has no gossip instance. Any failure to read
// the descriptor from gossip is reported as a firstRangeMissingError; the
// underlying gossip error is not propagated.
func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) {
	if ds.gossip == nil {
		panic("with `nil` Gossip, DistSender must not use itself as rangeDescriptorDB")
	}

	var firstDesc roachpb.RangeDescriptor
	err := ds.gossip.GetInfoProto(gossip.KeyFirstRangeDescriptor, &firstDesc)
	if err != nil {
		return nil, firstRangeMissingError{}
	}
	return &firstDesc, nil
}
// getNodeDescriptor returns the cached node descriptor if one is set.
// Otherwise it tries to load the descriptor from the Gossip network, caching
// it on success. It returns nil when there is no gossip instance, when the
// node ID is not yet known, or when the descriptor cannot be read from
// gossip.
//
// The indirection is needed because the node descriptor is not available
// until after the node has joined the gossip network and been allowed to
// initialize its stores.
func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
	// Fast path: a descriptor was supplied at construction or cached earlier.
	if cached := atomic.LoadPointer(&ds.nodeDescriptor); cached != nil {
		return (*roachpb.NodeDescriptor)(cached)
	}
	if ds.gossip == nil {
		return nil
	}

	if nodeID := ds.gossip.NodeID.Get(); nodeID > 0 {
		// TODO(tschottdorf): Consider instead adding the NodeID of the
		// coordinator to the header, so we can get this from incoming
		// requests. Just in case we want to mostly eliminate gossip here.
		fetched := &roachpb.NodeDescriptor{}
		infoKey := gossip.MakeNodeIDKey(nodeID)
		if err := ds.gossip.GetInfoProto(infoKey, fetched); err == nil {
			atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(fetched))
			return fetched
		}
	}

	if log.V(1) {
		logCtx := ds.AnnotateCtx(context.TODO())
		log.Infof(logCtx, "unable to determine this node's attributes for replica "+
			"selection; node is most likely bootstrapping")
	}
	return nil
}
// sendRPC issues one or more RPCs to replicas taken from the supplied
// ReplicaSlice, after stamping the batch with rangeID. It returns a send
// error without contacting anything when the slice is empty. A nil RPC error
// does not mean the request succeeded: the reply may carry a higher level
// error which must be checked as well.
//
// The replicas are assumed to be ordered by preference, with closer
// ones (i.e. expected lowest latency) first.
//
// See sendToReplicas for a description of the withCommit parameter.
func (ds *DistSender) sendRPC(
	ctx context.Context,
	ba roachpb.BatchRequest,
	rangeID roachpb.RangeID,
	replicas ReplicaSlice,
	cachedLeaseHolder roachpb.ReplicaDescriptor,
	withCommit bool,
) (*roachpb.BatchResponse, error) {
	if len(replicas) == 0 {
		msg := fmt.Sprintf("no replica node addresses available via gossip for r%d", rangeID)
		return nil, roachpb.NewSendError(msg)
	}

	// ba is a by-value copy, so this does not touch the caller's batch.
	ba.RangeID = rangeID

	// Annotate the trace both on entry and on exit.
	tracing.AnnotateTrace()
	defer tracing.AnnotateTrace()

	opts := SendOptions{metrics: &ds.metrics}
	return ds.sendToReplicas(
		ctx, ba, opts, rangeID, replicas, ds.nodeDialer, cachedLeaseHolder, withCommit,
	)
}
// CountRanges returns the number of ranges that encompass the given key span.
// It walks a RangeIterator in ascending order starting at rs.Key and stops as
// soon as the iterator reports that no further range is needed for rs or the
// iterator becomes invalid. Any iterator error is returned with the count
// accumulated so far.
func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64, error) {
	var numRanges int64
	iter := NewRangeIterator(ds)
	iter.Seek(ctx, rs.Key, Ascending)
	for iter.Valid() {
		numRanges++
		if !iter.NeedAnother(rs) {
			break
		}
		iter.Next(ctx)
	}
	return numRanges, iter.Error().GoError()
}
// getDescriptor looks up the range descriptor to use for a query of the key
// descKey by delegating to the range cache. The lookup takes into account the
// last range descriptor the caller used for this key span, if any, and
// whether it has since been evicted as stale; both are tracked through the
// EvictionToken. Callers should pass the EvictionToken obtained from a
// previous call, or an empty one if there is none.
//
// The first result is the descriptor of the range in which the request
// should start its query; it is nil when the lookup fails. The second is an
// EvictionToken, returned even on error. If the descriptor turns out to be
// stale, the token's evict method should be called to evict the cache entry
// appropriately.
func (ds *DistSender) getDescriptor(
	ctx context.Context, descKey roachpb.RKey, evictToken *EvictionToken, useReverseScan bool,
) (*roachpb.RangeDescriptor, *EvictionToken, error) {
	rangeDesc, newToken, lookupErr := ds.rangeCache.LookupRangeDescriptorWithEvictionToken(
		ctx, descKey, evictToken, useReverseScan,
	)
	if lookupErr == nil {
		return rangeDesc, newToken, nil
	}
	return nil, newToken, lookupErr
}
// sendSingleRange builds the replica list for desc, orders it, and sends the
// batch with sendRPC. A cached lease holder is moved to the front when the
// batch must go to the lease holder; otherwise the replicas are ordered by
// expected latency. On return the response's Error field has been cleared and
// is handed back as the second result instead.
func (ds *DistSender) sendSingleRange(
	ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor, withCommit bool,
) (*roachpb.BatchResponse, *roachpb.Error) {
	replicas := NewReplicaSlice(ds.gossip, desc)

	// Prefer the known lease holder, but only for requests that need one and
	// cannot be served by a follower.
	var cachedLeaseHolder roachpb.ReplicaDescriptor
	followerOK := ds.clusterID != nil &&
		CanSendToFollower(ds.clusterID.Get(), ds.st, ba)
	needsLeaseHolder := !followerOK && ba.RequiresLeaseHolder()
	if needsLeaseHolder {
		if storeID, found := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); found {
			if idx := replicas.FindReplica(storeID); idx >= 0 {
				replicas.MoveToFront(idx)
				cachedLeaseHolder = replicas[0].ReplicaDescriptor
			}
		}
	}

	var noLeaseHolder roachpb.ReplicaDescriptor
	if cachedLeaseHolder == noLeaseHolder {
		// No lease holder was pinned to the front, so order the replicas by
		// the latency we expect from each.
		replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.rpcContext.RemoteClocks.Latency)
	}

	br, err := ds.sendRPC(ctx, ba, desc.RangeID, replicas, cachedLeaseHolder, withCommit)
	if err != nil {
		log.VErrEvent(ctx, 2, err.Error())
		return nil, roachpb.NewError(err)
	}

	// Forward the local HLC with the timestamp carried by the reply, giving
	// precedence to the one attached to the error, if any.
	var unsetTS hlc.Timestamp
	switch {
	case br.Error != nil && br.Error.Now != unsetTS:
		ds.clock.Update(br.Error.Now)
	case br.Now != unsetTS:
		ds.clock.Update(br.Now)
	}

	// Detach the error from the response before returning both.
	pErr := br.Error
	br.Error = nil
	return br, pErr
}
// initAndVerifyBatch fills in header fields the DistSender is responsible for
// (the gateway node ID and, for batches not requiring read consistency, the
// timestamp) and checks batch constraints before the batch is split. It
// mutates ba in place and returns a non-nil error when the batch is empty or
// violates one of the constraints checked below.
func (ds *DistSender) initAndVerifyBatch(
	ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
	// Stamp the request with the local node ID unless one is already set.
	if ds.gossip != nil && ba.Header.GatewayNodeID == 0 {
		ba.Header.GatewayNodeID = ds.gossip.NodeID.Get()
	}

	// A batch that does not require read consistency and carries no
	// timestamp gets one from the local clock.
	var unsetTS hlc.Timestamp
	if ba.Timestamp == unsetTS && ba.ReadConsistency != roachpb.CONSISTENT {
		ba.Timestamp = ds.clock.Now()
	}

	if len(ba.Requests) == 0 {
		return roachpb.NewErrorf("empty batch")
	}

	if ba.MaxSpanRequestKeys != 0 {
		// A batch with a key limit may only hold the request types listed
		// below, and must not mix forward range requests with reverse scans.
		reverse := ba.IsReverse()
		for _, ru := range ba.Requests {
			inner := ru.GetInner()
			switch inner.(type) {
			case *roachpb.ScanRequest, *roachpb.DeleteRangeRequest:
				// Accepted forward range requests. All other range requests
				// are still not supported. ReverseScanRequest is deliberately
				// not part of this case.
				// TODO(vivek): don't enumerate all range requests.
				if reverse {
					return roachpb.NewErrorf("batch with limit contains both forward and reverse scans")
				}
			case *roachpb.QueryIntentRequest,
				*roachpb.ResolveIntentRangeRequest,
				*roachpb.BeginTransactionRequest,
				*roachpb.EndTransactionRequest,
				*roachpb.ReverseScanRequest:
				// Allowed alongside a limit; nothing further to check.
			default:
				return roachpb.NewErrorf("batch with limit contains %T request", inner)
			}
		}
	}

	// When ScanOptions is set the batch may only contain scans.
	if ba.ScanOptions != nil {
		for _, ru := range ba.Requests {
			switch ru.GetInner().(type) {
			case *roachpb.ScanRequest, *roachpb.ReverseScanRequest:
				// Scans are supported.
			case *roachpb.BeginTransactionRequest, *roachpb.EndTransactionRequest:
				// These requests are ignored.
			default:
				return roachpb.NewErrorf("batch with scan option has non-scans: %s", ba)
			}
		}
		// MaxSpanRequestKeys and MinResults must not contradict each other
		// when both are set.
		maxKeys := ba.Header.MaxSpanRequestKeys
		if maxKeys != 0 && maxKeys < ba.Header.ScanOptions.MinResults {
			return roachpb.NewErrorf("MaxSpanRequestKeys (%d) < MinResults (%d): %s",
				ba.Header.MaxSpanRequestKeys, ba.Header.ScanOptions.MinResults, ba)
		}
	}

	// Nothing more to do once the cluster version supports batch responses.
	// TODO(jordan): delete this stanza after 2.1 is released.
	if ds.st.Version.IsActive(cluster.VersionBatchResponse) {
		return nil
	}
	// The cluster version is too old for MVCCScan requests in batch form, so
	// force every scan back to the KEY_VALUES format.
	for i := range ba.Requests {
		switch scan := ba.Requests[i].GetInner().(type) {
		case *roachpb.ScanRequest:
			scan.ScanFormat = roachpb.KEY_VALUES
		case *roachpb.ReverseScanRequest:
			scan.ScanFormat = roachpb.KEY_VALUES
		}
	}
	return nil
}
// errNo1PCTxn indicates that a batch cannot be sent as a 1 phase
// commit because it spans multiple ranges and must be split into at
// least two parts, with the final part containing the EndTransaction
// request.
//
// It is a sentinel: DistSender.Send recognizes it by comparing the returned
// *roachpb.Error pointer against this variable, so it has to be returned
// as-is rather than copied or re-created.
var errNo1PCTxn = roachpb.NewErrorf("cannot send 1PC txn to multiple ranges")
// splitBatchAndCheckForRefreshSpans splits the batch according to canSplitET
// and then inspects the last request of the last part. If that request is an
// EndTransaction with NoRefreshSpans set, and any request in an earlier part
// needs a refresh, the final part is replaced by a copy whose EndTransaction
// has NoRefreshSpans cleared. The original EndTransactionRequest and the
// original final part are left unmodified.
func splitBatchAndCheckForRefreshSpans(
	ba roachpb.BatchRequest, canSplitET bool,
) [][]roachpb.RequestUnion {
	parts := ba.Split(canSplitET)

	finalIdx := len(parts) - 1
	finalPart := parts[finalIdx]
	et, isET := finalPart[len(finalPart)-1].GetInner().(*roachpb.EndTransactionRequest)
	if !isET || !et.NoRefreshSpans {
		// Either there is no trailing EndTransaction or its flag is already
		// unset; the parts can be used as they are.
		return parts
	}

	// Look for a request needing a refresh in any part preceding the last.
	earlierNeedsRefresh := false
scan:
	for _, part := range parts[:finalIdx] {
		for _, ru := range part {
			if roachpb.NeedsRefresh(ru.GetInner()) {
				earlierNeedsRefresh = true
				break scan
			}
		}
	}
	if !earlierNeedsRefresh {
		return parts
	}

	// Copy both the request and the slice holding it before flipping the
	// flag, so that the caller's batch is not mutated.
	etCopy := *et
	etCopy.NoRefreshSpans = false
	finalCopy := append([]roachpb.RequestUnion(nil), finalPart...)
	finalCopy[len(finalCopy)-1].MustSetInner(&etCopy)
	parts[finalIdx] = finalCopy
	return parts
}
// Send implements the client.Sender interface. It subdivides the Batch
// into batches admissible for sending (preventing certain illegal
// mixtures of requests), executes each individual part (which may
// span multiple ranges), and recombines the response.
//
// When the request spans ranges, it is split by range and a partial
// subset of the batch request is sent to affected ranges in parallel.
//
// Note that on error, this method will return any batch responses for
// successfully processed batch requests. This allows the caller to
// deal with potential retry situations where a batch is split so that
// EndTransaction is processed alone, after earlier requests in the
// batch succeeded. Where possible, the caller may be able to update
// spans encountered in the transaction and retry just the
// EndTransaction request to avoid client-side serializable txn retries.
//
// The returned response is nil when no part succeeded (including when
// initAndVerifyBatch or keys.Range rejects the batch).
func (ds *DistSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
ds.metrics.BatchCount.Inc(1)
tracing.AnnotateTrace()
// TODO(nvanbenschoten): This causes ba to escape to the heap. Either
// commit to passing BatchRequests by reference or return an updated
// value from this method instead.
// initAndVerifyBatch rejects empty batches, so ba.Requests is known to be
// non-empty from here on.
if pErr := ds.initAndVerifyBatch(ctx, &ba); pErr != nil {
return nil, pErr
}
ctx = ds.AnnotateCtx(ctx)
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
defer sp.Finish()
// rplChunks collects the response of each part that was sent successfully,
// in order.
var rplChunks []*roachpb.BatchResponse
splitET := false
var require1PC bool
lastReq := ba.Requests[len(ba.Requests)-1].GetInner()
if et, ok := lastReq.(*roachpb.EndTransactionRequest); ok && et.Require1PC {
require1PC = true
}
// To ensure that we lay down intents to prevent starvation, always
// split the end transaction request into its own batch on retries.
// Txns requiring 1PC are an exception and should never be split.
if ba.Txn != nil && ba.Txn.Epoch > 0 && !require1PC {
splitET = true
}
parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
if len(parts) > 1 && ba.MaxSpanRequestKeys != 0 {
// We already verified above that the batch contains only scan requests of the same type.
// Such a batch should never need splitting.
panic("batch with MaxSpanRequestKeys needs splitting")
}
var pErr *roachpb.Error
// errIdxOffset is the number of requests contained in the parts that have
// already completed; it is added to the index of an error returned for a
// later part.
errIdxOffset := 0
for len(parts) > 0 {
part := parts[0]
// ba is reused for every part; only its Requests (and, below, its Txn)
// change between iterations.
ba.Requests = part
// The minimal key range encompassing all requests contained within.
// Local addressing has already been resolved.
// TODO(tschottdorf): consider rudimentary validation of the batch here
// (for example, non-range requests with EndKey, or empty key ranges).
rs, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
}
// Determine whether this part of the BatchRequest contains a committing
// EndTransaction request.
var withCommit, withParallelCommit bool
if etArg, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := etArg.(*roachpb.EndTransactionRequest)
withCommit = et.Commit
withParallelCommit = et.IsParallelCommit()
}
var rpl *roachpb.BatchResponse
if withParallelCommit {
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
} else {
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, 0 /* batchIdx */)
}
// errNo1PCTxn is a sentinel and is matched by pointer identity.
if pErr == errNo1PCTxn {
// If we tried to send a single round-trip EndTransaction but
// it looks like it's going to hit multiple ranges, split it
// here and try again.
if len(parts) != 1 {
panic("EndTransaction not in last chunk of batch")
} else if require1PC {
log.Fatalf(ctx, "required 1PC transaction cannot be split: %s", ba)
}
// ba.Requests currently holds only the last part, so this re-splits
// just that part.
parts = splitBatchAndCheckForRefreshSpans(ba, true /* split ET */)
// Restart transaction of the last chunk as multiple parts
// with EndTransaction in the last part.
continue
}
if pErr != nil {
if pErr.Index != nil && pErr.Index.Index != -1 {
pErr.Index.Index += int32(errIdxOffset)
}
// Break out of loop to collate batch responses received so far to
// return with error.
break
}
errIdxOffset += len(ba.Requests)
// Propagate transaction from last reply to next request. The final
// update is taken and put into the response's main header.
ba.UpdateTxn(rpl.Txn)
rplChunks = append(rplChunks, rpl)
parts = parts[1:]
}
// Merge the per-part responses: the first chunk is reused as the reply, the
// responses and collected spans of the later chunks are appended to it, and
// its header is replaced by the last chunk's header (keeping the combined
// collected spans).
var reply *roachpb.BatchResponse
if len(rplChunks) > 0 {
reply = rplChunks[0]
for _, rpl := range rplChunks[1:] {
reply.Responses = append(reply.Responses, rpl.Responses...)
reply.CollectedSpans = append(reply.CollectedSpans, rpl.CollectedSpans...)
}
lastHeader := rplChunks[len(rplChunks)-1].BatchResponse_Header
lastHeader.CollectedSpans = reply.CollectedSpans
reply.BatchResponse_Header = lastHeader
}
return reply, pErr
}
// response bundles a batch response with an error and a list of positions.
// NOTE(review): nothing in the visible part of this file constructs or reads
// this type; the field notes below are inferred from names and types only and
// should be confirmed against its users.
type response struct {
// reply is the batch response, if any.
reply *roachpb.BatchResponse
// positions presumably records the indexes of the partial batch's requests
// within the original batch — TODO confirm.
positions []int
// pErr is the error associated with the response, if any.
pErr *roachpb.Error
}
// divideAndSendParallelCommit divides a parallel-committing batch into
// sub-batches that can be evaluated in parallel but should not be evaluated
// on a Store together.
//
// The case where this comes up is if the batch is performing a parallel
// commit and the transaction has previously pipelined writes that have yet
// to be proven successful. In this scenario, the EndTransaction request
// will be preceded by a series of QueryIntent requests (see
// txn_pipeliner.go). Before evaluating, each of these QueryIntent requests
// will grab latches and wait for their corresponding write to finish. This
// is how the QueryIntent requests synchronize with the write they are
// trying to verify.
//
// If these QueryIntents remained in the same batch as the EndTransaction
// request then they would force the EndTransaction request to wait for the
// previous write before evaluating itself. This "pipeline stall" would
// effectively negate the benefit of the parallel commit. To avoid this, we
// make sure that these "pre-commit" QueryIntent requests are split from and
// issued concurrently with the rest of the parallel commit batch.
//
// batchIdx indicates which partial fragment of the larger batch is being
// processed by this method. Currently it is always set to zero because this
// method is never invoked recursively, but it is exposed to maintain symmetry
// with divideAndSendBatchToRanges.
//
// On success the returned response contains one entry per request of the
// original batch, in the original request order. On failure the response is
// nil and any request index carried by the error refers to the original
// (un-swapped) request order. If only the pre-commit QueryIntent batch
// fails, its error is wrapped in a MixedSuccessError.
func (ds *DistSender) divideAndSendParallelCommit(
	ctx context.Context, ba roachpb.BatchRequest, rs roachpb.RSpan, batchIdx int,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
	// Search backwards from the request preceding the final one, looking
	// for the first request in the trailing run of pre-commit QueryIntents.
	swapIdx := -1
	lastIdx := len(ba.Requests) - 1
	for i := lastIdx - 1; i >= 0; i-- {
		if ba.Requests[i].GetInner().Method() != roachpb.QueryIntent {
			break
		}
		swapIdx = i
	}
	if swapIdx == -1 {
		// No pre-commit QueryIntents. Nothing to split.
		return ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
	}

	// Swap the EndTransaction request and the first pre-commit QueryIntent.
	// This effectively creates a split point between the two groups of requests.
	//
	//  Before:    [put qi(1) put del qi(2) qi(3) qi(4) et]
	//  After:     [put qi(1) put del et qi(3) qi(4) qi(2)]
	//  Separated: [put qi(1) put del et] [qi(3) qi(4) qi(2)]
	//
	// NOTE: the non-pre-commit QueryIntent's must remain where they are in the
	// batch. These ensure that the transaction always reads its writes (see
	// txnPipeliner.chainToInFlightWrites). These will introduce pipeline stalls
	// and undo most of the benefit of this method, but luckily they are rare in
	// practice.
	//
	// The swap is performed on a copy so that the caller's request slice is
	// left untouched.
	swappedReqs := append([]roachpb.RequestUnion(nil), ba.Requests...)
	swappedReqs[swapIdx], swappedReqs[lastIdx] = swappedReqs[lastIdx], swappedReqs[swapIdx]

	// Create a new pre-commit QueryIntent-only batch and issue it
	// in a non-limited async task. This batch may need to be split
	// over multiple ranges, so call into divideAndSendBatchToRanges.
	qiBa := ba
	qiBa.Requests = swappedReqs[swapIdx+1:]
	qiRS, err := keys.Range(qiBa)
	if err != nil {
		return nil, roachpb.NewError(err)
	}
	qiBatchIdx := batchIdx + 1
	// The channel is buffered so that the task can always deliver its
	// result without blocking.
	qiResponseCh := make(chan response, 1)

	// When parallel batches are disabled, run the QueryIntent batch
	// synchronously instead of in an async task.
	runTask := ds.rpcContext.Stopper.RunAsyncTask
	if ds.disableParallelBatches {
		runTask = ds.rpcContext.Stopper.RunTask
	}
	if err := runTask(ctx, "kv.DistSender: sending pre-commit query intents", func(ctx context.Context) {
		// Map response index to the original un-swapped batch index.
		// Remember that we moved the last QueryIntent in this batch
		// from swapIdx to the end.
		//
		// From the example above:
		//  Before:    [put qi(1) put del qi(2) qi(3) qi(4) et]
		//  Separated: [put qi(1) put del et] [qi(3) qi(4) qi(2)]
		//
		//  qiBa.Requests = [qi(3) qi(4) qi(2)]
		//  swapIdx       = 4
		//  positions     = [5 6 4]
		//
		positions := make([]int, len(qiBa.Requests))
		positions[len(positions)-1] = swapIdx
		for i := range positions[:len(positions)-1] {
			positions[i] = swapIdx + 1 + i
		}

		// Send the batch with withCommit=true since it will be inflight
		// concurrently with the EndTransaction batch below.
		qiBr, qiErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, true /* withCommit */, qiBatchIdx)
		qiResponseCh <- response{reply: qiBr, positions: positions, pErr: qiErr}
	}); err != nil {
		return nil, roachpb.NewError(err)
	}

	// Adjust the original batch request to ignore the pre-commit
	// QueryIntent requests. Make sure to determine the request's
	// new key span.
	ba.Requests = swappedReqs[:swapIdx+1]
	rs, err = keys.Range(ba)
	if err != nil {
		return nil, roachpb.NewError(err)
	}
	br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)

	// Wait for the QueryIntent-only batch to complete and stitch
	// the responses together.
	qiReply := <-qiResponseCh

	// Handle error conditions.
	if pErr != nil {
		// The batch with the EndTransaction returned an error. Ignore errors
		// from the pre-commit QueryIntent requests because that request is
		// read-only and will produce the same errors next time, if applicable.
		if qiReply.reply != nil {
			pErr.UpdateTxn(qiReply.reply.Txn)
		}
		// Translate the error index back to the un-swapped request order.
		maybeSwapErrorIndex(pErr, swapIdx, lastIdx)
		return nil, pErr
	}
	if qiPErr := qiReply.pErr; qiPErr != nil {
		// The batch with the pre-commit QueryIntent requests returned an error.
		// Wrap this in a MixedSuccessError, as we know that the EndTransaction
		// batch succeeded. It is not possible for qiPErr to be a MixedSuccessError
		// itself, so we don't need to handle that case like we do down below.
		//
		// Propagate the transaction from the successful EndTransaction
		// response (not from the request, which predates that evaluation) so
		// that the error carries the most recent transaction state.
		qiPErr.UpdateTxn(br.Txn)
		maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx)
		mixedErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr})
		mixedErr.Index = qiPErr.Index
		return nil, mixedErr
	}

	// Both halves of the split batch succeeded. Piece them back together.
	// First widen the response slice to the full batch length and move the
	// EndTransaction response from swapIdx back to the last slot, then let
	// Combine fill in the QueryIntent responses at their original positions.
	resps := make([]roachpb.ResponseUnion, len(swappedReqs))
	copy(resps, br.Responses)
	resps[swapIdx], resps[lastIdx] = resps[lastIdx], resps[swapIdx]
	br.Responses = resps
	if err := br.Combine(qiReply.reply, qiReply.positions); err != nil {
		return nil, roachpb.NewError(err)
	}
	return br, nil
}
// maybeSwapErrorIndex remaps the request index carried by pErr to account
// for two requests having traded places in a batch. If the error's index is
// set and equals a it becomes b; if it equals b it becomes a. Any other
// index, and an error without an index, is left untouched.
func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) {
	errPos := pErr.Index
	if errPos == nil {
		return
	}
	first, second := int32(a), int32(b)
	switch errPos.Index {
	case first:
		errPos.Index = second
	case second:
		errPos.Index = first
	}
}
// divideAndSendBatchToRanges sends the supplied batch to all of the
// ranges which comprise the span specified by rs. The batch request
// is trimmed against each range which is part of the span and sent
// either serially or in parallel, if possible.
//
// batchIdx indicates which partial fragment of the larger batch is
// being processed by this method. It's specified as non-zero when
// this method is invoked recursively.
//
// withCommit indicates that the batch contains a transaction commit
// or that a transaction commit is being run concurrently with this
// batch. Either way, if this is true then sendToReplicas will need
// to handle errors differently.
func (ds *DistSender) divideAndSendBatchToRanges(
ctx context.Context, ba roachpb.BatchRequest, rs roachpb.RSpan, withCommit bool, batchIdx int,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
// Clone the BatchRequest's transaction so that future mutations to the
// proto don't affect the proto in this batch.
if ba.Txn != nil {
txnCopy := *ba.Txn
ba.Txn = &txnCopy
}
// Get initial seek key depending on direction of iteration.
var scanDir ScanDirection
var seekKey roachpb.RKey
if !ba.IsReverse() {
scanDir = Ascending
seekKey = rs.Key
} else {
scanDir = Descending
seekKey = rs.EndKey
}
ri := NewRangeIterator(ds)
ri.Seek(ctx, seekKey, scanDir)
if !ri.Valid() {
return nil, ri.Error()
}
// Take the fast path if this batch fits within a single range.
if !ri.NeedAnother(rs) {
resp := ds.sendPartialBatch(
ctx, ba, rs, ri.Desc(), ri.Token(), withCommit, batchIdx, false, /* needsTruncate */
)
return resp.reply, resp.pErr
}
// The batch spans ranges (according to our cached range descriptors).
// Verify that this is ok.
// TODO(tschottdorf): we should have a mechanism for discovering range
// merges (descriptor staleness will mostly go unnoticed), or we'll be
// turning single-range queries into multi-range queries for no good