From 4c9110f7040b1d911e5d02760fe7ceacc24413ca Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 26 Aug 2020 16:55:58 -0400
Subject: [PATCH] kv: split rate limits and metrics for read and write requests

This commit splits the existing request rate limit into two categories,
read requests and write requests. Experimentation has shown that the
fixed cost of a request is dramatically different between these two
categories, primarily because write requests need to go through Raft
while read requests do not. By splitting the limits and metrics along
this dimension, we expect to be able to more accurately model the cost
of KV traffic and more effectively tune rate limits.

In making the split, the commit replaces the existing metric:
```
kv.tenant_rate_limit.requests_admitted
```
with the following two new metrics:
```
kv.tenant_rate_limit.read_requests_admitted
kv.tenant_rate_limit.write_requests_admitted
```
The commit also replaces the existing two settings:
```
kv.tenant_rate_limiter.requests.rate_limit
kv.tenant_rate_limiter.request.burst_limit
```
with the following four new settings:
```
kv.tenant_rate_limiter.read_requests.rate_limit
kv.tenant_rate_limiter.read_requests.burst_limit
kv.tenant_rate_limiter.write_requests.rate_limit
kv.tenant_rate_limiter.write_requests.burst_limit
```

Release justification: Low-risk, high-benefit change.
---
 pkg/kv/kvserver/client_tenant_test.go        | 12 +--
 pkg/kv/kvserver/replica_rate_limit.go        |  2 +-
 pkg/kv/kvserver/tenantrate/helpers_test.go   |  6 +-
 pkg/kv/kvserver/tenantrate/limiter.go        | 79 +++++++++++++-------
 pkg/kv/kvserver/tenantrate/limiter_test.go   | 18 +++--
 pkg/kv/kvserver/tenantrate/metrics.go        | 67 ++++++++++-------
 pkg/kv/kvserver/tenantrate/settings.go       | 45 +++++++----
 pkg/kv/kvserver/tenantrate/system_limiter.go |  8 +-
 pkg/kv/kvserver/tenantrate/testdata/basic    | 56 ++++++++------
 pkg/kv/kvserver/tenantrate/testdata/burst    | 13 ++--
 pkg/kv/kvserver/tenantrate/testdata/cancel   | 13 ++--
 pkg/kv/kvserver/tenantrate/testdata/reads    |  4 +-
 pkg/kv/kvserver/tenantrate/testdata/update   | 11 +--
 pkg/sql/logictest/logic.go                   |  6 +-
 pkg/ts/catalog/chart_catalog.go              | 14 +++-
 15 files changed, 219 insertions(+), 135 deletions(-)

diff --git a/pkg/kv/kvserver/client_tenant_test.go b/pkg/kv/kvserver/client_tenant_test.go
index 2ed7fd887d9e..e824ff1eb521 100644
--- a/pkg/kv/kvserver/client_tenant_test.go
+++ b/pkg/kv/kvserver/client_tenant_test.go
@@ -175,20 +175,20 @@ func TestTenantRateLimiter(t *testing.T) {
 	// the tenant range.
 	tenantCtx := roachpb.NewContextForTenant(ctx, tenantID)
 	cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings())
-	for i := 0; i < int(cfg.Requests.Burst); i++ {
+	for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
 		require.NoError(t, db.Put(ctx, mkKey(), 0))
 	}
 	// Now ensure that in the same instant the write QPS limit does affect the
 	// tenant. Issuing up to the burst limit of requests can happen without
 	// blocking.
-	for i := 0; i < int(cfg.Requests.Burst); i++ {
+	for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
 		require.NoError(t, db.Put(tenantCtx, mkKey(), 0))
 	}
 	// Attempt to issue another request, make sure that it gets blocked by
 	// observing a timer.
 	errCh := make(chan error, 1)
 	go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }()
-	expectedTimer := t0.Add(time.Duration(float64(1/cfg.Requests.Rate) * float64(time.Second)))
+	expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second)))
 	testutils.SucceedsSoon(t, func() error {
 		timers := timeSource.Timers()
 		if len(timers) != 1 {
@@ -213,18 +213,18 @@ func TestTenantRateLimiter(t *testing.T) {
 		return string(read)
 	}
 	makeMetricStr := func(expCount int64) string {
-		const tenantMetricStr = `kv_tenant_rate_limit_requests_admitted{store="1",tenant_id="10"}`
+		const tenantMetricStr = `kv_tenant_rate_limit_write_requests_admitted{store="1",tenant_id="10"}`
 		return fmt.Sprintf("%s %d", tenantMetricStr, expCount)
 	}

 	// Ensure that the metric for the admitted requests is equal to the number of
 	// requests which we've admitted.
-	require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst))
+	require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst))

 	// Allow the blocked request to proceed.
 	timeSource.Advance(time.Second)
 	require.NoError(t, <-errCh)

 	// Ensure that it is now reflected in the metrics.
-	require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst+1))
+	require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1))
 }
diff --git a/pkg/kv/kvserver/replica_rate_limit.go b/pkg/kv/kvserver/replica_rate_limit.go
index b53149271f26..8e5efa7848ef 100644
--- a/pkg/kv/kvserver/replica_rate_limit.go
+++ b/pkg/kv/kvserver/replica_rate_limit.go
@@ -27,7 +27,7 @@ func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *roachpb.BatchRequ
 	if !ok || tenantID == roachpb.SystemTenantID {
 		return nil
 	}
-	return r.tenantLimiter.Wait(ctx, bytesWrittenFromRequest(ba))
+	return r.tenantLimiter.Wait(ctx, ba.IsWrite(), bytesWrittenFromRequest(ba))
 }

 // bytesWrittenFromBatchRequest returns an approximation of the number of bytes
diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go
index b8a02b4ecbb2..81f29ec0ff94 100644
--- a/pkg/kv/kvserver/tenantrate/helpers_test.go
+++ b/pkg/kv/kvserver/tenantrate/helpers_test.go
@@ -15,8 +15,10 @@ import "github.com/cockroachdb/cockroach/pkg/settings/cluster"

 // OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the
 // settings.
 func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) {
-	requestRateLimit.Override(&settings.SV, float64(rl.Requests.Rate))
-	requestBurstLimit.Override(&settings.SV, rl.Requests.Burst)
+	readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate))
+	readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst)
+	writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate))
+	writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst)
 	readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate))
 	readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst)
 	writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate))
diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go
index a6fc7e14a2ec..5276840e0f22 100644
--- a/pkg/kv/kvserver/tenantrate/limiter.go
+++ b/pkg/kv/kvserver/tenantrate/limiter.go
@@ -48,14 +48,15 @@ import (

 type Limiter interface {
 	// Wait acquires n quota from the limiter. This acquisition cannot be
-	// released. Each call to wait will consume 1 read byte, 1 request, and
-	// writeBytes from the token buckets. Calls to Wait will block until the
-	// buckets contain adequate resources. If a request attempts to write more
-	// than the burst limit, it will wait until the bucket is completely full
-	// before acquiring the requested quantity and putting the limiter in debt.
+	// released. Each call to wait will consume 1 read or write request
+	// depending on isWrite, 1 read byte, and writeBytes from the token buckets.
+	// Calls to Wait will block until the buckets contain adequate resources. If
+	// a request attempts to write more than the burst limit, it will wait until
+	// the bucket is completely full before acquiring the requested quantity and
+	// putting the limiter in debt.
 	//
 	// The only errors which should be returned are due to the context.
-	Wait(ctx context.Context, writeBytes int64) error
+	Wait(ctx context.Context, isWrite bool, writeBytes int64) error

 	// RecordRead subtracts the bytes read by a request from the token bucket.
 	// This call may push the Limiter into debt in the ReadBytes dimensions
@@ -85,28 +86,38 @@ func (rl *limiter) init(
 		metrics: metrics,
 	}
 	buckets := tokenBuckets{
-		requests:   makeTokenBucket(conf.Requests),
-		readBytes:  makeTokenBucket(conf.ReadBytes),
-		writeBytes: makeTokenBucket(conf.WriteBytes),
+		readRequests:  makeTokenBucket(conf.ReadRequests),
+		writeRequests: makeTokenBucket(conf.WriteRequests),
+		readBytes:     makeTokenBucket(conf.ReadBytes),
+		writeBytes:    makeTokenBucket(conf.WriteBytes),
 	}
 	options = append(options, quotapool.OnAcquisition(func(
 		ctx context.Context, poolName string, r quotapool.Request, start time.Time,
 	) {
 		req := r.(*waitRequest)
+		if req.readRequests > 0 {
+			rl.metrics.readRequestsAdmitted.Inc(req.readRequests)
+		}
+		if req.writeRequests > 0 {
+			rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests)
+		}
+		// Accounted for in limiter.RecordRead.
+		// if req.readBytes > 0 {
+		// 	rl.metrics.readBytesAdmitted.Inc(req.readBytes)
+		// }
 		if req.writeBytes > 0 {
 			rl.metrics.writeBytesAdmitted.Inc(req.writeBytes)
 		}
-		rl.metrics.requestsAdmitted.Inc(1)
 	}))
 	rl.qp = quotapool.New(tenantID.String(), &buckets, options...)
 	buckets.clock = rl.qp.TimeSource()
 	buckets.lastUpdated = buckets.clock.Now()
 }

-func (rl *limiter) Wait(ctx context.Context, writeBytes int64) error {
+func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
 	rl.metrics.currentBlocked.Inc(1)
 	defer rl.metrics.currentBlocked.Dec(1)
-	r := newWaitRequest(writeBytes)
+	r := newWaitRequest(isWrite, writeBytes)
 	defer putWaitRequest(r)
 	if err := rl.qp.Acquire(ctx, r); err != nil {
 		return err
@@ -130,11 +141,12 @@ func (rl *limiter) updateLimits(limits LimitConfigs) {
 // tokenBuckets is the implementation of Resource which remains in the quotapool
 // for a limiter.
 type tokenBuckets struct {
-	clock       timeutil.TimeSource
-	lastUpdated time.Time
-	requests    tokenBucket
-	readBytes   tokenBucket
-	writeBytes  tokenBucket
+	clock         timeutil.TimeSource
+	lastUpdated   time.Time
+	readRequests  tokenBucket
+	writeRequests tokenBucket
+	readBytes     tokenBucket
+	writeBytes    tokenBucket
 }

 var _ quotapool.Resource = (*tokenBuckets)(nil)
@@ -146,7 +158,8 @@ func (rb *tokenBuckets) update() {
 	// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
 	// spinning too fast on timers for tons of tiny allocations at a fast rate.
 	if since := now.Sub(rb.lastUpdated); since > 0 {
-		rb.requests.update(since)
+		rb.readRequests.update(since)
+		rb.writeRequests.update(since)
 		rb.readBytes.update(since)
 		rb.writeBytes.update(since)
 		rb.lastUpdated = now
@@ -166,14 +179,16 @@ func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter t
 			}
 		}
 	}
-	check(&rb.requests, req.requests)
+	check(&rb.readRequests, req.readRequests)
+	check(&rb.writeRequests, req.writeRequests)
 	check(&rb.readBytes, req.readBytes)
 	check(&rb.writeBytes, req.writeBytes)
 	return fulfilled, tryAgainAfter
 }

 func (rb *tokenBuckets) subtract(req *waitRequest) {
-	rb.requests.tokens -= float64(req.requests)
+	rb.readRequests.tokens -= float64(req.readRequests)
+	rb.writeRequests.tokens -= float64(req.writeRequests)
 	rb.readBytes.tokens -= float64(req.readBytes)
 	rb.writeBytes.tokens -= float64(req.writeBytes)
 }
@@ -185,7 +200,8 @@ func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) {
 		// configuration.
 		rb.update()

-		rb.requests.setConf(toAdd.Requests)
+		rb.readRequests.setConf(toAdd.ReadRequests)
+		rb.writeRequests.setConf(toAdd.WriteRequests)
 		rb.readBytes.setConf(toAdd.ReadBytes)
 		rb.writeBytes.setConf(toAdd.WriteBytes)
 		return true
@@ -264,9 +280,10 @@ func (t *tokenBucket) clampTokens() {

 // waitRequest is used to wait for adequate resources in the tokenBuckets.
 type waitRequest struct {
-	requests   int64
-	writeBytes int64
-	readBytes  int64
+	readRequests  int64
+	writeRequests int64
+	writeBytes    int64
+	readBytes     int64
 }

 var waitRequestSyncPool = sync.Pool{
@@ -275,12 +292,18 @@ var waitRequestSyncPool = sync.Pool{

 // newWaitRequest allocates a waitRequest from the sync.Pool.
 // It should be returned with putWaitRequest.
-func newWaitRequest(writeBytes int64) *waitRequest {
+func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest {
 	r := waitRequestSyncPool.Get().(*waitRequest)
 	*r = waitRequest{
-		requests:   1,
-		readBytes:  1,
-		writeBytes: writeBytes,
+		readRequests:  0,
+		writeRequests: 0,
+		readBytes:     1,
+		writeBytes:    writeBytes,
+	}
+	if isWrite {
+		r.writeRequests = 1
+	} else {
+		r.readRequests = 1
 	}
 	return r
 }
diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go
index 20648b4ab107..2b46c9d3f225 100644
--- a/pkg/kv/kvserver/tenantrate/limiter_test.go
+++ b/pkg/kv/kvserver/tenantrate/limiter_test.go
@@ -48,9 +48,9 @@ func TestCloser(t *testing.T) {
 	limiter := factory.GetTenant(tenant, closer)
 	ctx := context.Background()
 	// First Wait call will not block.
-	require.NoError(t, limiter.Wait(ctx, 1))
+	require.NoError(t, limiter.Wait(ctx, false, 1))
 	errCh := make(chan error, 1)
-	go func() { errCh <- limiter.Wait(ctx, 1<<30) }()
+	go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }()
 	testutils.SucceedsSoon(t, func() error {
 		if timers := timeSource.Timers(); len(timers) != 1 {
 			return errors.Errorf("expected 1 timer, found %d", len(timers))
@@ -84,6 +84,7 @@ type launchState struct {
 	tenantID   roachpb.TenantID
 	ctx        context.Context
 	cancel     context.CancelFunc
+	isWrite    bool
 	writeBytes int64
 	reserveCh  chan error
 }
@@ -202,6 +203,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
 	var cmds []struct {
 		ID         string
 		Tenant     uint64
+		IsWrite    bool
 		WriteBytes int64
 	}
 	if err := yaml.UnmarshalStrict([]byte(d.Input), &cmds); err != nil {
@@ -213,6 +215,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
 		s.tenantID = roachpb.MakeTenantID(cmd.Tenant)
 		s.ctx, s.cancel = context.WithCancel(context.Background())
 		s.reserveCh = make(chan error, 1)
+		s.isWrite = cmd.IsWrite
 		s.writeBytes = cmd.WriteBytes
 		ts.running[s.id] = &s
 		lims := ts.tenants[s.tenantID]
@@ -221,7 +224,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
 		}
 		go func() {
 			// We'll not worry about ever releasing tenant Limiters.
-			s.reserveCh <- lims[0].Wait(s.ctx, s.writeBytes)
+			s.reserveCh <- lims[0].Wait(s.ctx, s.isWrite, s.writeBytes)
 		}()
 	}
 	return ts.FormatRunning()
@@ -337,12 +340,15 @@ func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string {
 // kv_tenant_rate_limit_read_bytes_admitted 0
 // kv_tenant_rate_limit_read_bytes_admitted{tenant_id="2"} 0
 // kv_tenant_rate_limit_read_bytes_admitted{tenant_id="system"} 100
-// kv_tenant_rate_limit_requests_admitted 0
-// kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 0
-// kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 0
+// kv_tenant_rate_limit_read_requests_admitted 0
+// kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0
+// kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 0
 // kv_tenant_rate_limit_write_bytes_admitted 50
 // kv_tenant_rate_limit_write_bytes_admitted{tenant_id="2"} 50
 // kv_tenant_rate_limit_write_bytes_admitted{tenant_id="system"} 0
+// kv_tenant_rate_limit_write_requests_admitted 0
+// kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 0
+// kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0
 //
 // Or with a regular expression:
 //
diff --git a/pkg/kv/kvserver/tenantrate/metrics.go b/pkg/kv/kvserver/tenantrate/metrics.go
index b16344c64c2a..8de30d104112 100644
--- a/pkg/kv/kvserver/tenantrate/metrics.go
+++ b/pkg/kv/kvserver/tenantrate/metrics.go
@@ -18,11 +18,12 @@ import (

 // Metrics is a metric.Struct for the LimiterFactory.
 type Metrics struct {
-	Tenants            *metric.Gauge
-	CurrentBlocked     *aggmetric.AggGauge
-	RequestsAdmitted   *aggmetric.AggCounter
-	WriteBytesAdmitted *aggmetric.AggCounter
-	ReadBytesAdmitted  *aggmetric.AggCounter
+	Tenants               *metric.Gauge
+	CurrentBlocked        *aggmetric.AggGauge
+	ReadRequestsAdmitted  *aggmetric.AggCounter
+	WriteRequestsAdmitted *aggmetric.AggCounter
+	ReadBytesAdmitted     *aggmetric.AggCounter
+	WriteBytesAdmitted    *aggmetric.AggCounter
 }

 var _ metric.Struct = (*Metrics)(nil)
@@ -40,17 +41,17 @@ var (
 		Measurement: "Requests",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaRequestsAdmitted = metric.Metadata{
-		Name:        "kv.tenant_rate_limit.requests_admitted",
-		Help:        "Number of requests admitted by the rate limiter",
+	metaReadRequestsAdmitted = metric.Metadata{
+		Name:        "kv.tenant_rate_limit.read_requests_admitted",
+		Help:        "Number of read requests admitted by the rate limiter",
 		Measurement: "Requests",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaWriteBytesAdmitted = metric.Metadata{
-		Name:        "kv.tenant_rate_limit.write_bytes_admitted",
-		Help:        "Number of write bytes admitted by the rate limiter",
-		Measurement: "Bytes",
-		Unit:        metric.Unit_BYTES,
+	metaWriteRequestsAdmitted = metric.Metadata{
+		Name:        "kv.tenant_rate_limit.write_requests_admitted",
+		Help:        "Number of write requests admitted by the rate limiter",
+		Measurement: "Requests",
+		Unit:        metric.Unit_COUNT,
 	}
 	metaReadBytesAdmitted = metric.Metadata{
 		Name:        "kv.tenant_rate_limit.read_bytes_admitted",
@@ -58,6 +59,12 @@ var (
 		Measurement: "Bytes",
 		Unit:        metric.Unit_BYTES,
 	}
+	metaWriteBytesAdmitted = metric.Metadata{
+		Name:        "kv.tenant_rate_limit.write_bytes_admitted",
+		Help:        "Number of write bytes admitted by the rate limiter",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
 )

 // TenantIDLabel is the label used with metrics associated with a tenant.
@@ -67,11 +74,12 @@ const TenantIDLabel = "tenant_id"
 func makeMetrics() Metrics {
 	b := aggmetric.MakeBuilder(TenantIDLabel)
 	return Metrics{
-		Tenants:            metric.NewGauge(metaTenants),
-		CurrentBlocked:     b.Gauge(metaCurrentBlocked),
-		RequestsAdmitted:   b.Counter(metaRequestsAdmitted),
-		WriteBytesAdmitted: b.Counter(metaWriteBytesAdmitted),
-		ReadBytesAdmitted:  b.Counter(metaReadBytesAdmitted),
+		Tenants:               metric.NewGauge(metaTenants),
+		CurrentBlocked:        b.Gauge(metaCurrentBlocked),
+		ReadRequestsAdmitted:  b.Counter(metaReadRequestsAdmitted),
+		WriteRequestsAdmitted: b.Counter(metaWriteRequestsAdmitted),
+		ReadBytesAdmitted:     b.Counter(metaReadBytesAdmitted),
+		WriteBytesAdmitted:    b.Counter(metaWriteBytesAdmitted),
 	}
 }

@@ -80,25 +88,28 @@ func (m *Metrics) MetricStruct() {}

 // tenantMetrics represent metrics for an individual tenant.
 type tenantMetrics struct {
-	currentBlocked     *aggmetric.Gauge
-	requestsAdmitted   *aggmetric.Counter
-	writeBytesAdmitted *aggmetric.Counter
-	readBytesAdmitted  *aggmetric.Counter
+	currentBlocked        *aggmetric.Gauge
+	readRequestsAdmitted  *aggmetric.Counter
+	writeRequestsAdmitted *aggmetric.Counter
+	readBytesAdmitted     *aggmetric.Counter
+	writeBytesAdmitted    *aggmetric.Counter
 }

 func (m *Metrics) tenantMetrics(tenantID roachpb.TenantID) tenantMetrics {
 	tid := tenantID.String()
 	return tenantMetrics{
-		currentBlocked:     m.CurrentBlocked.AddChild(tid),
-		requestsAdmitted:   m.RequestsAdmitted.AddChild(tid),
-		writeBytesAdmitted: m.WriteBytesAdmitted.AddChild(tid),
-		readBytesAdmitted:  m.ReadBytesAdmitted.AddChild(tid),
+		currentBlocked:        m.CurrentBlocked.AddChild(tid),
+		readRequestsAdmitted:  m.ReadRequestsAdmitted.AddChild(tid),
+		writeRequestsAdmitted: m.WriteRequestsAdmitted.AddChild(tid),
+		readBytesAdmitted:     m.ReadBytesAdmitted.AddChild(tid),
+		writeBytesAdmitted:    m.WriteBytesAdmitted.AddChild(tid),
 	}
 }

 func (tm *tenantMetrics) destroy() {
 	tm.currentBlocked.Destroy()
-	tm.requestsAdmitted.Destroy()
-	tm.writeBytesAdmitted.Destroy()
+	tm.readRequestsAdmitted.Destroy()
+	tm.writeRequestsAdmitted.Destroy()
 	tm.readBytesAdmitted.Destroy()
+	tm.writeBytesAdmitted.Destroy()
 }
diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go
index 50ac9c4b4ffc..33903a075e58 100644
--- a/pkg/kv/kvserver/tenantrate/settings.go
+++ b/pkg/kv/kvserver/tenantrate/settings.go
@@ -28,18 +28,23 @@ type LimitConfig struct {
 // It is exported for convenience and testing.
 // The values are derived from cluster settings.
 type LimitConfigs struct {
-	Requests   LimitConfig
-	ReadBytes  LimitConfig
-	WriteBytes LimitConfig
+	ReadRequests  LimitConfig
+	WriteRequests LimitConfig
+	ReadBytes     LimitConfig
+	WriteBytes    LimitConfig
 }

 // LimitConfigsFromSettings constructs LimitConfigs from the values stored in
 // the settings.
 func LimitConfigsFromSettings(settings *cluster.Settings) LimitConfigs {
 	return LimitConfigs{
-		Requests: LimitConfig{
-			Rate:  Limit(requestRateLimit.Get(&settings.SV)),
-			Burst: requestBurstLimit.Get(&settings.SV),
+		ReadRequests: LimitConfig{
+			Rate:  Limit(readRequestRateLimit.Get(&settings.SV)),
+			Burst: readRequestBurstLimit.Get(&settings.SV),
+		},
+		WriteRequests: LimitConfig{
+			Rate:  Limit(writeRequestRateLimit.Get(&settings.SV)),
+			Burst: writeRequestBurstLimit.Get(&settings.SV),
 		},
 		ReadBytes: LimitConfig{
 			Rate:  Limit(readRateLimit.Get(&settings.SV)),
@@ -53,14 +58,24 @@ func LimitConfigsFromSettings(settings *cluster.Settings) LimitConfigs {
 }

 var (
-	requestRateLimit = settings.RegisterPositiveFloatSetting(
-		"kv.tenant_rate_limiter.requests.rate_limit",
-		"per-tenant request rate limit in requests per second",
+	readRequestRateLimit = settings.RegisterPositiveFloatSetting(
+		"kv.tenant_rate_limiter.read_requests.rate_limit",
+		"per-tenant read request rate limit in requests per second",
+		128)
+
+	readRequestBurstLimit = settings.RegisterPositiveIntSetting(
+		"kv.tenant_rate_limiter.read_requests.burst_limit",
+		"per-tenant read request burst limit in requests",
+		512)
+
+	writeRequestRateLimit = settings.RegisterPositiveFloatSetting(
+		"kv.tenant_rate_limiter.write_requests.rate_limit",
+		"per-tenant write request rate limit in requests per second",
 		128)

-	requestBurstLimit = settings.RegisterPositiveIntSetting(
-		"kv.tenant_rate_limiter.request.burst_limit",
-		"per-tenant request burst limit in requests",
+	writeRequestBurstLimit = settings.RegisterPositiveIntSetting(
+		"kv.tenant_rate_limiter.write_requests.burst_limit",
+		"per-tenant write request burst limit in requests",
 		512)

 	readRateLimit = settings.RegisterByteSizeSetting(
@@ -86,8 +101,10 @@ var (
 	// settingsSetOnChangeFuncs are the functions used to register the factory to
 	// be notified of changes to any of the settings which configure it.
 	settingsSetOnChangeFuncs = [...]func(*settings.Values, func()){
-		requestRateLimit.SetOnChange,
-		requestBurstLimit.SetOnChange,
+		readRequestRateLimit.SetOnChange,
+		readRequestBurstLimit.SetOnChange,
+		writeRequestRateLimit.SetOnChange,
+		writeRequestBurstLimit.SetOnChange,
 		readRateLimit.SetOnChange,
 		readBurstLimit.SetOnChange,
 		writeRateLimit.SetOnChange,
diff --git a/pkg/kv/kvserver/tenantrate/system_limiter.go b/pkg/kv/kvserver/tenantrate/system_limiter.go
index e065f8da01f7..ad8e48e60891 100644
--- a/pkg/kv/kvserver/tenantrate/system_limiter.go
+++ b/pkg/kv/kvserver/tenantrate/system_limiter.go
@@ -18,9 +18,13 @@ type systemLimiter struct {
 	tenantMetrics
 }

-func (s systemLimiter) Wait(ctx context.Context, writeBytes int64) error {
+func (s systemLimiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
+	if isWrite {
+		s.writeRequestsAdmitted.Inc(1)
+	} else {
+		s.readRequestsAdmitted.Inc(1)
+	}
 	s.writeBytesAdmitted.Inc(writeBytes)
-	s.requestsAdmitted.Inc(1)
 	return nil
 }
diff --git a/pkg/kv/kvserver/tenantrate/testdata/basic b/pkg/kv/kvserver/tenantrate/testdata/basic
index b428f36bb9aa..77a8db5de8d8 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/basic
+++ b/pkg/kv/kvserver/tenantrate/testdata/basic
@@ -1,7 +1,8 @@
 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 1024, burst: 2048 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 1024, burst: 2048 }
 ----
 00:00:00.000

@@ -12,31 +13,35 @@ get_tenants
 ----
 [2#2, 3#1, 5#3, system#1]

-# Launch two requests on behalf of tenant 2 and one on behalf of 3.
+# Launch four requests on behalf of tenant 2, one on behalf of 3, and one on
+# behalf of the system tenant.

 launch
 - { id: g0, tenant: 1 }
 - { id: g1, tenant: 2 }
 - { id: g2, tenant: 2 }
-- { id: g3, tenant: 3 }
+- { id: g3, tenant: 2, iswrite: true }
+- { id: g4, tenant: 2, iswrite: true }
+- { id: g5, tenant: 3 }
 ----
-[g0@system, g1@2, g2@2, g3@3]
+[g0@system, g1@2, g2@2, g3@2, g4@2, g5@3]

 # Ensure that none of the above requests get blocked because they use less
 # than the configured burst for their respective limiters.

 await
-[g0, g1, g2, g3]
+[g0, g1, g2, g3, g4, g5]
 ----
 []

-# Launch another request on behalf of tenant 2, it will block due to the request
-# rate limit.
+# Launch another read and another write request on behalf of tenant 2; they
+# will block due to the request rate limits.

 launch
-- { id: g4, tenant: 2 }
+- { id: g6, tenant: 2 }
+- { id: g7, tenant: 2, iswrite: true }
 ----
-[g4@2]
+[g6@2, g7@2]

 # Ensure that the above requests were blocked by observing the timer created
 # to wait for available quota.

 timers
 ----
 00:00:01.000

 # Observe that the "current_blocked" counter has appropriate values to indicate
-# that there is one blocked request for tenant 2 and total.
+# that there are two blocked requests for tenant 2 and in total.

 metrics
 current_blocked
 ----
 kv_tenant_rate_limit_current_blocked 2
 kv_tenant_rate_limit_current_blocked{tenant_id="2"} 2
 kv_tenant_rate_limit_current_blocked{tenant_id="3"} 0
 kv_tenant_rate_limit_current_blocked{tenant_id="5"} 0
 kv_tenant_rate_limit_current_blocked{tenant_id="system"} 0

 # Observe that the "requests_admitted" counter has appropriate values to
 # indicate that requests have been admitted.
 metrics
-kv_tenant_rate_limit_requests_admitted
+kv_tenant_rate_limit_.*_requests_admitted
 ----
-kv_tenant_rate_limit_requests_admitted 4
-kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 2
-kv_tenant_rate_limit_requests_admitted{tenant_id="3"} 1
-kv_tenant_rate_limit_requests_admitted{tenant_id="5"} 0
-kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 1
+kv_tenant_rate_limit_read_requests_admitted 4
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 2
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="3"} 1
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="5"} 0
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 1
+kv_tenant_rate_limit_write_requests_admitted 2
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 2
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="3"} 0
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="5"} 0
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0

 # Release the tenant 3 rate limiter.

@@ -91,9 +101,9 @@ advance
 ----
 00:00:01.001

-# Observe that the blocked request is now unblocked.
+# Observe that the blocked requests are now unblocked.

 await
-- g4
+[g6, g7]
 ----
 []
diff --git a/pkg/kv/kvserver/tenantrate/testdata/burst b/pkg/kv/kvserver/tenantrate/testdata/burst
index 6624677e9b00..9ed29bd12113 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/burst
+++ b/pkg/kv/kvserver/tenantrate/testdata/burst
@@ -2,9 +2,10 @@
 # into debt.

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 10, burst: 20 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 10, burst: 20 }
 ----
 00:00:00.000

@@ -19,7 +20,7 @@ get_tenants
 # limit. This will not block but will put the limiter into debt.

 launch
-- { id: g1, tenant: 2, writebytes: 30 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
 ----
 [g1@2]

@@ -33,7 +34,7 @@ await
 # 10/s.

 launch
-- { id: g1, tenant: 2, writebytes: 10 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 10 }
 ----
 [g1@2]

@@ -67,7 +68,7 @@ await
 # is above the burst of 20, we'll need to wait for the bucket to be full.

 launch
-- { id: g1, tenant: 2, writebytes: 30 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
 ----
 [g1@2]

diff --git a/pkg/kv/kvserver/tenantrate/testdata/cancel b/pkg/kv/kvserver/tenantrate/testdata/cancel
index 6cc28b14da76..77ce14d13d5f 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/cancel
+++ b/pkg/kv/kvserver/tenantrate/testdata/cancel
@@ -1,9 +1,10 @@
 # This tests cancellation and unblocking subsequent requests.

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 1024, burst: 2048 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 1024, burst: 2048 }
 ----
 00:00:00.000

@@ -15,7 +16,7 @@ get_tenants
 # Launch a request to consume half of the 1024 capacity.

 launch
-- { id: g1, tenant: 2, writebytes: 1024 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 1024 }
 ----
 [g1@2]

@@ -27,7 +28,7 @@ await
 # Launch a request requiring more quota than exists.

 launch
-- { id: g2, tenant: 2, writebytes: 1536 }
+- { id: g2, tenant: 2, iswrite: true, writebytes: 1536 }
 ----
 [g2@2]

@@ -40,7 +41,7 @@ timers
 # Launch another request which could be fulfilled by the existing quota.

 launch
-- { id: g3, tenant: 2, writebytes: 1024 }
+- { id: g3, tenant: 2, iswrite: true, writebytes: 1024 }
 ----
 [g2@2, g3@2]

diff --git a/pkg/kv/kvserver/tenantrate/testdata/reads b/pkg/kv/kvserver/tenantrate/testdata/reads
index e9b686ffb8dd..9f99d85c5c17 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/reads
+++ b/pkg/kv/kvserver/tenantrate/testdata/reads
@@ -2,8 +2,8 @@
 # into debt

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 10, burst: 100 }
+readrequests: { rate: 1, burst: 2 }
+readbytes: { rate: 10, burst: 100 }
 ----
 00:00:00.000

diff --git a/pkg/kv/kvserver/tenantrate/testdata/update b/pkg/kv/kvserver/tenantrate/testdata/update
index 88dda006b8fa..fefc8e88364e 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/update
+++ b/pkg/kv/kvserver/tenantrate/testdata/update
@@ -1,9 +1,10 @@
 # Test updating the configuration of the rate limiter.

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 10, burst: 20 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 10, burst: 20 }
 ----
 00:00:00.000

@@ -15,7 +16,7 @@ get_tenants
 # Launch a request that puts the limiter in debt by 10.

 launch
-- { id: g1, tenant: 2, writebytes: 30 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
 ----
 [g1@2]

@@ -28,7 +29,7 @@ await
 # with the current debt.

 launch
-- { id: g1, tenant: 2, writebytes: 20 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 20 }
 ----
 [g1@2]

diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index b14529e5bf27..17e00d2d0e9c 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -1348,8 +1348,10 @@ func (t *logicTest) setup(cfg testClusterConfig, serverArgs TestServerArgs) {
 		// Increase tenant rate limits for faster tests.
 		conn := t.cluster.ServerConn(0)
 		for _, settingName := range []string{
-			"kv.tenant_rate_limiter.requests.rate_limit",
-			"kv.tenant_rate_limiter.request.burst_limit",
+			"kv.tenant_rate_limiter.read_requests.rate_limit",
+			"kv.tenant_rate_limiter.read_requests.burst_limit",
+			"kv.tenant_rate_limiter.write_requests.rate_limit",
+			"kv.tenant_rate_limiter.write_requests.burst_limit",
 		} {
 			if _, err := conn.Exec(
 				fmt.Sprintf("SET CLUSTER SETTING %s = %d", settingName, 100000),
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index d17831605091..5466f5962fe7 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -743,16 +743,16 @@ var charts = []sectionDescription{
 			Metrics: []string{"kv.tenant_rate_limit.num_tenants"},
 		},
 		{
-			Title:       "Requests Admitted by Rate Limiter",
+			Title:       "Read Requests Admitted by Rate Limiter",
 			Downsampler: DescribeAggregator_MAX,
 			Percentiles: false,
-			Metrics:     []string{"kv.tenant_rate_limit.requests_admitted"},
+			Metrics:     []string{"kv.tenant_rate_limit.read_requests_admitted"},
 		},
 		{
-			Title:       "Write Bytes Admitted by Rate Limiter",
+			Title:       "Write Requests Admitted by Rate Limiter",
 			Downsampler: DescribeAggregator_MAX,
 			Percentiles: false,
-			Metrics:     []string{"kv.tenant_rate_limit.write_bytes_admitted"},
+			Metrics:     []string{"kv.tenant_rate_limit.write_requests_admitted"},
 		},
 		{
 			Title:       "Read Bytes Admitted by Rate Limiter",
@@ -760,6 +760,12 @@ var charts = []sectionDescription{
 			Percentiles: false,
 			Metrics:     []string{"kv.tenant_rate_limit.read_bytes_admitted"},
 		},
+		{
+			Title:       "Write Bytes Admitted by Rate Limiter",
+			Downsampler: DescribeAggregator_MAX,
+			Percentiles: false,
+			Metrics:     []string{"kv.tenant_rate_limit.write_bytes_admitted"},
+		},
 	},
 },
{
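
The debt behavior documented on Wait above — an over-burst acquisition waits for a completely full bucket, then takes the whole amount and leaves the bucket negative — can be illustrated with a minimal, self-contained sketch. The names below (bucket, tryAcquire) are hypothetical, and the queueing that the real limiter delegates to pkg/util/quotapool is reduced here to a returned wait duration:

```go
package main

import (
	"fmt"
	"time"
)

// bucket mirrors the shape of the patch's tokenBucket: a refill rate, a
// burst ceiling, and a token count that may go negative (debt).
type bucket struct {
	rate   float64 // tokens per second
	burst  float64 // maximum tokens held
	tokens float64
}

// update refills the bucket for the elapsed duration, clamping at burst.
func (b *bucket) update(since time.Duration) {
	b.tokens += b.rate * since.Seconds()
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
}

// tryAcquire applies the rule documented on Wait: a request within burst
// needs tokens >= n; a request above burst waits until the bucket is
// completely full, then takes everything it asked for, leaving debt.
func (b *bucket) tryAcquire(n float64) (ok bool, tryAgainAfter time.Duration) {
	if n <= b.burst {
		if b.tokens >= n {
			b.tokens -= n
			return true, 0
		}
		return false, time.Duration(float64(time.Second) * (n - b.tokens) / b.rate)
	}
	if b.tokens >= b.burst {
		b.tokens -= n // the bucket goes into debt
		return true, 0
	}
	return false, time.Duration(float64(time.Second) * (b.burst - b.tokens) / b.rate)
}

func main() {
	// Mirrors testdata/burst: writebytes rate 10, burst 20, starting full.
	wb := bucket{rate: 10, burst: 20, tokens: 20}
	if ok, _ := wb.tryAcquire(30); ok {
		fmt.Printf("admitted, tokens now %.0f\n", wb.tokens) // admitted, tokens now -10
	}
	// A follow-up 10-byte write must cover the debt plus its own cost:
	// (10 - (-10)) tokens at 10/s means a 2s wait.
	_, wait := wb.tryAcquire(10)
	fmt.Println(wait) // 2s
}
```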
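tokenBuckets.check composes the four dimensions the same way: a waitRequest is fulfilled only when every bucket can cover its share, and the retry timer is the latest per-dimension deadline. A simplified sketch under hypothetical names (it ignores the over-burst rule shown in the previous example):

```go
package main

import (
	"fmt"
	"time"
)

// dim is one rate-limit dimension of a request: the tokens the bucket
// currently holds, what the request needs, and the bucket's refill rate.
type dim struct {
	tokens, need, rate float64
}

// check reports whether all dimensions can be satisfied now; if not, it
// returns the wait until the most constrained bucket has refilled enough.
func check(dims []dim) (fulfilled bool, tryAgainAfter time.Duration) {
	fulfilled = true
	for _, d := range dims {
		if d.need <= d.tokens {
			continue
		}
		fulfilled = false
		wait := time.Duration(float64(time.Second) * (d.need - d.tokens) / d.rate)
		if wait > tryAgainAfter {
			tryAgainAfter = wait
		}
	}
	return fulfilled, tryAgainAfter
}

func main() {
	// A write request consumes a write-request token, one read byte, and
	// its write bytes, matching newWaitRequest above.
	ok, wait := check([]dim{
		{tokens: 0, need: 1, rate: 1},    // writeRequests: 1s until refilled
		{tokens: 1, need: 1, rate: 1024}, // readBytes: already satisfied
		{tokens: 10, need: 30, rate: 10}, // writeBytes: 2s until refilled
	})
	fmt.Println(ok, wait) // false 2s
}
```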
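The expectedTimer assertion in TestTenantRateLimiter encodes simple refill arithmetic: once the burst is spent, the next request is admitted one token refill later, i.e. after 1/rate seconds. With the write-request defaults registered in settings.go (rate 128, burst 512), that is 1/128 s ≈ 7.8 ms:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Default of kv.tenant_rate_limiter.write_requests.rate_limit.
	const rate = 128.0
	t0 := time.Now()
	// The same expression the test uses to predict the blocked request's timer.
	expectedTimer := t0.Add(time.Duration(float64(1/rate) * float64(time.Second)))
	fmt.Println(expectedTimer.Sub(t0)) // 7.8125ms
}
```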
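Operators can raise the four new settings the same way the logictest harness does above. A sketch, assuming an open SQL connection to a cluster node; the lib/pq driver and connection string are illustrative choices, not part of the patch:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; any pgwire driver works
)

// raiseTenantRateLimits bumps the four per-tenant request limits, mirroring
// the loop the patch adds to pkg/sql/logictest/logic.go.
func raiseTenantRateLimits(db *sql.DB) error {
	for _, settingName := range []string{
		"kv.tenant_rate_limiter.read_requests.rate_limit",
		"kv.tenant_rate_limiter.read_requests.burst_limit",
		"kv.tenant_rate_limiter.write_requests.rate_limit",
		"kv.tenant_rate_limiter.write_requests.burst_limit",
	} {
		if _, err := db.Exec(
			fmt.Sprintf("SET CLUSTER SETTING %s = %d", settingName, 100000),
		); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := raiseTenantRateLimits(db); err != nil {
		log.Fatal(err)
	}
}
```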