// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package admission
import (
"container/heap"
"context"
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/redact"
)
// Use of the admission control package spans the SQL and KV layers. When
// running in a multi-tenant setting, we have per-tenant SQL-only servers and
// multi-tenant storage servers. These multi-tenant storage servers contain
// the multi-tenant KV layer, and the SQL layer for the system tenant. Most of
// the following settings are relevant to both kinds of servers (except for
// KVAdmissionControlEnabled). Only the system tenant can modify these
// settings in the storage servers, while a regular tenant can modify these
// settings for their SQL-only servers, which is why these are typically
// TenantWritable.
// KVAdmissionControlEnabled controls whether KV server-side admission control
// is enabled.
var KVAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.enabled",
"when true, work performed by the KV layer is subject to admission control",
true).WithPublic()
// KVBulkOnlyAdmissionControlEnabled controls whether user (normal and above
// priority) work is subject to admission control. If it is set to true, then
// user work will not be throttled by admission control but bulk work still will
// be. This setting is a preferable alternative to completely disabling
// admission control. It can be used reactively in cases where index backfill,
// schema modifications or other bulk operations are causing high latency due to
// io_overload on nodes.
// TODO(baptist): Find a better solution to this in v23.1.
var KVBulkOnlyAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.bulk_only.enabled",
"when both admission.kv.enabled and this is true, only throttle bulk work",
false)
// SQLKVResponseAdmissionControlEnabled controls whether response processing
// in SQL, for KV requests, is enabled.
var SQLKVResponseAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"admission.sql_kv_response.enabled",
"when true, work performed by the SQL layer when receiving a KV response is subject to "+
"admission control",
true).WithPublic()
// SQLSQLResponseAdmissionControlEnabled controls whether response processing
// in SQL, for DistSQL requests, is enabled.
var SQLSQLResponseAdmissionControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"admission.sql_sql_response.enabled",
"when true, work performed by the SQL layer when receiving a DistSQL response is subject "+
"to admission control",
true).WithPublic()
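// admissionControlEnabledSettings maps each WorkKind to the cluster setting
// (if any) that can disable admission control for it. A nil entry means
// admission control for that WorkKind cannot be turned off via a setting.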
var admissionControlEnabledSettings = [numWorkKinds]*settings.BoolSetting{
KVWork: KVAdmissionControlEnabled,
SQLKVResponseWork: SQLKVResponseAdmissionControlEnabled,
SQLSQLResponseWork: SQLSQLResponseAdmissionControlEnabled,
}
// KVTenantWeightsEnabled controls whether tenant weights are enabled for KV
// admission control. This setting has no effect if admission.kv.enabled is
// false.
var KVTenantWeightsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.tenant_weights.enabled",
"when true, tenant weights are enabled for KV admission control",
false).WithPublic()
// KVStoresTenantWeightsEnabled controls whether tenant weights are enabled
// for KV-stores admission control. This setting has no effect if
// admission.kv.enabled is false.
var KVStoresTenantWeightsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"admission.kv.stores.tenant_weights.enabled",
"when true, tenant weights are enabled for KV-stores admission control",
false).WithPublic()
// EpochLIFOEnabled controls whether the adaptive epoch-LIFO scheme is enabled
// for admission control. It is only relevant when the above admission control
// settings are also set to true. Unlike those settings, which are granular
// for each kind of admission queue, this setting applies to all the queues.
// This is because we recommend that all those settings be enabled or none be
// enabled, and we don't want to carry forward unnecessarily granular
// settings.
var EpochLIFOEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"admission.epoch_lifo.enabled",
"when true, epoch-LIFO behavior is enabled when there is significant delay in admission",
false).WithPublic()
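// epochLIFOEpochDuration controls the length of an epoch for epoch-LIFO
// ordering. The validation below rejects values smaller than 1ms.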
var epochLIFOEpochDuration = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.epoch_duration",
"the duration of an epoch, for epoch-LIFO admission control ordering",
epochLength,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: epoch duration is too small")
}
return nil
}).WithPublic()
var epochLIFOEpochClosingDeltaDuration = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.epoch_closing_delta_duration",
"the delta duration before closing an epoch, for epoch-LIFO admission control ordering",
epochClosingDelta,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: epoch closing delta is too small")
}
return nil
}).WithPublic()
var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo",
"the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering",
maxQueueDelayToSwitchToLifo,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: queue delay threshold is too small")
}
return nil
}).WithPublic()
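// rangeSequencerGCThreshold controls how long a replication admission control
// range sequencer may remain inactive before it is garbage collected.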
var rangeSequencerGCThreshold = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.replication_control.range_sequencer_gc_threshold",
"the inactive duration for a range sequencer after it's garbage collected",
5*time.Minute,
settings.NonNegativeDuration,
)
// WorkInfo provides information that is used to order work within a WorkQueue.
// The WorkKind is not included as a field since a WorkQueue deals with a
// single WorkKind.
type WorkInfo struct {
// TenantID is the id of the tenant. For single-tenant clusters, this will
// always be the SystemTenantID.
TenantID roachpb.TenantID
// Priority is utilized within a tenant.
Priority admissionpb.WorkPriority
// CreateTime is equivalent to Time.UnixNano() at the creation time of this
// work or a parent work (e.g. could be the start time of the transaction,
// if this work was created as part of a transaction). It is used to order
// work within a (TenantID, Priority) pair -- earlier CreateTime is given
// preference.
CreateTime int64
// BypassAdmission allows the work to bypass admission control while still
// being accounted for. It is only honored for KVWork submitted by the
// SystemTenantID, and is ignored otherwise. It should be used for
// high-priority intra-KV work, and when KV work generates other KV work
// (to avoid deadlock).
BypassAdmission bool
// RequestedCount is the requested number of tokens or slots. If unset:
// - For slot-based queues we treat it as an implicit request of 1;
// - For the store work queue, we use per-request estimates to deduct some
// number of tokens at-admit time. Note that this only applies to the
// legacy above-raft admission control. With admission control for
// replicated writes (done so asynchronously, below-raft; see
// ReplicatedWrite below), we do know the size of the write being
// admitted, so RequestedCount is set accordingly.
RequestedCount int64
// ReplicatedWorkInfo groups everything needed to admit replicated writes, done
// so asynchronously below-raft as part of replication admission control.
ReplicatedWorkInfo ReplicatedWorkInfo
}
// ReplicatedWorkInfo groups everything needed to admit replicated writes, done
// so asynchronously below-raft as part of replication admission control.
type ReplicatedWorkInfo struct {
// Enabled captures whether this work represents a replicated write,
// subject to below-raft asynchronous admission control.
Enabled bool
// RangeID identifies the raft group on behalf of which work is being
// admitted.
RangeID roachpb.RangeID
// Origin is the node at which this work originated. It's used for
// replication admission control to inform the origin of admitted work
// (after which flow tokens are released, permitting more replicated
// writes).
Origin roachpb.NodeID
// LogPosition is the point on the raft log where the write was replicated.
LogPosition LogPosition
// Ingested captures whether the write work corresponds to an ingest
// (for sstables, for example). This is used alongside RequestedCount to
// maintain accurate linear models for L0 growth due to ingests and
// regular write batches.
Ingested bool
}
// LogPosition is a point on the raft log, identified by a term and an index.
type LogPosition struct {
Term uint64
Index uint64
}
func (r LogPosition) String() string {
return fmt.Sprintf("%d/%d", r.Term, r.Index)
}
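// Less orders log positions by term, then by index. For example,
// LogPosition{Term: 5, Index: 10}.Less(LogPosition{Term: 5, Index: 11}) is true.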
func (r LogPosition) Less(o LogPosition) bool {
if r.Term != o.Term {
return r.Term < o.Term
}
return r.Index < o.Index
}
// WorkQueue maintains a queue of work waiting to be admitted. Ordering of
// work is achieved via 2 heaps: a tenant heap orders the tenants with waiting
// work in increasing order of used slots or tokens, optionally adjusted by
// tenant weights. Within each tenant, the waiting work is ordered based on
// priority and create time. Tenants with non-zero values of used slots or
// tokens are tracked even if they have no more waiting work. Token usage is
// reset to zero every second. The choice of 1 second of memory for token
// distribution fairness is somewhat arbitrary. The same 1 second interval is
// also used to garbage collect tenants who have no waiting requests and no
// used slots or tokens.
//
// Usage example:
//
// var grantCoord *GrantCoordinator
// <initialize grantCoord>
// kvQueue := grantCoord.GetWorkQueue(KVWork)
// <hand kvQueue to the code that does kv server work>
//
// // Before starting some kv server work
// if enabled, err := kvQueue.Admit(ctx, WorkInfo{TenantID: tid, ...}); err != nil {
// return err
// }
// <do the work>
// if enabled {
// kvQueue.AdmittedWorkDone(tid)
// }
type WorkQueue struct {
ambientCtx context.Context
workKind WorkKind
granter granter
usesTokens bool
tiedToRange bool
usesAsyncAdmit bool
settings *cluster.Settings
onAdmittedReplicatedWork onAdmittedReplicatedWork
// Prevents more than one caller from being in Admit and calling tryGet or adding
// to the queue. It allows WorkQueue to release mu before calling tryGet and
// be assured that it is not competing with another Admit.
// Lock ordering is admitMu < mu.
admitMu syncutil.Mutex
mu struct {
syncutil.Mutex
// Tenants with waiting work.
tenantHeap tenantHeap
// All tenants, including those without waiting work. Periodically cleaned.
tenants map[uint64]*tenantInfo
tenantWeights struct {
mu syncutil.Mutex
// active refers to the currently active weights. mu is held for updates
// to the inactive weights, to prevent concurrent updates. After
// updating the inactive weights, it is made active by swapping with
// active, while also holding WorkQueue.mu. Therefore, reading
// tenantWeights.active does not require tenantWeights.mu. For lock
// ordering, tenantWeights.mu precedes WorkQueue.mu.
//
// The maps are lazily allocated.
active, inactive map[uint64]uint32
}
// The highest epoch that is closed.
closedEpochThreshold int64
// Following values are copied from the cluster settings.
epochLengthNanos int64
epochClosingDeltaNanos int64
maxQueueDelayToSwitchToLifo time.Duration
}
logThreshold log.EveryN
metrics *WorkQueueMetrics
stopCh chan struct{}
timeSource timeutil.TimeSource
knobs *TestingKnobs
}
var _ requester = &WorkQueue{}
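// workQueueOptions captures the WorkKind-independent configuration of a
// WorkQueue: whether it hands out tokens or slots, whether work is tied to a
// range, and whether admission happens asynchronously (below-raft).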
type workQueueOptions struct {
usesTokens bool
tiedToRange bool
usesAsyncAdmit bool
// timeSource can be set to non-nil for tests. If nil,
// the timeutil.DefaultTimeSource will be used.
timeSource timeutil.TimeSource
// The epoch closing goroutine can be disabled for tests.
disableEpochClosingGoroutine bool
}
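// makeWorkQueueOptions returns the default options for a WorkKind: KVWork uses
// slots and is tied to a range (the per-store queues override usesTokens),
// SQL response work uses tokens, and SQL statement start work uses slots.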
func makeWorkQueueOptions(workKind WorkKind) workQueueOptions {
switch workKind {
case KVWork:
// CPU bound KV work uses slots. We also use KVWork for the per-store
// queues, which use tokens -- the caller overrides the usesTokens value
// in that case.
return workQueueOptions{usesTokens: false, tiedToRange: true}
case SQLKVResponseWork, SQLSQLResponseWork:
return workQueueOptions{usesTokens: true, tiedToRange: false}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
return workQueueOptions{usesTokens: false, tiedToRange: false}
default:
panic(errors.AssertionFailedf("unexpected workKind %d", workKind))
}
}
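// makeWorkQueue constructs and initializes a WorkQueue, returning it as a
// requester.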
func makeWorkQueue(
ambientCtx log.AmbientContext,
workKind WorkKind,
granter granter,
settings *cluster.Settings,
metrics *WorkQueueMetrics,
opts workQueueOptions,
) requester {
q := &WorkQueue{}
initWorkQueue(q, ambientCtx, workKind, granter, settings, metrics, opts, nil)
return q
}
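// initWorkQueue initializes an already allocated WorkQueue. It starts a
// goroutine that garbage collects idle tenants and resets token usage every
// second, and, unless disabled via options, a goroutine that closes epochs
// for epoch-LIFO ordering.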
func initWorkQueue(
q *WorkQueue,
ambientCtx log.AmbientContext,
workKind WorkKind,
granter granter,
settings *cluster.Settings,
metrics *WorkQueueMetrics,
opts workQueueOptions,
knobs *TestingKnobs,
) {
if knobs == nil {
knobs = &TestingKnobs{}
}
stopCh := make(chan struct{})
timeSource := opts.timeSource
if timeSource == nil {
timeSource = timeutil.DefaultTimeSource{}
}
q.ambientCtx = ambientCtx.AnnotateCtx(context.Background())
q.workKind = workKind
q.granter = granter
q.usesTokens = opts.usesTokens
q.tiedToRange = opts.tiedToRange
q.usesAsyncAdmit = opts.usesAsyncAdmit
q.settings = settings
q.logThreshold = log.Every(5 * time.Minute)
q.metrics = metrics
q.stopCh = stopCh
q.timeSource = timeSource
q.knobs = knobs
func() {
q.mu.Lock()
defer q.mu.Unlock()
q.mu.tenants = make(map[uint64]*tenantInfo)
q.sampleEpochLIFOSettingsLocked()
}()
go func() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
q.gcTenantsAndResetTokens()
case <-stopCh:
// Channel closed.
return
}
}
}()
q.tryCloseEpoch(q.timeNow())
if !opts.disableEpochClosingGoroutine {
q.startClosingEpochs()
}
}
func isInTenantHeap(tenant *tenantInfo) bool {
// If there is some waiting work, this tenant is in tenantHeap.
return len(tenant.waitingWorkHeap) > 0 || len(tenant.openEpochsHeap) > 0
}
func (q *WorkQueue) timeNow() time.Time {
return q.timeSource.Now()
}
func (q *WorkQueue) epochLIFOEnabled() bool {
// We don't use epoch LIFO for below-raft admission control. See I12 from
// kvflowcontrol/doc.go.
return EpochLIFOEnabled.Get(&q.settings.SV) && !q.usesAsyncAdmit
}
// Samples the latest cluster settings for epoch-LIFO.
func (q *WorkQueue) sampleEpochLIFOSettingsLocked() {
epochLengthNanos := int64(epochLIFOEpochDuration.Get(&q.settings.SV))
if epochLengthNanos != q.mu.epochLengthNanos {
// Reset what is closed. A proper closed value will be calculated when the
// next epoch closes. This ensures that if we are increasing the epoch
// length, we will regress what epoch number is closed. Meanwhile, all
// work subject to LIFO queueing will get queued in the openEpochsHeap,
// which is fine (we admit from there too).
q.mu.closedEpochThreshold = 0
}
q.mu.epochLengthNanos = epochLengthNanos
q.mu.epochClosingDeltaNanos = int64(epochLIFOEpochClosingDeltaDuration.Get(&q.settings.SV))
q.mu.maxQueueDelayToSwitchToLifo = epochLIFOQueueDelayThresholdToSwitchToLIFO.Get(&q.settings.SV)
}
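// startClosingEpochs starts a goroutine that periodically re-samples the
// epoch-LIFO cluster settings and closes epochs as their closing times pass.
// The timer duration is clamped to [1ms, 1s] so that setting changes are
// noticed promptly even with very long epochs.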
func (q *WorkQueue) startClosingEpochs() {
go func() {
// If someone sets the epoch length to a huge value by mistake, we will
// still sample every second, so that we can adjust when they fix their
// mistake.
const maxTimerDur = time.Second
// This is the min duration we set the timer for, to avoid setting smaller
// and smaller timers, in case the timer fires slightly early.
const minTimerDur = time.Millisecond
var timer *time.Timer
for {
q.mu.Lock()
q.sampleEpochLIFOSettingsLocked()
nextCloseTime := q.nextEpochCloseTimeLocked()
q.mu.Unlock()
timeNow := q.timeNow()
timerDur := nextCloseTime.Sub(timeNow)
if timerDur > 0 {
if timerDur > maxTimerDur {
timerDur = maxTimerDur
} else if timerDur < minTimerDur {
timerDur = minTimerDur
}
if timer == nil {
timer = time.NewTimer(timerDur)
} else {
timer.Reset(timerDur)
}
select {
case <-timer.C:
case <-q.stopCh:
// Channel closed.
return
}
} else {
q.tryCloseEpoch(timeNow)
}
}
}()
}
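// nextEpochCloseTimeLocked returns when the next epoch should be closed:
// (closedEpochThreshold+2)*epochLength + epochClosingDelta. For instance, if
// the epoch length were 100ms with a 5ms closing delta and epoch 3 were
// closed, this would return t=505ms (unix time). q.mu must be held.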
func (q *WorkQueue) nextEpochCloseTimeLocked() time.Time {
// +2 since we need to advance the threshold by 1, and another 1 since the
// epoch closes at its end time.
timeUnixNanos :=
(q.mu.closedEpochThreshold+2)*q.mu.epochLengthNanos + q.mu.epochClosingDeltaNanos
return timeutil.Unix(0, timeUnixNanos)
}
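// tryCloseEpoch advances the closed epoch threshold if enough time has
// elapsed, recomputes each tenant's FIFO priority threshold (forcing it to
// LowPri when epoch-LIFO is disabled), and moves work whose epoch has closed
// from the open epochs heap to the waiting work heap.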
func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) {
epochLIFOEnabled := q.epochLIFOEnabled()
q.mu.Lock()
defer q.mu.Unlock()
epochClosingTimeNanos := timeNow.UnixNano() - q.mu.epochLengthNanos - q.mu.epochClosingDeltaNanos
epoch := epochForTimeNanos(epochClosingTimeNanos, q.mu.epochLengthNanos)
if epoch <= q.mu.closedEpochThreshold {
return
}
q.mu.closedEpochThreshold = epoch
doLog := q.logThreshold.ShouldLog()
for _, tenant := range q.mu.tenants {
prevThreshold := tenant.fifoPriorityThreshold
tenant.fifoPriorityThreshold =
tenant.priorityStates.getFIFOPriorityThresholdAndReset(
tenant.fifoPriorityThreshold, q.mu.epochLengthNanos, q.mu.maxQueueDelayToSwitchToLifo)
if !epochLIFOEnabled {
tenant.fifoPriorityThreshold = int(admissionpb.LowPri)
}
if tenant.fifoPriorityThreshold != prevThreshold || doLog {
logVerb := redact.SafeString("is")
if tenant.fifoPriorityThreshold != prevThreshold {
logVerb = "changed to"
}
// TODO(sumeer): export this as a per-tenant metric somehow. We could
// start with this being a per-WorkQueue metric for only the system
// tenant. However, currently we share metrics across WorkQueues --
// specifically all the store WorkQueues share the same metric. We
// should eliminate that sharing and make those per store metrics.
log.Infof(q.ambientCtx, "%s: FIFO threshold for tenant %d %s %d",
workKindString(q.workKind), tenant.id, logVerb, tenant.fifoPriorityThreshold)
}
// Note that we are ignoring the new priority threshold and only
// dequeueing the ones that are in the closed epoch. It is possible to
// have work items that are not in the closed epoch and whose priority
// makes them no longer subject to LIFO, but they will need to wait here
// until their epochs close. This is considered acceptable since the
// priority threshold should not fluctuate rapidly.
for len(tenant.openEpochsHeap) > 0 {
work := tenant.openEpochsHeap[0]
if work.epoch > epoch {
break
}
heap.Pop(&tenant.openEpochsHeap)
heap.Push(&tenant.waitingWorkHeap, work)
}
}
}
// Admit is called when requesting admission for some work. If err!=nil, the
// request was not admitted, potentially due to the deadline being exceeded.
// The enabled return value is relevant when err=nil, and represents whether
// admission control is enabled. AdmittedWorkDone must be called iff
// enabled=true && err=nil, and the WorkKind for this queue uses slots.
func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) {
if !info.ReplicatedWorkInfo.Enabled {
enabledSetting := admissionControlEnabledSettings[q.workKind]
if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) {
return false, nil
}
}
// TODO(irfansharif): When enabling replication admission control for
// regular writes with arbitrary concurrency (part of #95563), measure
// the memory overhead of enqueueing each raft command to see whether we
// need to do some coalescing at this level.
if info.RequestedCount == 0 {
// We treat an unset RequestedCount as an implicit request of 1.
info.RequestedCount = 1
}
if !q.usesTokens && info.RequestedCount != 1 {
panic(errors.AssertionFailedf("unexpected RequestedCount: %d", info.RequestedCount))
}
q.metrics.incRequested(info.Priority)
tenantID := info.TenantID.ToUint64()
// The code in this method does not use defer to unlock the mutexes because
// it needs the flexibility of selectively unlocking one of these on a
// certain code path. When changing the code, be careful in making sure the
// mutexes are properly unlocked on all code paths.
q.admitMu.Lock()
q.mu.Lock()
tenant, ok := q.mu.tenants[tenantID]
if !ok {
tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID))
q.mu.tenants[tenantID] = tenant
}
if info.ReplicatedWorkInfo.Enabled {
if info.BypassAdmission {
// TODO(irfansharif): "Admin" work (like splits, scatters, lease
// transfers, etc.), and work originating from AdmissionHeader_OTHER,
// don't use flow control tokens above-raft. So there's nothing to
// virtually enqueue below-raft, since we have nothing to return. That
// said, it might still be useful to physically admit these proposals
// for correct token modeling. To do that, we'd have to pass down
// information about it being bypassed above-raft.
panic("unexpected BypassAdmission bit set for below raft admission")
}
if !q.usesTokens {
panic("unexpected ReplicatedWrite.Enabled on slot-based queue")
}
}
if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork {
tenant.used += uint64(info.RequestedCount)
if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
}
q.mu.Unlock()
q.admitMu.Unlock()
q.granter.tookWithoutPermission(info.RequestedCount)
q.metrics.incAdmitted(info.Priority)
return true, nil
}
// Work is subject to admission control.
// Tell priorityStates about this received work. We don't tell it about work
// that has bypassed admission control, since priorityStates is deciding the
// threshold for LIFO queueing based on observed admission latency.
tenant.priorityStates.requestAtPriority(info.Priority)
if len(q.mu.tenantHeap) == 0 && !q.knobs.DisableWorkQueueFastPath {
// Fast-path. Try to grab token/slot.
// Optimistically update used to avoid locking again.
tenant.used += uint64(info.RequestedCount)
q.mu.Unlock()
if q.granter.tryGet(info.RequestedCount) {
q.admitMu.Unlock()
q.metrics.incAdmitted(info.Priority)
if info.ReplicatedWorkInfo.Enabled {
// TODO(irfansharif): There's a race here, and could lead to
// over-admission. It's possible that there are enqueued work
// items with lower log positions than the request that just got
// through using the fast-path, and since we're returning flow
// tokens by specifying a log prefix, we'd be returning more
// flow tokens than actually admitted. Fix it as part of #95563,
// by either adding more synchronization, getting rid of this
// fast path, or swapping this entry from the top-most one in
// the waiting heap (and fixing the heap).
if log.V(1) {
log.Infof(ctx, "fast-path: admitting t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
tenantID, info.Priority,
info.ReplicatedWorkInfo.RangeID,
info.ReplicatedWorkInfo.Origin,
info.ReplicatedWorkInfo.LogPosition.String(),
info.ReplicatedWorkInfo.Ingested,
)
}
q.onAdmittedReplicatedWork.admittedReplicatedWork(
roachpb.MustMakeTenantID(tenantID),
info.Priority,
info.ReplicatedWorkInfo,
info.RequestedCount,
info.CreateTime,
false, /* coordMuLocked */
)
}
return true, nil
}
// Did not get token/slot.
//
// There is a race here: before q.mu is acquired, the granter could
// experience a reduction in load and call
// WorkQueue.hasWaitingRequests to see if it should grant, but since
// there is nothing in the queue that method will return false. Then the
// work here queues up even though granter has spare capacity. We could
// add additional synchronization (and complexity to the granter
// interface) to deal with this, by keeping the granter's lock
// (GrantCoordinator.mu) locked when returning from tryGrant and call
// granter again to release that lock after this work has been queued.
// But it has the downside of extending the scope of
// GrantCoordinator.mu. Instead we tolerate this race in the knowledge
// that GrantCoordinator will periodically, at a high frequency, look at
// the state of the requesters to see if there is any queued work that
// can be granted admission.
q.mu.Lock()
prevTenant := tenant
// The tenant could have been removed when using tokens. See the comment
// where the tenantInfo struct is declared.
tenant, ok = q.mu.tenants[tenantID]
if !q.usesTokens {
if !ok || prevTenant != tenant {
panic("prev tenantInfo no longer in map")
}
if tenant.used < uint64(info.RequestedCount) {
panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d",
tenant.used, info.RequestedCount))
}
tenant.used -= uint64(info.RequestedCount)
} else {
if !ok {
tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID))
q.mu.tenants[tenantID] = tenant
}
// Don't want to overflow tenant.used if it is already 0 because of
// being reset to 0 by the GC goroutine.
if tenant.used >= uint64(info.RequestedCount) {
tenant.used -= uint64(info.RequestedCount)
}
}
}
// Check for cancellation.
startTime := q.timeNow()
if ctx.Err() != nil {
if info.ReplicatedWorkInfo.Enabled {
panic("not equipped to deal with cancelable contexts below raft")
}
// Already canceled. More likely to happen if CPU starvation is delaying
// entry into the work queue.
q.mu.Unlock()
q.admitMu.Unlock()
q.metrics.incErrored(info.Priority)
deadline, _ := ctx.Deadline()
return true,
errors.Newf("work %s deadline already expired: deadline: %v, now: %v",
workKindString(q.workKind), deadline, startTime)
}
// Push onto heap(s).
ordering := fifoWorkOrdering
if int(info.Priority) < tenant.fifoPriorityThreshold {
ordering = lifoWorkOrdering
}
work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.RequestedCount, startTime, q.mu.epochLengthNanos)
work.replicated = info.ReplicatedWorkInfo
inTenantHeap := isInTenantHeap(tenant)
if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering {
heap.Push(&tenant.waitingWorkHeap, work)
} else {
heap.Push(&tenant.openEpochsHeap, work)
}
if !inTenantHeap {
heap.Push(&q.mu.tenantHeap, tenant)
}
// Else already in tenantHeap.
// Release all locks.
q.mu.Unlock()
q.admitMu.Unlock()
q.metrics.recordStartWait(info.Priority)
if info.ReplicatedWorkInfo.Enabled {
if log.V(1) {
log.Infof(ctx, "async-path: len(waiting-work)=%d: enqueued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
tenant.waitingWorkHeap.Len(),
tenant.id, info.Priority,
info.ReplicatedWorkInfo.RangeID,
info.ReplicatedWorkInfo.Origin,
info.ReplicatedWorkInfo.LogPosition,
info.ReplicatedWorkInfo.Ingested,
)
}
return // return without waiting (admission is asynchronous)
}
// Start waiting for admission.
defer releaseWaitingWork(work)
select {
case <-ctx.Done():
waitDur := q.timeNow().Sub(startTime)
q.mu.Lock()
// The work was cancelled, so waitDur is less than the wait time this work
// would have encountered if it actually waited until admission. However,
// this lower bound is still useful for calculating the FIFO=>LIFO switch
// since it is possible that all work at this priority is exceeding the
// deadline and being cancelled. The risk here is that if the deadlines
// are too short, we could underestimate the actual wait time.
tenant.priorityStates.updateDelayLocked(work.priority, waitDur, true /* canceled */)
if work.heapIndex == -1 {
// No longer in heap. Raced with token/slot grant.
if !q.usesTokens {
if tenant.used < uint64(info.RequestedCount) {
panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d",
tenant.used, info.RequestedCount))
}
tenant.used -= uint64(info.RequestedCount)
}
// Else, we don't decrement tenant.used since we don't want to race with
// the gc goroutine that will set used=0.
q.mu.Unlock()
q.granter.returnGrant(info.RequestedCount)
// The channel is sent to after releasing mu, so we don't need to hold
// mu when receiving from it. Additionally, we've already called
// returnGrant so we're not holding back future grant chains if this one
// chain gets terminated.
chainID := <-work.ch
q.granter.continueGrantChain(chainID)
} else {
if work.inWaitingWorkHeap {
tenant.waitingWorkHeap.remove(work)
} else {
tenant.openEpochsHeap.remove(work)
}
if !isInTenantHeap(tenant) {
q.mu.tenantHeap.remove(tenant)
}
q.mu.Unlock()
}
q.metrics.incErrored(info.Priority)
q.metrics.recordFinishWait(info.Priority, waitDur)
deadline, _ := ctx.Deadline()
log.Eventf(ctx, "deadline expired, waited in %s queue for %v",
workKindString(q.workKind), waitDur)
return true,
errors.Newf("work %s deadline expired while waiting: deadline: %v, start: %v, dur: %v",
workKindString(q.workKind), deadline, startTime, waitDur)
case chainID, ok := <-work.ch:
if !ok {
panic(errors.AssertionFailedf("channel should not be closed"))
}
q.metrics.incAdmitted(info.Priority)
waitDur := q.timeNow().Sub(startTime)
q.metrics.recordFinishWait(info.Priority, waitDur)
if work.heapIndex != -1 {
panic(errors.AssertionFailedf("grantee should be removed from heap"))
}
log.Eventf(ctx, "admitted, waited in %s queue for %v", workKindString(q.workKind), waitDur)
q.granter.continueGrantChain(chainID)
return true, nil
}
}
// AdmittedWorkDone is used to inform the WorkQueue that some admitted work is
// finished. It must be called iff the WorkKind of this WorkQueue uses slots
// (not tokens), i.e., KVWork, SQLStatementLeafStartWork,
// SQLStatementRootStartWork.
func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID) {
if q.usesTokens {
panic(errors.AssertionFailedf("tokens should not be returned"))
}
// Single slot is allocated for the work.
q.mu.Lock()
tenant, ok := q.mu.tenants[tenantID.ToUint64()]
if !ok {
panic(errors.AssertionFailedf("tenant not found"))
}
tenant.used--
if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
}
q.mu.Unlock()
q.granter.returnGrant(1)
}
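// hasWaitingRequests returns whether any tenant has waiting work. The granter
// consults it when deciding whether to grant.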
func (q *WorkQueue) hasWaitingRequests() bool {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.mu.tenantHeap) > 0
}
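// granted is called by the granter to admit the next waiting work item from
// the tenant at the top of the tenant heap. It returns the item's requested
// count (0 if nothing was admitted). Non-replicated work is signaled via its
// channel; replicated (below-raft) work is completed here by calling
// onAdmittedReplicatedWork.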
func (q *WorkQueue) granted(grantChainID grantChainID) int64 {
// Reduce critical section by getting time before mutex acquisition.
now := q.timeNow()
q.mu.Lock()
if len(q.mu.tenantHeap) == 0 {
q.mu.Unlock()
return 0
}
if fn := q.knobs.DisableWorkQueueGranting; fn != nil && fn() {
q.mu.Unlock()
return 0
}
tenant := q.mu.tenantHeap[0]
var item *waitingWork
if len(tenant.waitingWorkHeap) > 0 {
item = heap.Pop(&tenant.waitingWorkHeap).(*waitingWork)
} else {
item = heap.Pop(&tenant.openEpochsHeap).(*waitingWork)
}
waitDur := now.Sub(item.enqueueingTime)
tenant.priorityStates.updateDelayLocked(item.priority, waitDur, false /* canceled */)
tenant.used += uint64(item.requestedCount)
if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
} else {
q.mu.tenantHeap.remove(tenant)
}
// Get the value of requestedCount before releasing the mutex, since after
// releasing Admit can notice that item is no longer in the heap and call
// releaseWaitingWork to return item to the waitingWorkPool.
requestedCount := item.requestedCount
q.mu.Unlock()
if !item.replicated.Enabled {
// Reduce critical section by sending on channel after releasing mutex.
item.ch <- grantChainID
} else {
// NB: We don't use grant chains for store tokens, so they don't apply
// to replicated writes.
if log.V(1) {
log.Infof(q.ambientCtx, "async-path: len(waiting-work)=%d dequeued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
tenant.waitingWorkHeap.Len(),
tenant.id, item.priority,
item.replicated.RangeID,
item.replicated.Origin,
item.replicated.LogPosition,
item.replicated.Ingested,
)
}
defer releaseWaitingWork(item)
q.onAdmittedReplicatedWork.admittedReplicatedWork(
roachpb.MustMakeTenantID(tenant.id),
item.priority,
item.replicated,
item.requestedCount,
item.createTime,
true, /* coordMuLocked */
)
q.metrics.incAdmitted(item.priority)
waitDur := q.timeNow().Sub(item.enqueueingTime)
q.metrics.recordFinishWait(item.priority, waitDur)
if item.heapIndex != -1 {
panic(errors.AssertionFailedf("grantee should be removed from heap"))
}
}
return requestedCount
}
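// gcTenantsAndResetTokens runs once a second: it deletes tenants with no used
// slots/tokens and no waiting work, and resets used to zero for token-based
// queues.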
func (q *WorkQueue) gcTenantsAndResetTokens() {
q.mu.Lock()
defer q.mu.Unlock()
// With large numbers of active tenants, this iteration could hold the lock
// longer than desired. We could break this iteration into smaller parts if
// needed.
for id, info := range q.mu.tenants {
if info.used == 0 && !isInTenantHeap(info) {
delete(q.mu.tenants, id)
releaseTenantInfo(info)
} else if q.usesTokens {
info.used = 0
// All the heap members will reset used=0, so no need to change heap
// ordering.
}
}
}
// adjustTenantTokens is used internally by StoreWorkQueue. The
// additionalTokensNeeded count can be negative, in which case it is returning
// tokens. This is only for WorkQueue's own accounting -- it should not call
// into granter.
func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokensNeeded int64) {
tid := tenantID.ToUint64()
q.mu.Lock()
defer q.mu.Unlock()
tenant, ok := q.mu.tenants[tid]
if ok {
if additionalTokensNeeded < 0 {
toReturn := uint64(-additionalTokensNeeded)
if tenant.used < toReturn {
tenant.used = 0
} else {
tenant.used -= toReturn
}
} else {
tenant.used += uint64(additionalTokensNeeded)
}
}
}
func (q *WorkQueue) String() string {
return redact.StringWithoutMarkers(q)
}
// SafeFormat implements the redact.SafeFormatter interface.
func (q *WorkQueue) SafeFormat(s redact.SafePrinter, _ rune) {
q.mu.Lock()
defer q.mu.Unlock()
s.Printf("closed epoch: %d ", q.mu.closedEpochThreshold)
s.Printf("tenantHeap len: %d", len(q.mu.tenantHeap))
if len(q.mu.tenantHeap) > 0 {
s.Printf(" top tenant: %d", q.mu.tenantHeap[0].id)
}
var ids []uint64
for id := range q.mu.tenants {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
for _, id := range ids {
tenant := q.mu.tenants[id]
s.Printf("\n tenant-id: %d used: %d, w: %d, fifo: %d", tenant.id, tenant.used,
tenant.weight, tenant.fifoPriorityThreshold)
if len(tenant.waitingWorkHeap) > 0 {
s.Printf(" waiting work heap:")
for i := range tenant.waitingWorkHeap {
var workOrdering string
if tenant.waitingWorkHeap[i].arrivalTimeWorkOrdering == lifoWorkOrdering {
workOrdering = ", lifo-ordering"
}
s.Printf(" [%d: pri: %d, ct: %d, epoch: %d, qt: %d%s]", i,
tenant.waitingWorkHeap[i].priority,
tenant.waitingWorkHeap[i].createTime/int64(time.Millisecond),
tenant.waitingWorkHeap[i].epoch,
tenant.waitingWorkHeap[i].enqueueingTime.UnixNano()/int64(time.Millisecond), workOrdering)
}
}
if len(tenant.openEpochsHeap) > 0 {
s.Printf(" open epochs heap:")
for i := range tenant.openEpochsHeap {
s.Printf(" [%d: pri: %d, ct: %d, epoch: %d, qt: %d]", i,
tenant.openEpochsHeap[i].priority,