-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
config.go
141 lines (121 loc) · 4.33 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package quotapool
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/redact"
)
// Option is used to configure a quotapool.
type Option interface {
apply(*config)
}
// AcquisitionFunc is used to configure a quotapool to call a function after
// an acquisition has occurred.
type AcquisitionFunc func(
ctx context.Context, poolName string, r Request, start time.Time,
)
// OnAcquisition creates an Option to configure a callback upon acquisition.
// It is often useful for recording metrics.
func OnAcquisition(f AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onAcquisition = f
})
}
// OnWaitStartFunc is the prototype for functions called to notify the start or
// finish of a waiting period when a request is blocked.
type OnWaitStartFunc func(
ctx context.Context, poolName string, r Request,
)
// OnWaitStart creates an Option to configure a callback which is called when a
// request blocks and has to wait for quota.
func OnWaitStart(onStart OnWaitStartFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onWaitStart = onStart
})
}
// OnWaitFinish creates an Option to configure a callback which is called when a
// previously blocked request acquires resources.
func OnWaitFinish(onFinish AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onWaitFinish = onFinish
})
}
// OnSlowAcquisition creates an Option to configure a callback upon slow
// acquisitions. Only one OnSlowAcquisition may be used. If multiple are
// specified only the last will be used.
func OnSlowAcquisition(threshold time.Duration, f SlowAcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.slowAcquisitionThreshold = threshold
cfg.onSlowAcquisition = f
})
}
// LogSlowAcquisition is a SlowAcquisitionFunc.
func LogSlowAcquisition(ctx context.Context, poolName string, r Request, start time.Time) func() {
log.Warningf(ctx, "have been waiting %s attempting to acquire %s quota",
timeutil.Since(start), redact.Safe(poolName))
return func() {
log.Infof(ctx, "acquired %s quota after %s",
redact.Safe(poolName), timeutil.Since(start))
}
}
// SlowAcquisitionFunc is used to configure a quotapool to call a function when
// quota acquisition is slow. The returned callback is called when the
// acquisition occurs.
type SlowAcquisitionFunc func(
ctx context.Context, poolName string, r Request, start time.Time,
) (onAcquire func())
type optionFunc func(cfg *config)
func (f optionFunc) apply(cfg *config) { f(cfg) }
// WithTimeSource is used to configure a quotapool to use the provided
// TimeSource.
func WithTimeSource(ts timeutil.TimeSource) Option {
return optionFunc(func(cfg *config) {
cfg.timeSource = ts
})
}
// WithCloser allows the client to provide a channel which will lead to the
// AbstractPool being closed.
func WithCloser(closer <-chan struct{}) Option {
return optionFunc(func(cfg *config) {
cfg.closer = closer
})
}
// WithMinimumWait is used with the RateLimiter to control the minimum duration
// which a goroutine will sleep waiting for quota to accumulate. This
// can help avoid expensive spinning when the workload consists of many
// small acquisitions. If used with a regular (not rate limiting) quotapool,
// this option has no effect.
func WithMinimumWait(duration time.Duration) Option {
return optionFunc(func(cfg *config) {
cfg.minimumWait = duration
})
}
type config struct {
onAcquisition AcquisitionFunc
onSlowAcquisition SlowAcquisitionFunc
onWaitStart OnWaitStartFunc
onWaitFinish AcquisitionFunc
slowAcquisitionThreshold time.Duration
timeSource timeutil.TimeSource
closer <-chan struct{}
minimumWait time.Duration
}
var defaultConfig = config{
timeSource: timeutil.DefaultTimeSource{},
}
func initializeConfig(cfg *config, options ...Option) {
*cfg = defaultConfig
for _, opt := range options {
opt.apply(cfg)
}
}