diff --git a/pkg/kv/kvserver/client_tenant_test.go b/pkg/kv/kvserver/client_tenant_test.go
index 2ed7fd887d9e..e824ff1eb521 100644
--- a/pkg/kv/kvserver/client_tenant_test.go
+++ b/pkg/kv/kvserver/client_tenant_test.go
@@ -175,20 +175,20 @@ func TestTenantRateLimiter(t *testing.T) {
 	// the tenant range.
 	tenantCtx := roachpb.NewContextForTenant(ctx, tenantID)
 	cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings())
-	for i := 0; i < int(cfg.Requests.Burst); i++ {
+	for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
 		require.NoError(t, db.Put(ctx, mkKey(), 0))
 	}
 	// Now ensure that in the same instant the write QPS limit does affect the
 	// tenant. Issuing up to the burst limit of requests can happen without
 	// blocking.
-	for i := 0; i < int(cfg.Requests.Burst); i++ {
+	for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
 		require.NoError(t, db.Put(tenantCtx, mkKey(), 0))
 	}
 	// Attempt to issue another request, make sure that it gets blocked by
 	// observing a timer.
 	errCh := make(chan error, 1)
 	go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }()
-	expectedTimer := t0.Add(time.Duration(float64(1/cfg.Requests.Rate) * float64(time.Second)))
+	expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second)))
 	testutils.SucceedsSoon(t, func() error {
 		timers := timeSource.Timers()
 		if len(timers) != 1 {
@@ -213,18 +213,18 @@ func TestTenantRateLimiter(t *testing.T) {
 		return string(read)
 	}
 	makeMetricStr := func(expCount int64) string {
-		const tenantMetricStr = `kv_tenant_rate_limit_requests_admitted{store="1",tenant_id="10"}`
+		const tenantMetricStr = `kv_tenant_rate_limit_write_requests_admitted{store="1",tenant_id="10"}`
 		return fmt.Sprintf("%s %d", tenantMetricStr, expCount)
 	}

 	// Ensure that the metric for the admitted requests is equal to the number of
 	// requests which we've admitted.
-	require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst))
+	require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst))

 	// Allow the blocked request to proceed.
 	timeSource.Advance(time.Second)
 	require.NoError(t, <-errCh)

 	// Ensure that it is now reflected in the metrics.
-	require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst+1))
+	require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1))
 }
diff --git a/pkg/kv/kvserver/replica_rate_limit.go b/pkg/kv/kvserver/replica_rate_limit.go
index b53149271f26..8e5efa7848ef 100644
--- a/pkg/kv/kvserver/replica_rate_limit.go
+++ b/pkg/kv/kvserver/replica_rate_limit.go
@@ -27,7 +27,7 @@ func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *roachpb.BatchRequ
 	if !ok || tenantID == roachpb.SystemTenantID {
 		return nil
 	}
-	return r.tenantLimiter.Wait(ctx, bytesWrittenFromRequest(ba))
+	return r.tenantLimiter.Wait(ctx, ba.IsWrite(), bytesWrittenFromRequest(ba))
 }

 // bytesWrittenFromRequest returns an approximation of the number of bytes
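Reviewer note: `bytesWrittenFromRequest` only approximates how many bytes a batch will write, and that estimate feeds the `writeBytes` dimension of `Wait`. A minimal sketch of that style of accounting, with hypothetical request types standing in for the real roachpb variants (`request`, `putRequest`, `getRequest`, and `bytesWritten` here are made up for illustration):

```go
// Illustrative only: rough per-batch write-byte accounting. The types and
// method names are hypothetical stand-ins, not the real roachpb API.
package main

import "fmt"

type request interface{ writeBytes() int64 }

type putRequest struct{ key, value []byte }

// A put contributes roughly the size of the key and value it writes.
func (p putRequest) writeBytes() int64 { return int64(len(p.key) + len(p.value)) }

type getRequest struct{ key []byte }

// Reads contribute no write bytes; they are charged via RecordRead instead.
func (getRequest) writeBytes() int64 { return 0 }

// bytesWritten approximates the bytes a batch will write by summing the
// contributions of its individual requests.
func bytesWritten(reqs []request) int64 {
	var n int64
	for _, r := range reqs {
		n += r.writeBytes()
	}
	return n
}

func main() {
	batch := []request{putRequest{[]byte("k"), []byte("value")}, getRequest{[]byte("k")}}
	fmt.Println(bytesWritten(batch)) // 6
}
```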
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 509ca9d21a90..1d0fdd65b7b0 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -65,8 +65,6 @@ func (r *Replica) sendWithRangeID(
 	r.maybeInitializeRaftGroup(ctx)

 	isReadOnly := ba.IsReadOnly()
-	useRaft := !isReadOnly && ba.IsWrite()
-
 	if err := r.checkBatchRequest(ba, isReadOnly); err != nil {
 		return nil, roachpb.NewError(err)
 	}
@@ -92,14 +90,14 @@ func (r *Replica) sendWithRangeID(
 	// Differentiate between read-write, read-only, and admin.
 	var pErr *roachpb.Error
-	if useRaft {
-		log.Event(ctx, "read-write path")
-		fn := (*Replica).executeWriteBatch
-		br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn)
-	} else if isReadOnly {
+	if isReadOnly {
 		log.Event(ctx, "read-only path")
 		fn := (*Replica).executeReadOnlyBatch
 		br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn)
+	} else if ba.IsWrite() {
+		log.Event(ctx, "read-write path")
+		fn := (*Replica).executeWriteBatch
+		br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn)
 	} else if ba.IsAdmin() {
 		log.Event(ctx, "admin path")
 		br, pErr = r.executeAdminBatch(ctx, ba)
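The branch reordering above is behavior-preserving: a batch that reports `IsReadOnly()` never reports `IsWrite()`, so dropping `useRaft` and testing `ba.IsWrite()` directly routes every batch exactly as before. A toy model of the two orderings, with simplified flags standing in for the real `roachpb.BatchRequest` methods:

```go
// Illustrative only: the old and new orderings route batches identically
// because readOnly and write are mutually exclusive here, mirroring the
// real invariant between IsReadOnly() and IsWrite().
package main

import "fmt"

type batch struct{ readOnly, write, admin bool }

func routeOld(b batch) string {
	useRaft := !b.readOnly && b.write
	switch {
	case useRaft:
		return "read-write path"
	case b.readOnly:
		return "read-only path"
	case b.admin:
		return "admin path"
	}
	return "unknown"
}

func routeNew(b batch) string {
	switch {
	case b.readOnly:
		return "read-only path"
	case b.write:
		return "read-write path"
	case b.admin:
		return "admin path"
	}
	return "unknown"
}

func main() {
	for _, b := range []batch{{readOnly: true}, {write: true}, {admin: true}} {
		fmt.Println(routeOld(b) == routeNew(b)) // true, true, true
	}
}
```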
diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go
index b8a02b4ecbb2..81f29ec0ff94 100644
--- a/pkg/kv/kvserver/tenantrate/helpers_test.go
+++ b/pkg/kv/kvserver/tenantrate/helpers_test.go
@@ -15,8 +15,10 @@ import "github.com/cockroachdb/cockroach/pkg/settings/cluster"
 // OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the
 // settings.
 func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) {
-	requestRateLimit.Override(&settings.SV, float64(rl.Requests.Rate))
-	requestBurstLimit.Override(&settings.SV, rl.Requests.Burst)
+	readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate))
+	readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst)
+	writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate))
+	writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst)
 	readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate))
 	readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst)
 	writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate))
diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go
index a6fc7e14a2ec..5276840e0f22 100644
--- a/pkg/kv/kvserver/tenantrate/limiter.go
+++ b/pkg/kv/kvserver/tenantrate/limiter.go
@@ -48,14 +48,15 @@ import (
 type Limiter interface {

 	// Wait acquires n quota from the limiter. This acquisition cannot be
-	// released. Each call to wait will consume 1 read byte, 1 request, and
-	// writeBytes from the token buckets. Calls to Wait will block until the
-	// buckets contain adequate resources. If a request attempts to write more
-	// than the burst limit, it will wait until the bucket is completely full
-	// before acquiring the requested quantity and putting the limiter in debt.
+	// released. Each call to wait will consume 1 read or write request
+	// depending on isWrite, 1 read byte, and writeBytes from the token buckets.
+	// Calls to Wait will block until the buckets contain adequate resources. If
+	// a request attempts to write more than the burst limit, it will wait until
+	// the bucket is completely full before acquiring the requested quantity and
+	// putting the limiter in debt.
 	//
 	// The only errors which should be returned are due to the context.
-	Wait(ctx context.Context, writeBytes int64) error
+	Wait(ctx context.Context, isWrite bool, writeBytes int64) error

 	// RecordRead subtracts the bytes read by a request from the token bucket.
 	// This call may push the Limiter into debt in the ReadBytes dimensions
@@ -85,28 +86,38 @@ func (rl *limiter) init(
 		metrics: metrics,
 	}
 	buckets := tokenBuckets{
-		requests:   makeTokenBucket(conf.Requests),
-		readBytes:  makeTokenBucket(conf.ReadBytes),
-		writeBytes: makeTokenBucket(conf.WriteBytes),
+		readRequests:  makeTokenBucket(conf.ReadRequests),
+		writeRequests: makeTokenBucket(conf.WriteRequests),
+		readBytes:     makeTokenBucket(conf.ReadBytes),
+		writeBytes:    makeTokenBucket(conf.WriteBytes),
 	}
 	options = append(options, quotapool.OnAcquisition(func(
 		ctx context.Context, poolName string, r quotapool.Request, start time.Time,
 	) {
 		req := r.(*waitRequest)
+		if req.readRequests > 0 {
+			rl.metrics.readRequestsAdmitted.Inc(req.readRequests)
+		}
+		if req.writeRequests > 0 {
+			rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests)
+		}
+		// Accounted for in limiter.RecordRead.
+		// if req.readBytes > 0 {
+		// 	rl.metrics.readBytesAdmitted.Inc(req.readBytes)
+		// }
 		if req.writeBytes > 0 {
 			rl.metrics.writeBytesAdmitted.Inc(req.writeBytes)
 		}
-		rl.metrics.requestsAdmitted.Inc(1)
 	}))
 	rl.qp = quotapool.New(tenantID.String(), &buckets, options...)
 	buckets.clock = rl.qp.TimeSource()
 	buckets.lastUpdated = buckets.clock.Now()
 }

-func (rl *limiter) Wait(ctx context.Context, writeBytes int64) error {
+func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
 	rl.metrics.currentBlocked.Inc(1)
 	defer rl.metrics.currentBlocked.Dec(1)
-	r := newWaitRequest(writeBytes)
+	r := newWaitRequest(isWrite, writeBytes)
 	defer putWaitRequest(r)
 	if err := rl.qp.Acquire(ctx, r); err != nil {
 		return err
@@ -130,11 +141,12 @@ func (rl *limiter) updateLimits(limits LimitConfigs) {
 // tokenBuckets is the implementation of Resource which remains in the quotapool
 // for a limiter.
 type tokenBuckets struct {
-	clock       timeutil.TimeSource
-	lastUpdated time.Time
-	requests    tokenBucket
-	readBytes   tokenBucket
-	writeBytes  tokenBucket
+	clock         timeutil.TimeSource
+	lastUpdated   time.Time
+	readRequests  tokenBucket
+	writeRequests tokenBucket
+	readBytes     tokenBucket
+	writeBytes    tokenBucket
 }

 var _ quotapool.Resource = (*tokenBuckets)(nil)
@@ -146,7 +158,8 @@ func (rb *tokenBuckets) update() {
 	// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
 	// spinning too fast on timers for tons of tiny allocations at a fast rate.
 	if since := now.Sub(rb.lastUpdated); since > 0 {
-		rb.requests.update(since)
+		rb.readRequests.update(since)
+		rb.writeRequests.update(since)
 		rb.readBytes.update(since)
 		rb.writeBytes.update(since)
 		rb.lastUpdated = now
@@ -166,14 +179,16 @@ func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter t
 			}
 		}
 	}
-	check(&rb.requests, req.requests)
+	check(&rb.readRequests, req.readRequests)
+	check(&rb.writeRequests, req.writeRequests)
 	check(&rb.readBytes, req.readBytes)
 	check(&rb.writeBytes, req.writeBytes)
 	return fulfilled, tryAgainAfter
 }

 func (rb *tokenBuckets) subtract(req *waitRequest) {
-	rb.requests.tokens -= float64(req.requests)
+	rb.readRequests.tokens -= float64(req.readRequests)
+	rb.writeRequests.tokens -= float64(req.writeRequests)
 	rb.readBytes.tokens -= float64(req.readBytes)
 	rb.writeBytes.tokens -= float64(req.writeBytes)
 }
@@ -185,7 +200,8 @@ func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) {
 		// configuration.
 		rb.update()
-		rb.requests.setConf(toAdd.Requests)
+		rb.readRequests.setConf(toAdd.ReadRequests)
+		rb.writeRequests.setConf(toAdd.WriteRequests)
 		rb.readBytes.setConf(toAdd.ReadBytes)
 		rb.writeBytes.setConf(toAdd.WriteBytes)
 		return true
@@ -264,9 +280,10 @@ func (t *tokenBucket) clampTokens() {

 // waitRequest is used to wait for adequate resources in the tokenBuckets.
 type waitRequest struct {
-	requests   int64
-	writeBytes int64
-	readBytes  int64
+	readRequests  int64
+	writeRequests int64
+	writeBytes    int64
+	readBytes     int64
 }

 var waitRequestSyncPool = sync.Pool{
@@ -275,12 +292,18 @@ var waitRequestSyncPool = sync.Pool{

 // newWaitRequest allocates a waitRequest from the sync.Pool.
 // It should be returned with putWaitRequest.
-func newWaitRequest(writeBytes int64) *waitRequest {
+func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest {
 	r := waitRequestSyncPool.Get().(*waitRequest)
 	*r = waitRequest{
-		requests:   1,
-		readBytes:  1,
-		writeBytes: writeBytes,
+		readRequests:  0,
+		writeRequests: 0,
+		readBytes:     1,
+		writeBytes:    writeBytes,
+	}
+	if isWrite {
+		r.writeRequests = 1
+	} else {
+		r.readRequests = 1
 	}
 	return r
 }
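The `Wait` contract quoted above is easiest to see in one dimension. Below is a minimal sketch of a single bucket's refill-and-debt arithmetic under the semantics the doc comment describes; the real limiter composes four such buckets (read/write requests, read/write bytes) inside a quotapool and differs in detail:

```go
// Illustrative only: one token bucket with the refill and debt rules the
// Limiter doc comment describes.
package main

import (
	"fmt"
	"time"
)

type tokenBucket struct {
	rate   float64 // tokens per second
	burst  float64 // maximum tokens
	tokens float64
}

// update refills the bucket for the elapsed duration, clamping at burst.
func (tb *tokenBucket) update(since time.Duration) {
	tb.tokens += tb.rate * since.Seconds()
	if tb.tokens > tb.burst {
		tb.tokens = tb.burst
	}
}

// wait returns how long a request for n tokens must block. Requests larger
// than burst wait only until the bucket is completely full, then drive it
// into debt; smaller requests wait for the missing tokens.
func (tb *tokenBucket) wait(n float64) time.Duration {
	need := n
	if need > tb.burst {
		need = tb.burst
	}
	if tb.tokens >= need {
		return 0
	}
	return time.Duration(float64(time.Second) * (need - tb.tokens) / tb.rate)
}

func main() {
	tb := &tokenBucket{rate: 10, burst: 20, tokens: 20}
	fmt.Println(tb.wait(30)) // 0s: bucket is full, request admitted into debt
	tb.tokens -= 30          // now at -10
	fmt.Println(tb.wait(10)) // 2s: refill from -10 to +10 at 10 tokens/s
	tb.update(2 * time.Second)
	fmt.Println(tb.wait(10)) // 0s: the debt has been paid off
}
```

The same arithmetic shows up in the testdata files later in this diff: a 30-byte write against a full 20-token bucket is admitted immediately and leaves the bucket 10 tokens in debt.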
diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go
index 20648b4ab107..2b46c9d3f225 100644
--- a/pkg/kv/kvserver/tenantrate/limiter_test.go
+++ b/pkg/kv/kvserver/tenantrate/limiter_test.go
@@ -48,9 +48,9 @@ func TestCloser(t *testing.T) {
 	limiter := factory.GetTenant(tenant, closer)
 	ctx := context.Background()
 	// First Wait call will not block.
-	require.NoError(t, limiter.Wait(ctx, 1))
+	require.NoError(t, limiter.Wait(ctx, false, 1))
 	errCh := make(chan error, 1)
-	go func() { errCh <- limiter.Wait(ctx, 1<<30) }()
+	go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }()
 	testutils.SucceedsSoon(t, func() error {
 		if timers := timeSource.Timers(); len(timers) != 1 {
 			return errors.Errorf("expected 1 timer, found %d", len(timers))
@@ -84,6 +84,7 @@ type launchState struct {
 	tenantID   roachpb.TenantID
 	ctx        context.Context
 	cancel     context.CancelFunc
+	isWrite    bool
 	writeBytes int64
 	reserveCh  chan error
 }
@@ -202,6 +203,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
 	var cmds []struct {
 		ID         string
 		Tenant     uint64
+		IsWrite    bool
 		WriteBytes int64
 	}
 	if err := yaml.UnmarshalStrict([]byte(d.Input), &cmds); err != nil {
@@ -213,6 +215,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
 		s.tenantID = roachpb.MakeTenantID(cmd.Tenant)
 		s.ctx, s.cancel = context.WithCancel(context.Background())
 		s.reserveCh = make(chan error, 1)
+		s.isWrite = cmd.IsWrite
 		s.writeBytes = cmd.WriteBytes
 		ts.running[s.id] = &s
 		lims := ts.tenants[s.tenantID]
@@ -221,7 +224,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
 		}
 		go func() {
 			// We'll not worry about ever releasing tenant Limiters.
-			s.reserveCh <- lims[0].Wait(s.ctx, s.writeBytes)
+			s.reserveCh <- lims[0].Wait(s.ctx, s.isWrite, s.writeBytes)
 		}()
 	}
 	return ts.FormatRunning()
 }
@@ -337,12 +340,15 @@ func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string {
 // kv_tenant_rate_limit_read_bytes_admitted 0
 // kv_tenant_rate_limit_read_bytes_admitted{tenant_id="2"} 0
 // kv_tenant_rate_limit_read_bytes_admitted{tenant_id="system"} 100
-// kv_tenant_rate_limit_requests_admitted 0
-// kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 0
-// kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 0
+// kv_tenant_rate_limit_read_requests_admitted 0
+// kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0
+// kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 0
 // kv_tenant_rate_limit_write_bytes_admitted 50
 // kv_tenant_rate_limit_write_bytes_admitted{tenant_id="2"} 50
 // kv_tenant_rate_limit_write_bytes_admitted{tenant_id="system"} 0
+// kv_tenant_rate_limit_write_requests_admitted 0
+// kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 0
+// kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0
 //
 // Or with a regular expression:
 //
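For readers unfamiliar with the datadriven harness: each `launch` block is YAML that `yaml.UnmarshalStrict` decodes into the anonymous struct above, matching lowercase keys such as `iswrite` against the lowercased Go field names (yaml.v2's default). A self-contained sketch of how the new field comes through:

```go
// Illustrative only: decoding a launch command, including the new iswrite
// field, the way the test harness does.
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

func main() {
	var cmds []struct {
		ID         string
		Tenant     uint64
		IsWrite    bool
		WriteBytes int64
	}
	input := `
- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
- { id: g2, tenant: 3 }
`
	// UnmarshalStrict rejects unknown keys, so typos in testdata fail loudly.
	if err := yaml.UnmarshalStrict([]byte(input), &cmds); err != nil {
		panic(err)
	}
	// Prints roughly:
	// [{ID:g1 Tenant:2 IsWrite:true WriteBytes:30} {ID:g2 Tenant:3 IsWrite:false WriteBytes:0}]
	fmt.Printf("%+v\n", cmds)
}
```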
diff --git a/pkg/kv/kvserver/tenantrate/metrics.go b/pkg/kv/kvserver/tenantrate/metrics.go
index b16344c64c2a..8de30d104112 100644
--- a/pkg/kv/kvserver/tenantrate/metrics.go
+++ b/pkg/kv/kvserver/tenantrate/metrics.go
@@ -18,11 +18,12 @@ import (

 // Metrics is a metric.Struct for the LimiterFactory.
 type Metrics struct {
-	Tenants            *metric.Gauge
-	CurrentBlocked     *aggmetric.AggGauge
-	RequestsAdmitted   *aggmetric.AggCounter
-	WriteBytesAdmitted *aggmetric.AggCounter
-	ReadBytesAdmitted  *aggmetric.AggCounter
+	Tenants               *metric.Gauge
+	CurrentBlocked        *aggmetric.AggGauge
+	ReadRequestsAdmitted  *aggmetric.AggCounter
+	WriteRequestsAdmitted *aggmetric.AggCounter
+	ReadBytesAdmitted     *aggmetric.AggCounter
+	WriteBytesAdmitted    *aggmetric.AggCounter
 }

 var _ metric.Struct = (*Metrics)(nil)
@@ -40,17 +41,17 @@ var (
 		Measurement: "Requests",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaRequestsAdmitted = metric.Metadata{
-		Name:        "kv.tenant_rate_limit.requests_admitted",
-		Help:        "Number of requests admitted by the rate limiter",
+	metaReadRequestsAdmitted = metric.Metadata{
+		Name:        "kv.tenant_rate_limit.read_requests_admitted",
+		Help:        "Number of read requests admitted by the rate limiter",
 		Measurement: "Requests",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaWriteBytesAdmitted = metric.Metadata{
-		Name:        "kv.tenant_rate_limit.write_bytes_admitted",
-		Help:        "Number of write bytes admitted by the rate limiter",
-		Measurement: "Bytes",
-		Unit:        metric.Unit_BYTES,
+	metaWriteRequestsAdmitted = metric.Metadata{
+		Name:        "kv.tenant_rate_limit.write_requests_admitted",
+		Help:        "Number of write requests admitted by the rate limiter",
+		Measurement: "Requests",
+		Unit:        metric.Unit_COUNT,
 	}
 	metaReadBytesAdmitted = metric.Metadata{
 		Name:        "kv.tenant_rate_limit.read_bytes_admitted",
@@ -58,6 +59,12 @@ var (
 		Measurement: "Bytes",
 		Unit:        metric.Unit_BYTES,
 	}
+	metaWriteBytesAdmitted = metric.Metadata{
+		Name:        "kv.tenant_rate_limit.write_bytes_admitted",
+		Help:        "Number of write bytes admitted by the rate limiter",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
 )

 // TenantIDLabel is the label used with metrics associated with a tenant.
@@ -67,11 +74,12 @@ const TenantIDLabel = "tenant_id"

 func makeMetrics() Metrics {
 	b := aggmetric.MakeBuilder(TenantIDLabel)
 	return Metrics{
-		Tenants:            metric.NewGauge(metaTenants),
-		CurrentBlocked:     b.Gauge(metaCurrentBlocked),
-		RequestsAdmitted:   b.Counter(metaRequestsAdmitted),
-		WriteBytesAdmitted: b.Counter(metaWriteBytesAdmitted),
-		ReadBytesAdmitted:  b.Counter(metaReadBytesAdmitted),
+		Tenants:               metric.NewGauge(metaTenants),
+		CurrentBlocked:        b.Gauge(metaCurrentBlocked),
+		ReadRequestsAdmitted:  b.Counter(metaReadRequestsAdmitted),
+		WriteRequestsAdmitted: b.Counter(metaWriteRequestsAdmitted),
+		ReadBytesAdmitted:     b.Counter(metaReadBytesAdmitted),
+		WriteBytesAdmitted:    b.Counter(metaWriteBytesAdmitted),
 	}
 }
@@ -80,25 +88,28 @@ func (m *Metrics) MetricStruct() {}

 // tenantMetrics represent metrics for an individual tenant.
 type tenantMetrics struct {
-	currentBlocked     *aggmetric.Gauge
-	requestsAdmitted   *aggmetric.Counter
-	writeBytesAdmitted *aggmetric.Counter
-	readBytesAdmitted  *aggmetric.Counter
+	currentBlocked        *aggmetric.Gauge
+	readRequestsAdmitted  *aggmetric.Counter
+	writeRequestsAdmitted *aggmetric.Counter
+	readBytesAdmitted     *aggmetric.Counter
+	writeBytesAdmitted    *aggmetric.Counter
 }

 func (m *Metrics) tenantMetrics(tenantID roachpb.TenantID) tenantMetrics {
 	tid := tenantID.String()
 	return tenantMetrics{
-		currentBlocked:     m.CurrentBlocked.AddChild(tid),
-		requestsAdmitted:   m.RequestsAdmitted.AddChild(tid),
-		writeBytesAdmitted: m.WriteBytesAdmitted.AddChild(tid),
-		readBytesAdmitted:  m.ReadBytesAdmitted.AddChild(tid),
+		currentBlocked:        m.CurrentBlocked.AddChild(tid),
+		readRequestsAdmitted:  m.ReadRequestsAdmitted.AddChild(tid),
+		writeRequestsAdmitted: m.WriteRequestsAdmitted.AddChild(tid),
+		readBytesAdmitted:     m.ReadBytesAdmitted.AddChild(tid),
+		writeBytesAdmitted:    m.WriteBytesAdmitted.AddChild(tid),
 	}
 }

 func (tm *tenantMetrics) destroy() {
 	tm.currentBlocked.Destroy()
-	tm.requestsAdmitted.Destroy()
-	tm.writeBytesAdmitted.Destroy()
+	tm.readRequestsAdmitted.Destroy()
+	tm.writeRequestsAdmitted.Destroy()
 	tm.readBytesAdmitted.Destroy()
+	tm.writeBytesAdmitted.Destroy()
 }
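Each counter above exists in two forms: an unlabeled aggregate per store and a labeled child per tenant, with the aggregate tracking the sum of its children. That is why the testdata later expects both `kv_tenant_rate_limit_read_requests_admitted 4` and the per-tenant variants. A toy model of that parent/child relationship (not the real aggmetric API):

```go
// Illustrative only: a parent counter whose value is the sum of its
// per-tenant children, mimicking the aggmetric pattern.
package main

import "fmt"

type aggCounter struct {
	children map[string]*int64
}

func newAggCounter() *aggCounter { return &aggCounter{children: map[string]*int64{}} }

// addChild registers a labeled child counter for one tenant.
func (c *aggCounter) addChild(tenantID string) *int64 {
	v := new(int64)
	c.children[tenantID] = v
	return v
}

// total is the unlabeled aggregate series: the sum over all children.
func (c *aggCounter) total() int64 {
	var sum int64
	for _, v := range c.children {
		sum += *v
	}
	return sum
}

func main() {
	readAdmitted := newAggCounter()
	t2 := readAdmitted.addChild("2")
	sys := readAdmitted.addChild("system")
	*t2 += 2
	*sys += 1
	fmt.Println(readAdmitted.total()) // 3
}
```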
diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go
index 50ac9c4b4ffc..33903a075e58 100644
--- a/pkg/kv/kvserver/tenantrate/settings.go
+++ b/pkg/kv/kvserver/tenantrate/settings.go
@@ -28,18 +28,23 @@ type LimitConfig struct {
 // It is exported for convenience and testing.
 // The values are derived from cluster settings.
 type LimitConfigs struct {
-	Requests   LimitConfig
-	ReadBytes  LimitConfig
-	WriteBytes LimitConfig
+	ReadRequests  LimitConfig
+	WriteRequests LimitConfig
+	ReadBytes     LimitConfig
+	WriteBytes    LimitConfig
 }

 // LimitConfigsFromSettings constructs LimitConfigs from the values stored in
 // the settings.
 func LimitConfigsFromSettings(settings *cluster.Settings) LimitConfigs {
 	return LimitConfigs{
-		Requests: LimitConfig{
-			Rate:  Limit(requestRateLimit.Get(&settings.SV)),
-			Burst: requestBurstLimit.Get(&settings.SV),
+		ReadRequests: LimitConfig{
+			Rate:  Limit(readRequestRateLimit.Get(&settings.SV)),
+			Burst: readRequestBurstLimit.Get(&settings.SV),
+		},
+		WriteRequests: LimitConfig{
+			Rate:  Limit(writeRequestRateLimit.Get(&settings.SV)),
+			Burst: writeRequestBurstLimit.Get(&settings.SV),
 		},
 		ReadBytes: LimitConfig{
 			Rate:  Limit(readRateLimit.Get(&settings.SV)),
@@ -53,14 +58,24 @@ func LimitConfigsFromSettings(settings *cluster.Settings) LimitConfigs {
 }

 var (
-	requestRateLimit = settings.RegisterPositiveFloatSetting(
-		"kv.tenant_rate_limiter.requests.rate_limit",
-		"per-tenant request rate limit in requests per second",
+	readRequestRateLimit = settings.RegisterPositiveFloatSetting(
+		"kv.tenant_rate_limiter.read_requests.rate_limit",
+		"per-tenant read request rate limit in requests per second",
+		128)
+
+	readRequestBurstLimit = settings.RegisterPositiveIntSetting(
+		"kv.tenant_rate_limiter.read_requests.burst_limit",
+		"per-tenant read request burst limit in requests",
+		512)
+
+	writeRequestRateLimit = settings.RegisterPositiveFloatSetting(
+		"kv.tenant_rate_limiter.write_requests.rate_limit",
+		"per-tenant write request rate limit in requests per second",
 		128)

-	requestBurstLimit = settings.RegisterPositiveIntSetting(
-		"kv.tenant_rate_limiter.request.burst_limit",
-		"per-tenant request burst limit in requests",
+	writeRequestBurstLimit = settings.RegisterPositiveIntSetting(
+		"kv.tenant_rate_limiter.write_requests.burst_limit",
+		"per-tenant write request burst limit in requests",
 		512)

 	readRateLimit = settings.RegisterByteSizeSetting(
@@ -86,8 +101,10 @@ var (
 	// settingsSetOnChangeFuncs are the functions used to register the factory to
 	// be notified of changes to any of the settings which configure it.
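Operators and tests that tuned the old `kv.tenant_rate_limiter.requests.*` settings must switch to the four new names. A minimal sketch of bumping them over a SQL connection from Go, mirroring the logictest change at the bottom of this diff (the connection string and driver choice here are placeholders):

```go
// Illustrative only: setting the renamed cluster settings via SQL.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // placeholder Postgres-wire driver
)

func main() {
	conn, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	for _, settingName := range []string{
		"kv.tenant_rate_limiter.read_requests.rate_limit",
		"kv.tenant_rate_limiter.read_requests.burst_limit",
		"kv.tenant_rate_limiter.write_requests.rate_limit",
		"kv.tenant_rate_limiter.write_requests.burst_limit",
	} {
		if _, err := conn.Exec(
			fmt.Sprintf("SET CLUSTER SETTING %s = %d", settingName, 100000),
		); err != nil {
			log.Fatal(err)
		}
	}
}
```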
 	settingsSetOnChangeFuncs = [...]func(*settings.Values, func()){
-		requestRateLimit.SetOnChange,
-		requestBurstLimit.SetOnChange,
+		readRequestRateLimit.SetOnChange,
+		readRequestBurstLimit.SetOnChange,
+		writeRequestRateLimit.SetOnChange,
+		writeRequestBurstLimit.SetOnChange,
 		readRateLimit.SetOnChange,
 		readBurstLimit.SetOnChange,
 		writeRateLimit.SetOnChange,
diff --git a/pkg/kv/kvserver/tenantrate/system_limiter.go b/pkg/kv/kvserver/tenantrate/system_limiter.go
index e065f8da01f7..ad8e48e60891 100644
--- a/pkg/kv/kvserver/tenantrate/system_limiter.go
+++ b/pkg/kv/kvserver/tenantrate/system_limiter.go
@@ -18,9 +18,13 @@ type systemLimiter struct {
 	tenantMetrics
 }

-func (s systemLimiter) Wait(ctx context.Context, writeBytes int64) error {
+func (s systemLimiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
+	if isWrite {
+		s.writeRequestsAdmitted.Inc(1)
+	} else {
+		s.readRequestsAdmitted.Inc(1)
+	}
 	s.writeBytesAdmitted.Inc(writeBytes)
-	s.requestsAdmitted.Inc(1)
 	return nil
 }
diff --git a/pkg/kv/kvserver/tenantrate/testdata/basic b/pkg/kv/kvserver/tenantrate/testdata/basic
index b428f36bb9aa..77a8db5de8d8 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/basic
+++ b/pkg/kv/kvserver/tenantrate/testdata/basic
@@ -1,7 +1,8 @@
 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 1024, burst: 2048 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 1024, burst: 2048 }
 ----
 00:00:00.000

@@ -12,31 +13,35 @@ get_tenants
 ----
 [2#2, 3#1, 5#3, system#1]

-# Launch two requests on behalf of tenant 2 and one on behalf of 3.
+# Launch four requests on behalf of tenant 2, one on behalf of 3, and one on
+# behalf of the system tenant.

 launch
 - { id: g0, tenant: 1 }
 - { id: g1, tenant: 2 }
 - { id: g2, tenant: 2 }
-- { id: g3, tenant: 3 }
+- { id: g3, tenant: 2, iswrite: true }
+- { id: g4, tenant: 2, iswrite: true }
+- { id: g5, tenant: 3 }
 ----
-[g0@system, g1@2, g2@2, g3@3]
+[g0@system, g1@2, g2@2, g3@2, g4@2, g5@3]

 # Ensure that none of the above requests get blocked because they use less
 # than the configured burst for their respective limiters.

 await
-[g0, g1, g2, g3]
+[g0, g1, g2, g3, g4, g5]
 ----
 []

-# Launch another request on behalf of tenant 2, it will block due to the request
-# rate limit.
+# Launch another read and another write request on behalf of tenant 2; each
+# will block due to its request rate limit.

 launch
-- { id: g4, tenant: 2 }
+- { id: g6, tenant: 2 }
+- { id: g7, tenant: 2, iswrite: true }
 ----
-[g4@2]
+[g6@2, g7@2]

 # Ensure that the above requests were blocked by observing the timers they
 # create to wait for available quota.

 timers
 ----
 00:00:01.000

 # Observe that the "current_blocked" counter has appropriate values to indicate
-# that there is one blocked request for tenant 2 and total.
+# that there are two blocked requests for tenant 2 and in total.

 metrics
 current_blocked
 ----
-kv_tenant_rate_limit_current_blocked 1
-kv_tenant_rate_limit_current_blocked{tenant_id="2"} 1
+kv_tenant_rate_limit_current_blocked 2
+kv_tenant_rate_limit_current_blocked{tenant_id="2"} 2
 kv_tenant_rate_limit_current_blocked{tenant_id="3"} 0
 kv_tenant_rate_limit_current_blocked{tenant_id="5"} 0
 kv_tenant_rate_limit_current_blocked{tenant_id="system"} 0

-# Observe that the "requests_admitted" counter has appropriate values to
+# Observe that the "requests_admitted" counters have appropriate values to
 # indicate that requests have been admitted.
 metrics
-kv_tenant_rate_limit_requests_admitted
+kv_tenant_rate_limit_.*_requests_admitted
 ----
-kv_tenant_rate_limit_requests_admitted 4
-kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 2
-kv_tenant_rate_limit_requests_admitted{tenant_id="3"} 1
-kv_tenant_rate_limit_requests_admitted{tenant_id="5"} 0
-kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 1
+kv_tenant_rate_limit_read_requests_admitted 4
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 2
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="3"} 1
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="5"} 0
+kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 1
+kv_tenant_rate_limit_write_requests_admitted 2
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 2
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="3"} 0
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="5"} 0
+kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0

 # Release the tenant 3 rate limiter.

@@ -91,9 +101,9 @@ advance
 ----
 00:00:01.001

-# Observe that the blocked request is now unblocked.
+# Observe that the blocked requests are now unblocked.

 await
-- g4
+[g6, g7]
 ----
 []
diff --git a/pkg/kv/kvserver/tenantrate/testdata/burst b/pkg/kv/kvserver/tenantrate/testdata/burst
index 6624677e9b00..9ed29bd12113 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/burst
+++ b/pkg/kv/kvserver/tenantrate/testdata/burst
@@ -2,9 +2,10 @@
 # into debt.

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 10, burst: 20 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 10, burst: 20 }
 ----
 00:00:00.000

@@ -19,7 +20,7 @@ get_tenants
 # limit. This will not block but will put the limiter into debt.

 launch
-- { id: g1, tenant: 2, writebytes: 30 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
 ----
 [g1@2]

@@ -33,7 +34,7 @@ await
 # 10/s.

 launch
-- { id: g1, tenant: 2, writebytes: 10 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 10 }
 ----
 [g1@2]

@@ -67,7 +68,7 @@ await
 # is above the burst of 20, we'll need to wait for the bucket to be full.

 launch
-- { id: g1, tenant: 2, writebytes: 30 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
 ----
 [g1@2]
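The waits that testdata/burst expects follow directly from the debt rule sketched earlier. A worked version of the write-bytes arithmetic, using the same numbers as the launches above (rate 10/s, burst 20):

```go
// Illustrative only: the write-bytes arithmetic behind testdata/burst.
package main

import "fmt"

func main() {
	rate, burst := 10.0, 20.0
	tokens := burst // the bucket starts full

	// g1 writes 30 bytes: more than burst, so it is admitted as soon as the
	// bucket is full and drives the bucket into debt.
	tokens -= 30
	fmt.Println(tokens) // -10

	// The next 10-byte write must wait for the bucket to refill from -10
	// to +10 at 10 tokens/s.
	need := 10.0
	fmt.Println((need - tokens) / rate) // 2 (seconds)
}
```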
diff --git a/pkg/kv/kvserver/tenantrate/testdata/cancel b/pkg/kv/kvserver/tenantrate/testdata/cancel
index 6cc28b14da76..77ce14d13d5f 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/cancel
+++ b/pkg/kv/kvserver/tenantrate/testdata/cancel
@@ -1,9 +1,10 @@
 # This tests cancellation and unblocking subsequent requests.

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 1024, burst: 2048 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 1024, burst: 2048 }
 ----
 00:00:00.000

@@ -15,7 +16,7 @@ get_tenants
 # Launch a request to consume half of the 2048 capacity.

 launch
-- { id: g1, tenant: 2, writebytes: 1024 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 1024 }
 ----
 [g1@2]

@@ -27,7 +28,7 @@ await
 # Launch a request requiring more quota than exists.

 launch
-- { id: g2, tenant: 2, writebytes: 1536 }
+- { id: g2, tenant: 2, iswrite: true, writebytes: 1536 }
 ----
 [g2@2]

@@ -40,7 +41,7 @@ timers
 # Launch another request which could be fulfilled by the existing quota.

 launch
-- { id: g3, tenant: 2, writebytes: 1024 }
+- { id: g3, tenant: 2, iswrite: true, writebytes: 1024 }
 ----
 [g2@2, g3@2]
diff --git a/pkg/kv/kvserver/tenantrate/testdata/reads b/pkg/kv/kvserver/tenantrate/testdata/reads
index e9b686ffb8dd..9f99d85c5c17 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/reads
+++ b/pkg/kv/kvserver/tenantrate/testdata/reads
@@ -2,8 +2,8 @@
 # into debt

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 10, burst: 100 }
+readrequests: { rate: 1, burst: 2 }
+readbytes: { rate: 10, burst: 100 }
 ----
 00:00:00.000
diff --git a/pkg/kv/kvserver/tenantrate/testdata/update b/pkg/kv/kvserver/tenantrate/testdata/update
index 88dda006b8fa..fefc8e88364e 100644
--- a/pkg/kv/kvserver/tenantrate/testdata/update
+++ b/pkg/kv/kvserver/tenantrate/testdata/update
@@ -1,9 +1,10 @@
 # Test updating the configuration of the rate limiter.

 init
-requests: { rate: 1, burst: 2 }
-readbytes: { rate: 1024, burst: 2048 }
-writebytes: { rate: 10, burst: 20 }
+readrequests: { rate: 1, burst: 2 }
+writerequests: { rate: 1, burst: 2 }
+readbytes: { rate: 1024, burst: 2048 }
+writebytes: { rate: 10, burst: 20 }
 ----
 00:00:00.000

@@ -15,7 +16,7 @@ get_tenants
 # Launch a request that puts the limiter in debt by 10.

 launch
-- { id: g1, tenant: 2, writebytes: 30 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 30 }
 ----
 [g1@2]

@@ -28,7 +29,7 @@ await
 # with the current debt.

 launch
-- { id: g1, tenant: 2, writebytes: 20 }
+- { id: g1, tenant: 2, iswrite: true, writebytes: 20 }
 ----
 [g1@2]
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index b14529e5bf27..17e00d2d0e9c 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -1348,8 +1348,10 @@ func (t *logicTest) setup(cfg testClusterConfig, serverArgs TestServerArgs) {
 		// Increase tenant rate limits for faster tests.
 		conn := t.cluster.ServerConn(0)
 		for _, settingName := range []string{
-			"kv.tenant_rate_limiter.requests.rate_limit",
-			"kv.tenant_rate_limiter.request.burst_limit",
+			"kv.tenant_rate_limiter.read_requests.rate_limit",
+			"kv.tenant_rate_limiter.read_requests.burst_limit",
+			"kv.tenant_rate_limiter.write_requests.rate_limit",
+			"kv.tenant_rate_limiter.write_requests.burst_limit",
 		} {
 			if _, err := conn.Exec(
 				fmt.Sprintf("SET CLUSTER SETTING %s = %d", settingName, 100000),
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index d17831605091..5466f5962fe7 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -743,16 +743,16 @@ var charts = []sectionDescription{
 			Metrics:     []string{"kv.tenant_rate_limit.num_tenants"},
 		},
 		{
-			Title:       "Requests Admitted by Rate Limiter",
+			Title:       "Read Requests Admitted by Rate Limiter",
 			Downsampler: DescribeAggregator_MAX,
 			Percentiles: false,
-			Metrics:     []string{"kv.tenant_rate_limit.requests_admitted"},
+			Metrics:     []string{"kv.tenant_rate_limit.read_requests_admitted"},
 		},
 		{
-			Title:       "Write Bytes Admitted by Rate Limiter",
+			Title:       "Write Requests Admitted by Rate Limiter",
 			Downsampler: DescribeAggregator_MAX,
 			Percentiles: false,
-			Metrics:     []string{"kv.tenant_rate_limit.write_bytes_admitted"},
+			Metrics:     []string{"kv.tenant_rate_limit.write_requests_admitted"},
 		},
 		{
 			Title:       "Read Bytes Admitted by Rate Limiter",
@@ -760,6 +760,12 @@ var charts = []sectionDescription{
 			Percentiles: false,
 			Metrics:     []string{"kv.tenant_rate_limit.read_bytes_admitted"},
 		},
+		{
+			Title:       "Write Bytes Admitted by Rate Limiter",
+			Downsampler: DescribeAggregator_MAX,
+			Percentiles: false,
+			Metrics:     []string{"kv.tenant_rate_limit.write_bytes_admitted"},
+		},
 	},
 },
 {