diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go index 08e6592f5bca..bc4284f33179 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go @@ -348,12 +348,12 @@ func (c *tenantSideCostController) updateRunState(ctx context.Context) { if deltaCPU < 0 { deltaCPU = 0 } - ru := deltaCPU * float64(c.costCfg.PodCPUSecond) + ru := float64(c.costCfg.PodCPUCost(deltaCPU)) var deltaPGWireEgressBytes uint64 if newExternalUsage.PGWireEgressBytes > c.run.externalUsage.PGWireEgressBytes { deltaPGWireEgressBytes = newExternalUsage.PGWireEgressBytes - c.run.externalUsage.PGWireEgressBytes - ru += float64(deltaPGWireEgressBytes) * float64(c.costCfg.PGWireEgressByte) + ru += float64(c.costCfg.PGWireEgressCost(int64(deltaPGWireEgressBytes))) } // KV RUs are not included here, these metrics correspond only to the SQL pod. @@ -657,24 +657,25 @@ func (c *tenantSideCostController) OnResponse( if multitenant.HasTenantCostControlExemption(ctx) { return } - if resp.ReadBytes() > 0 { + if resp.IsRead() { c.limiter.RemoveTokens(c.timeSource.Now(), c.costCfg.ResponseCost(resp)) } c.mu.Lock() defer c.mu.Unlock() - if isWrite, writeBytes := req.IsWrite(); isWrite { - c.mu.consumption.WriteRequests++ - c.mu.consumption.WriteBytes += uint64(writeBytes) - writeRU := float64(c.costCfg.KVWriteCost(writeBytes)) + if req.IsWrite() { + c.mu.consumption.WriteBatches++ + c.mu.consumption.WriteRequests += uint64(req.WriteCount()) + c.mu.consumption.WriteBytes += uint64(req.WriteBytes()) + writeRU := float64(c.costCfg.RequestCost(req)) c.mu.consumption.KVRU += writeRU c.mu.consumption.RU += writeRU - } else { - c.mu.consumption.ReadRequests++ - readBytes := resp.ReadBytes() - c.mu.consumption.ReadBytes += uint64(readBytes) - readRU := float64(c.costCfg.KVReadCost(readBytes)) + } else if resp.IsRead() { + c.mu.consumption.ReadBatches++ + c.mu.consumption.ReadRequests += uint64(resp.ReadCount()) + c.mu.consumption.ReadBytes += uint64(resp.ReadBytes()) + readRU := float64(c.costCfg.ResponseCost(resp)) c.mu.consumption.KVRU += readRU c.mu.consumption.RU += readRU } @@ -692,7 +693,8 @@ func (c *tenantSideCostController) shouldAccountForExternalIORUs() bool { return c.modeMu.externalIORUAccountingMode != externalIORUAccountingOff } -// ExternalIOWriteWait is part of the multitenant.TenantSideExternalIORecorder interface. +// ExternalIOWriteWait is part of the multitenant.TenantSideExternalIORecorder +// interface. func (c *tenantSideCostController) ExternalIOWriteWait(ctx context.Context, bytes int64) error { if !c.shouldWaitForExternalIORUs() { return nil @@ -704,7 +706,8 @@ func (c *tenantSideCostController) ExternalIOWriteWait(ctx context.Context, byte return c.limiter.Wait(ctx, ru) } -// ExternalIOWriteScucess is part of the multitenant.TenantSideExternalIORecorder interface. +// ExternalIOWriteSuccess is part of the multitenant.TenantSideExternalIORecorder +// interface. func (c *tenantSideCostController) ExternalIOWriteSuccess(ctx context.Context, bytes int64) { if multitenant.HasTenantCostControlExemption(ctx) { return @@ -718,7 +721,8 @@ func (c *tenantSideCostController) ExternalIOWriteSuccess(ctx context.Context, b c.mu.Unlock() } -// ExternalIOWriteFailure is part of the multitenant.TenantSideExternalIORecorder interface. +// ExternalIOWriteFailure is part of the multitenant.TenantSideExternalIORecorder +// interface. // // The given byte count should be the number of bytes that were never // actually written. @@ -741,8 +745,8 @@ func (c *tenantSideCostController) ExternalIOWriteFailure( c.mu.Unlock() } -// ExternalIOReadWait is part of the multitenant.TenantSideExternalIORecorder interface. We wait -// after adding the RUs since these reads already happened. +// ExternalIOReadWait is part of the multitenant.TenantSideExternalIORecorder +// interface. We wait after adding the RUs since these reads already happened. func (c *tenantSideCostController) ExternalIOReadWait(ctx context.Context, bytes int64) error { if multitenant.HasTenantCostControlExemption(ctx) { return nil diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index 509398f6631c..6a45ffe472f9 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -293,14 +293,16 @@ func (ts *testState) asyncRequest( func (ts *testState) request( t *testing.T, d *datadriven.TestData, isWrite bool, args cmdArgs, ) string { - var writeBytes, readBytes int64 + var writeCount, readCount, writeBytes, readBytes int64 if isWrite { + writeCount = 1 writeBytes = args.bytes } else { + readCount = 1 readBytes = args.bytes } - reqInfo := tenantcostmodel.TestingRequestInfo(isWrite, writeBytes) - respInfo := tenantcostmodel.TestingResponseInfo(readBytes) + reqInfo := tenantcostmodel.TestingRequestInfo(writeCount, writeBytes) + respInfo := tenantcostmodel.TestingResponseInfo(readCount, readBytes) if args.label == "" { ts.syncRequest(t, d, reqInfo, respInfo) } else { @@ -525,8 +527,8 @@ func (ts *testState) usage(t *testing.T, d *datadriven.TestData, args cmdArgs) s return fmt.Sprintf(""+ "RU: %.2f\n"+ "KVRU: %.2f\n"+ - "Reads: %d requests (%d bytes)\n"+ - "Writes: %d requests (%d bytes)\n"+ + "Reads: %d requests in %d batches (%d bytes)\n"+ + "Writes: %d requests in %d batches (%d bytes)\n"+ "SQL Pods CPU seconds: %.2f\n"+ "PGWire egress: %d bytes\n"+ "ExternalIO egress: %d bytes\n"+ @@ -534,8 +536,10 @@ func (ts *testState) usage(t *testing.T, d *datadriven.TestData, args cmdArgs) s c.RU, c.KVRU, c.ReadRequests, + c.ReadBatches, c.ReadBytes, c.WriteRequests, + c.WriteBatches, c.WriteBytes, c.SQLPodsCPUSeconds, c.PGWireEgressBytes, @@ -668,19 +672,21 @@ func TestConsumption(t *testing.T) { }, }) r := sqlutils.MakeSQLRunner(tenantDB) - r.Exec(t, "CREATE TABLE t (v STRING)") + // Create a secondary index to ensure that writes to both indexes are + // recorded in metrics. + r.Exec(t, "CREATE TABLE t (v STRING, w STRING, INDEX (w, v))") // Do some writes and reads and check the reported consumption. Repeat the // test a few times, since background requests can trick the test into // passing. for repeat := 0; repeat < 5; repeat++ { beforeWrite := testProvider.waitForConsumption(t) - r.Exec(t, "INSERT INTO t SELECT repeat('1234567890', 1024) FROM generate_series(1, 10) AS g(i)") + r.Exec(t, "INSERT INTO t (v) SELECT repeat('1234567890', 1024) FROM generate_series(1, 10) AS g(i)") const expectedBytes = 10 * 10 * 1024 afterWrite := testProvider.waitForConsumption(t) delta := afterWrite delta.Sub(&beforeWrite) - if delta.WriteRequests < 1 || delta.WriteBytes < expectedBytes { + if delta.WriteBatches < 1 || delta.WriteRequests < 2 || delta.WriteBytes < expectedBytes*2 { t.Errorf("usage after write: %s", delta.String()) } @@ -689,7 +695,7 @@ func TestConsumption(t *testing.T) { afterRead := testProvider.waitForConsumption(t) delta = afterRead delta.Sub(&afterWrite) - if delta.ReadRequests < 1 || delta.ReadBytes < expectedBytes { + if delta.ReadBatches < 1 || delta.ReadRequests < 1 || delta.ReadBytes < expectedBytes { t.Errorf("usage after read: %s", delta.String()) } r.Exec(t, "DELETE FROM t WHERE true") diff --git a/pkg/ccl/multitenantccl/tenantcostclient/testdata/consumption b/pkg/ccl/multitenantccl/tenantcostclient/testdata/consumption index 88648ec8636d..db70fa31f85d 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/testdata/consumption +++ b/pkg/ccl/multitenantccl/tenantcostclient/testdata/consumption @@ -4,8 +4,8 @@ usage ---- RU: 0.00 KVRU: 0.00 -Reads: 0 requests (0 bytes) -Writes: 0 requests (0 bytes) +Reads: 0 requests in 0 batches (0 bytes) +Writes: 0 requests in 0 batches (0 bytes) SQL Pods CPU seconds: 0.00 PGWire egress: 0 bytes ExternalIO egress: 0 bytes @@ -16,10 +16,10 @@ read bytes=1024000 usage ---- -RU: 105.83 -KVRU: 105.83 -Reads: 1 requests (1024000 bytes) -Writes: 0 requests (0 bytes) +RU: 5.77 +KVRU: 5.77 +Reads: 0 requests in 0 batches (0 bytes) +Writes: 0 requests in 1 batches (0 bytes) SQL Pods CPU seconds: 0.00 PGWire egress: 0 bytes ExternalIO egress: 0 bytes @@ -30,10 +30,10 @@ write bytes=1024 usage ---- -RU: 113.58 -KVRU: 113.58 -Reads: 1 requests (1024000 bytes) -Writes: 1 requests (1024 bytes) +RU: 19.30 +KVRU: 19.30 +Reads: 0 requests in 0 batches (0 bytes) +Writes: 1 requests in 2 batches (1024 bytes) SQL Pods CPU seconds: 0.00 PGWire egress: 0 bytes ExternalIO egress: 0 bytes @@ -45,10 +45,10 @@ cpu usage ---- -RU: 1113.58 -KVRU: 113.58 -Reads: 1 requests (1024000 bytes) -Writes: 1 requests (1024 bytes) +RU: 1019.30 +KVRU: 19.30 +Reads: 0 requests in 0 batches (0 bytes) +Writes: 1 requests in 2 batches (1024 bytes) SQL Pods CPU seconds: 1.00 PGWire egress: 0 bytes ExternalIO egress: 0 bytes @@ -65,10 +65,10 @@ write bytes=4096 usage ---- -RU: 1148.39 -KVRU: 148.39 -Reads: 2 requests (1089536 bytes) -Writes: 3 requests (9216 bytes) +RU: 1064.00 +KVRU: 64.00 +Reads: 0 requests in 0 batches (0 bytes) +Writes: 3 requests in 5 batches (9216 bytes) SQL Pods CPU seconds: 1.00 PGWire egress: 0 bytes ExternalIO egress: 0 bytes @@ -80,10 +80,10 @@ cpu usage ---- -RU: 3601148.39 -KVRU: 148.39 -Reads: 2 requests (1089536 bytes) -Writes: 3 requests (9216 bytes) +RU: 3601064.00 +KVRU: 64.00 +Reads: 0 requests in 0 batches (0 bytes) +Writes: 3 requests in 5 batches (9216 bytes) SQL Pods CPU seconds: 3601.00 PGWire egress: 0 bytes ExternalIO egress: 0 bytes @@ -95,10 +95,10 @@ pgwire-egress usage ---- -RU: 3601158.74 -KVRU: 148.39 -Reads: 2 requests (1089536 bytes) -Writes: 3 requests (9216 bytes) +RU: 3601074.34 +KVRU: 64.00 +Reads: 0 requests in 0 batches (0 bytes) +Writes: 3 requests in 5 batches (9216 bytes) SQL Pods CPU seconds: 3601.00 PGWire egress: 12345 bytes ExternalIO egress: 0 bytes diff --git a/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt b/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt index 4703894e5402..809fe6a3c53e 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt +++ b/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt @@ -17,7 +17,7 @@ token-bucket-response timers ---- -00:00:01.600 +00:00:01.606 00:00:09.000 advance @@ -43,7 +43,7 @@ write bytes=1024 label=w2 timers ---- -00:00:02.011 +00:00:02.019 00:00:09.000 advance diff --git a/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback b/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback index ca6687e9ebe5..7befe3b0b9af 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback +++ b/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback @@ -40,7 +40,7 @@ tick timers ---- -00:00:29.653 +00:00:29.660 advance 10s diff --git a/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback-throttled b/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback-throttled index 9412917e4880..713f60b2f9df 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback-throttled +++ b/pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback-throttled @@ -26,7 +26,7 @@ token-bucket-response timers ---- 00:00:09.000 -00:04:46.544 +00:04:46.601 configure error: true @@ -45,7 +45,7 @@ low-ru timers ---- -00:04:46.544 +00:04:46.601 advance 2s @@ -60,7 +60,7 @@ tick timers ---- -00:00:39.454 +00:00:39.460 not-completed label=w2 ---- diff --git a/pkg/ccl/multitenantccl/tenantcostclient/testdata/throttling b/pkg/ccl/multitenantccl/tenantcostclient/testdata/throttling index ae9bf3f5c04a..38489b5b28ec 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/testdata/throttling +++ b/pkg/ccl/multitenantccl/tenantcostclient/testdata/throttling @@ -43,7 +43,7 @@ token-bucket-response timers ---- 00:00:11.000 -00:00:11.359 +00:00:11.391 advance 1s @@ -75,7 +75,7 @@ token-bucket-response timers ---- 00:00:22.000 -00:00:50.013 +00:00:50.051 advance 20s @@ -88,7 +88,7 @@ await label=w1 timers ---- 00:00:42.000 -00:00:50.013 +00:00:50.051 not-completed label=w2 ---- diff --git a/pkg/ccl/multitenantccl/tenantcostserver/metrics.go b/pkg/ccl/multitenantccl/tenantcostserver/metrics.go index d5e7cb89fc4d..939f16804acb 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/metrics.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/metrics.go @@ -27,8 +27,10 @@ import ( type Metrics struct { TotalRU *aggmetric.AggGaugeFloat64 TotalKVRU *aggmetric.AggGaugeFloat64 + TotalReadBatches *aggmetric.AggGauge TotalReadRequests *aggmetric.AggGauge TotalReadBytes *aggmetric.AggGauge + TotalWriteBatches *aggmetric.AggGauge TotalWriteRequests *aggmetric.AggGauge TotalWriteBytes *aggmetric.AggGauge TotalSQLPodsCPUSeconds *aggmetric.AggGaugeFloat64 @@ -63,6 +65,12 @@ var ( Measurement: "Request Units", Unit: metric.Unit_COUNT, } + metaTotalReadBatches = metric.Metadata{ + Name: "tenant.consumption.read_batches", + Help: "Total number of KV read batches", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } metaTotalReadRequests = metric.Metadata{ Name: "tenant.consumption.read_requests", Help: "Total number of KV read requests", @@ -75,6 +83,12 @@ var ( Measurement: "Bytes", Unit: metric.Unit_COUNT, } + metaTotalWriteBatches = metric.Metadata{ + Name: "tenant.consumption.write_batches", + Help: "Total number of KV write batches", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } metaTotalWriteRequests = metric.Metadata{ Name: "tenant.consumption.write_requests", Help: "Total number of KV write requests", @@ -118,8 +132,10 @@ func (m *Metrics) init() { *m = Metrics{ TotalRU: b.GaugeFloat64(metaTotalRU), TotalKVRU: b.GaugeFloat64(metaTotalKVRU), + TotalReadBatches: b.Gauge(metaTotalReadBatches), TotalReadRequests: b.Gauge(metaTotalReadRequests), TotalReadBytes: b.Gauge(metaTotalReadBytes), + TotalWriteBatches: b.Gauge(metaTotalWriteBatches), TotalWriteRequests: b.Gauge(metaTotalWriteRequests), TotalWriteBytes: b.Gauge(metaTotalWriteBytes), TotalSQLPodsCPUSeconds: b.GaugeFloat64(metaTotalSQLPodsCPUSeconds), @@ -134,8 +150,10 @@ func (m *Metrics) init() { type tenantMetrics struct { totalRU *aggmetric.GaugeFloat64 totalKVRU *aggmetric.GaugeFloat64 + totalReadBatches *aggmetric.Gauge totalReadRequests *aggmetric.Gauge totalReadBytes *aggmetric.Gauge + totalWriteBatches *aggmetric.Gauge totalWriteRequests *aggmetric.Gauge totalWriteBytes *aggmetric.Gauge totalSQLPodsCPUSeconds *aggmetric.GaugeFloat64 @@ -158,8 +176,10 @@ func (m *Metrics) getTenantMetrics(tenantID roachpb.TenantID) tenantMetrics { tm = tenantMetrics{ totalRU: m.TotalRU.AddChild(tid), totalKVRU: m.TotalKVRU.AddChild(tid), + totalReadBatches: m.TotalReadBatches.AddChild(tid), totalReadRequests: m.TotalReadRequests.AddChild(tid), totalReadBytes: m.TotalReadBytes.AddChild(tid), + totalWriteBatches: m.TotalWriteBatches.AddChild(tid), totalWriteRequests: m.TotalWriteRequests.AddChild(tid), totalWriteBytes: m.TotalWriteBytes.AddChild(tid), totalSQLPodsCPUSeconds: m.TotalSQLPodsCPUSeconds.AddChild(tid), diff --git a/pkg/ccl/multitenantccl/tenantcostserver/server_test.go b/pkg/ccl/multitenantccl/tenantcostserver/server_test.go index 9eae789fa335..d0d8e018827a 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/server_test.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/server_test.go @@ -141,8 +141,10 @@ func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) st Consumption struct { RU float64 `yaml:"ru"` KVRU float64 `yaml:"kvru"` + ReadBatches uint64 `yaml:"read_batches"` ReadReq uint64 `yaml:"read_req"` ReadBytes uint64 `yaml:"read_bytes"` + WriteBatches uint64 `yaml:"write_batches"` WriteReq uint64 `yaml:"write_req"` WriteBytes uint64 `yaml:"write_bytes"` SQLPodsCPUUsage float64 `yaml:"sql_pods_cpu_usage"` @@ -177,8 +179,10 @@ func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) st ConsumptionSinceLastRequest: roachpb.TenantConsumption{ RU: args.Consumption.RU, KVRU: args.Consumption.KVRU, + ReadBatches: args.Consumption.ReadBatches, ReadRequests: args.Consumption.ReadReq, ReadBytes: args.Consumption.ReadBytes, + WriteBatches: args.Consumption.WriteBatches, WriteRequests: args.Consumption.WriteReq, WriteBytes: args.Consumption.WriteBytes, SQLPodsCPUSeconds: args.Consumption.SQLPodsCPUUsage, diff --git a/pkg/ccl/multitenantccl/tenantcostserver/system_table.go b/pkg/ccl/multitenantccl/tenantcostserver/system_table.go index 963701579a1e..f519b0bc418f 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/system_table.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/system_table.go @@ -581,12 +581,14 @@ func InspectTenantMetadata( tenant.Bucket.RUCurrent, tenant.Bucket.CurrentShareSum, ) - fmt.Fprintf(&buf, "Consumption: ru=%.12g kvru=%.12g reads=%d req/%d bytes writes=%d req/%d bytes pod-cpu-usage: %g secs pgwire-egress=%d bytes external-egress=%d bytes external-ingress=%d bytes\n", + fmt.Fprintf(&buf, "Consumption: ru=%.12g kvru=%.12g reads=%d in %d batches (%d bytes) writes=%d in %d batches (%d bytes) pod-cpu-usage: %g secs pgwire-egress=%d bytes external-egress=%d bytes external-ingress=%d bytes\n", tenant.Consumption.RU, tenant.Consumption.KVRU, tenant.Consumption.ReadRequests, + tenant.Consumption.ReadBatches, tenant.Consumption.ReadBytes, tenant.Consumption.WriteRequests, + tenant.Consumption.WriteBatches, tenant.Consumption.WriteBytes, tenant.Consumption.SQLPodsCPUSeconds, tenant.Consumption.PGWireEgressBytes, diff --git a/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go b/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go index d2be281cdf8f..3dc64e83b630 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go @@ -124,8 +124,10 @@ func (s *instance) TokenBucketRequest( // Report current consumption. metrics.totalRU.Update(consumption.RU) metrics.totalKVRU.Update(consumption.KVRU) + metrics.totalReadBatches.Update(int64(consumption.ReadBatches)) metrics.totalReadRequests.Update(int64(consumption.ReadRequests)) metrics.totalReadBytes.Update(int64(consumption.ReadBytes)) + metrics.totalWriteBatches.Update(int64(consumption.WriteBatches)) metrics.totalWriteRequests.Update(int64(consumption.WriteRequests)) metrics.totalWriteBytes.Update(int64(consumption.WriteBytes)) metrics.totalSQLPodsCPUSeconds.Update(consumption.SQLPodsCPUSeconds) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index f55c496b07be..80f54c40076c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -872,7 +872,7 @@ func (ds *DistSender) Send( reply.BatchResponse_Header = lastHeader if ds.kvInterceptor != nil { - respInfo := tenantcostmodel.MakeResponseInfo(reply) + respInfo := tenantcostmodel.MakeResponseInfo(reply, !reqInfo.IsWrite()) ds.kvInterceptor.OnResponse(ctx, reqInfo, respInfo) } } diff --git a/pkg/kv/kvserver/replica_rate_limit.go b/pkg/kv/kvserver/replica_rate_limit.go index a617da7f079a..363b1af743c9 100644 --- a/pkg/kv/kvserver/replica_rate_limit.go +++ b/pkg/kv/kvserver/replica_rate_limit.go @@ -31,11 +31,14 @@ func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *roachpb.BatchRequ return r.tenantLimiter.Wait(ctx, tenantcostmodel.MakeRequestInfo(ba)) } -// recordImpactOnRateLimiter is used to record a read against the tenant rate limiter. -func (r *Replica) recordImpactOnRateLimiter(ctx context.Context, br *roachpb.BatchResponse) { - if r.tenantLimiter == nil || br == nil { +// recordImpactOnRateLimiter is used to record a read against the tenant rate +// limiter. +func (r *Replica) recordImpactOnRateLimiter( + ctx context.Context, br *roachpb.BatchResponse, isReadOnly bool, +) { + if r.tenantLimiter == nil || br == nil || !isReadOnly { return } - r.tenantLimiter.RecordRead(ctx, tenantcostmodel.MakeResponseInfo(br)) + r.tenantLimiter.RecordRead(ctx, tenantcostmodel.MakeResponseInfo(br, isReadOnly)) } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index b2736207ffa0..4ac84feccb7b 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -205,7 +205,7 @@ func (r *Replica) sendWithoutRangeID( r.maybeAddRangeInfoToResponse(ctx, ba, br) } - r.recordImpactOnRateLimiter(ctx, br) + r.recordImpactOnRateLimiter(ctx, br, isReadOnly) return br, pErr } diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index 3de7c94c16c1..b6cb201533bb 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -117,13 +117,10 @@ func (rl *limiter) Wait(ctx context.Context, reqInfo tenantcostmodel.RequestInfo return err } - if isWrite, writeBytes := reqInfo.IsWrite(); isWrite { - rl.metrics.writeRequestsAdmitted.Inc(1) - rl.metrics.writeBytesAdmitted.Inc(writeBytes) - } else { - // We don't know how much we will read; the bytes will be accounted for - // after the fact in RecordRead. - rl.metrics.readRequestsAdmitted.Inc(1) + if reqInfo.IsWrite() { + rl.metrics.writeBatchesAdmitted.Inc(1) + rl.metrics.writeRequestsAdmitted.Inc(reqInfo.WriteCount()) + rl.metrics.writeBytesAdmitted.Inc(reqInfo.WriteBytes()) } return nil @@ -131,10 +128,14 @@ func (rl *limiter) Wait(ctx context.Context, reqInfo tenantcostmodel.RequestInfo // RecordRead is part of the Limiter interface. func (rl *limiter) RecordRead(ctx context.Context, respInfo tenantcostmodel.ResponseInfo) { + rl.metrics.readBatchesAdmitted.Inc(1) + rl.metrics.readRequestsAdmitted.Inc(respInfo.ReadCount()) rl.metrics.readBytesAdmitted.Inc(respInfo.ReadBytes()) rl.qp.Update(func(res quotapool.Resource) (shouldNotify bool) { tb := res.(*tokenBucket) - amount := float64(respInfo.ReadBytes()) * tb.config.ReadUnitsPerByte + amount := tb.config.ReadBatchUnits + amount += float64(respInfo.ReadCount()) * tb.config.ReadRequestUnits + amount += float64(respInfo.ReadBytes()) * tb.config.ReadUnitsPerByte tb.Adjust(quotapool.Tokens(-amount)) // Do not notify the head of the queue. In the best case we did not disturb // the time at which it can be fulfilled and in the worst case, we made it @@ -199,10 +200,15 @@ func (req *waitRequest) Acquire( ) (fulfilled bool, tryAgainAfter time.Duration) { tb := res.(*tokenBucket) var needed float64 - if isWrite, writeBytes := req.info.IsWrite(); isWrite { - needed = tb.config.WriteRequestUnits + float64(writeBytes)*tb.config.WriteUnitsPerByte + if req.info.IsWrite() { + needed = tb.config.WriteBatchUnits + needed += float64(req.info.WriteCount()) * tb.config.WriteRequestUnits + needed += float64(req.info.WriteBytes()) * tb.config.WriteUnitsPerByte } else { - needed = tb.config.ReadRequestUnits + // Only acquire tokens for read requests once the response has been + // received. However, TryToFulfill still needs to be called with a zero + // value, in case the quota pool is in debt and the read should block. + needed = 0 } return tb.TryToFulfill(quotapool.Tokens(needed)) } diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go index ab59841d2397..9d857f5df4bd 100644 --- a/pkg/kv/kvserver/tenantrate/limiter_test.go +++ b/pkg/kv/kvserver/tenantrate/limiter_test.go @@ -50,9 +50,9 @@ func TestCloser(t *testing.T) { ctx := context.Background() limiter := factory.GetTenant(ctx, tenant, closer) // First Wait call will not block. - require.NoError(t, limiter.Wait(ctx, tenantcostmodel.TestingRequestInfo(true, 1))) + require.NoError(t, limiter.Wait(ctx, tenantcostmodel.TestingRequestInfo(1, 1))) errCh := make(chan error, 1) - go func() { errCh <- limiter.Wait(ctx, tenantcostmodel.TestingRequestInfo(true, 1<<30)) }() + go func() { errCh <- limiter.Wait(ctx, tenantcostmodel.TestingRequestInfo(1, 1<<30)) }() testutils.SucceedsSoon(t, func() error { if timers := timeSource.Timers(); len(timers) != 1 { return errors.Errorf("expected 1 timer, found %d", len(timers)) @@ -87,7 +87,7 @@ type launchState struct { tenantID roachpb.TenantID ctx context.Context cancel context.CancelFunc - isWrite bool + writeCount int64 writeBytes int64 reserveCh chan error } @@ -208,7 +208,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string { var cmds []struct { ID string Tenant uint64 - IsWrite bool + WriteCount int64 WriteBytes int64 } if err := yaml.UnmarshalStrict([]byte(d.Input), &cmds); err != nil { @@ -220,7 +220,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string { s.tenantID = roachpb.MakeTenantID(cmd.Tenant) s.ctx, s.cancel = context.WithCancel(context.Background()) s.reserveCh = make(chan error, 1) - s.isWrite = cmd.IsWrite + s.writeCount = cmd.WriteCount s.writeBytes = cmd.WriteBytes ts.running[s.id] = &s lims := ts.tenants[s.tenantID] @@ -230,7 +230,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string { go func() { // We'll not worry about ever releasing tenant Limiters. s.reserveCh <- lims[0].Wait( - s.ctx, tenantcostmodel.TestingRequestInfo(s.isWrite, s.writeBytes), + s.ctx, tenantcostmodel.TestingRequestInfo(s.writeCount, s.writeBytes), ) }() } @@ -303,19 +303,20 @@ func (ts *testState) cancel(t *testing.T, d *datadriven.TestData) string { } // recordRead accounts for bytes read from a request. It takes as input a -// yaml list with fields tenant and readbytes. It returns the set of tasks -// currently running like launch, await, and cancel. +// yaml list with fields tenant, readcount, and readbytes. It returns the set of +// tasks currently running like launch, await, and cancel. // // For example: // // record_read -// - { tenant: 2, readbytes: 32 } +// - { tenant: 2, readcount: 1, readbytes: 32 } // ---- // [a@2] // func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string { var reads []struct { Tenant uint64 + ReadCount int64 ReadBytes int64 } if err := yaml.UnmarshalStrict([]byte(d.Input), &reads); err != nil { @@ -327,7 +328,7 @@ func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string { if len(lims) == 0 { d.Fatalf(t, "no outstanding limiters for %v", tid) } - lims[0].RecordRead(context.Background(), tenantcostmodel.TestingResponseInfo(r.ReadBytes)) + lims[0].RecordRead(context.Background(), tenantcostmodel.TestingResponseInfo(r.ReadCount, r.ReadBytes)) } return ts.FormatRunning() } @@ -519,8 +520,10 @@ func (ts *testState) estimateIOPS(t *testing.T, d *datadriven.TestData) string { config := tenantrate.DefaultConfig() calculateIOPS := func(rate float64) float64 { - readCost := config.ReadRequestUnits + float64(workload.ReadSize)*config.ReadUnitsPerByte - writeCost := config.WriteRequestUnits + float64(workload.WriteSize)*config.WriteUnitsPerByte + readCost := config.ReadBatchUnits + config.ReadRequestUnits + + float64(workload.ReadSize)*config.ReadUnitsPerByte + writeCost := config.WriteBatchUnits + config.WriteRequestUnits + + float64(workload.WriteSize)*config.WriteUnitsPerByte readFraction := float64(workload.ReadPercentage) / 100.0 avgCost := readFraction*readCost + (1-readFraction)*writeCost return rate / avgCost @@ -621,8 +624,10 @@ func parseSettings(t *testing.T, d *datadriven.TestData, config *tenantrate.Conf } override(&config.Rate, vals.Rate) override(&config.Burst, vals.Burst) + override(&config.ReadBatchUnits, vals.Read.Base) override(&config.ReadRequestUnits, vals.Read.Base) override(&config.ReadUnitsPerByte, vals.Read.PerByte) + override(&config.WriteBatchUnits, vals.Write.Base) override(&config.WriteRequestUnits, vals.Write.Base) override(&config.WriteUnitsPerByte, vals.Write.PerByte) } diff --git a/pkg/kv/kvserver/tenantrate/metrics.go b/pkg/kv/kvserver/tenantrate/metrics.go index 585c569bda84..9f1008999782 100644 --- a/pkg/kv/kvserver/tenantrate/metrics.go +++ b/pkg/kv/kvserver/tenantrate/metrics.go @@ -21,6 +21,8 @@ import ( type Metrics struct { Tenants *metric.Gauge CurrentBlocked *aggmetric.AggGauge + ReadBatchesAdmitted *aggmetric.AggCounter + WriteBatchesAdmitted *aggmetric.AggCounter ReadRequestsAdmitted *aggmetric.AggCounter WriteRequestsAdmitted *aggmetric.AggCounter ReadBytesAdmitted *aggmetric.AggCounter @@ -42,6 +44,18 @@ var ( Measurement: "Requests", Unit: metric.Unit_COUNT, } + metaReadBatchesAdmitted = metric.Metadata{ + Name: "kv.tenant_rate_limit.read_batches_admitted", + Help: "Number of read batches admitted by the rate limiter", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + metaWriteBatchesAdmitted = metric.Metadata{ + Name: "kv.tenant_rate_limit.write_batches_admitted", + Help: "Number of write batches admitted by the rate limiter", + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } metaReadRequestsAdmitted = metric.Metadata{ Name: "kv.tenant_rate_limit.read_requests_admitted", Help: "Number of read requests admitted by the rate limiter", @@ -73,6 +87,8 @@ func makeMetrics() Metrics { return Metrics{ Tenants: metric.NewGauge(metaTenants), CurrentBlocked: b.Gauge(metaCurrentBlocked), + ReadBatchesAdmitted: b.Counter(metaReadBatchesAdmitted), + WriteBatchesAdmitted: b.Counter(metaWriteBatchesAdmitted), ReadRequestsAdmitted: b.Counter(metaReadRequestsAdmitted), WriteRequestsAdmitted: b.Counter(metaWriteRequestsAdmitted), ReadBytesAdmitted: b.Counter(metaReadBytesAdmitted), @@ -86,6 +102,8 @@ func (m *Metrics) MetricStruct() {} // tenantMetrics represent metrics for an individual tenant. type tenantMetrics struct { currentBlocked *aggmetric.Gauge + readBatchesAdmitted *aggmetric.Counter + writeBatchesAdmitted *aggmetric.Counter readRequestsAdmitted *aggmetric.Counter writeRequestsAdmitted *aggmetric.Counter readBytesAdmitted *aggmetric.Counter @@ -96,6 +114,8 @@ func (m *Metrics) tenantMetrics(tenantID roachpb.TenantID) tenantMetrics { tid := tenantID.String() return tenantMetrics{ currentBlocked: m.CurrentBlocked.AddChild(tid), + readBatchesAdmitted: m.ReadBatchesAdmitted.AddChild(tid), + writeBatchesAdmitted: m.WriteBatchesAdmitted.AddChild(tid), readRequestsAdmitted: m.ReadRequestsAdmitted.AddChild(tid), writeRequestsAdmitted: m.WriteRequestsAdmitted.AddChild(tid), readBytesAdmitted: m.ReadBytesAdmitted.AddChild(tid), @@ -105,6 +125,8 @@ func (m *Metrics) tenantMetrics(tenantID roachpb.TenantID) tenantMetrics { func (tm *tenantMetrics) destroy() { tm.currentBlocked.Destroy() + tm.readBatchesAdmitted.Destroy() + tm.writeBatchesAdmitted.Destroy() tm.readRequestsAdmitted.Destroy() tm.writeRequestsAdmitted.Destroy() tm.readBytesAdmitted.Destroy() diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go index e4945f27f729..9f2fe8cd3a96 100644 --- a/pkg/kv/kvserver/tenantrate/settings.go +++ b/pkg/kv/kvserver/tenantrate/settings.go @@ -28,11 +28,15 @@ type Config struct { // up to this limit. Burst float64 + // ReadBatchUnits is the baseline cost of a read batch, in KV Compute Units. + ReadBatchUnits float64 // ReadRequestUnits is the baseline cost of a read, in KV Compute Units. ReadRequestUnits float64 // ReadRequestUnits is the size-dependent cost of a read, in KV Compute Units // per byte. ReadUnitsPerByte float64 + // WriteBatchUnits is the baseline cost of a write batch, in KV Compute Units. + WriteBatchUnits float64 // WriteRequestUnits is the baseline cost of a write, in KV Compute Units. WriteRequestUnits float64 // WriteRequestUnits is the size-dependent cost of a write, in KV Compute @@ -85,6 +89,14 @@ var ( settings.PositiveFloat, ) + readBatchCost = settings.RegisterFloatSetting( + settings.TenantWritable, + "kv.tenant_rate_limiter.read_batch_cost", + "base cost of a read batch in KV Compute Units", + 0.1, + settings.PositiveFloat, + ) + readRequestCost = settings.RegisterFloatSetting( settings.TenantWritable, "kv.tenant_rate_limiter.read_request_cost", @@ -101,6 +113,14 @@ var ( settings.PositiveFloat, ) + writeBatchCost = settings.RegisterFloatSetting( + settings.TenantWritable, + "kv.tenant_rate_limiter.write_batch_cost", + "base cost of a write batch in KV Compute Units", + 0.1, + settings.PositiveFloat, + ) + writeRequestCost = settings.RegisterFloatSetting( settings.TenantWritable, "kv.tenant_rate_limiter.write_request_cost", @@ -121,8 +141,10 @@ var ( configSettings = [...]settings.NonMaskedSetting{ kvcuRateLimit, kvcuBurstLimitSeconds, + readBatchCost, readRequestCost, readCostPerMB, + writeBatchCost, writeRequestCost, writeCostPerMB, } @@ -145,8 +167,10 @@ func ConfigFromSettings(sv *settings.Values) Config { return Config{ Rate: rate, Burst: rate * kvcuBurstLimitSeconds.Get(sv), + ReadBatchUnits: readBatchCost.Get(sv), ReadRequestUnits: readRequestCost.Get(sv), ReadUnitsPerByte: readCostPerMB.Get(sv) / (1024 * 1024), + WriteBatchUnits: writeBatchCost.Get(sv), WriteRequestUnits: writeRequestCost.Get(sv), WriteUnitsPerByte: writeCostPerMB.Get(sv) / (1024 * 1024), } @@ -159,8 +183,10 @@ func DefaultConfig() Config { return Config{ Rate: rate, Burst: rate * kvcuBurstLimitSeconds.Default(), + ReadBatchUnits: readBatchCost.Default(), ReadRequestUnits: readRequestCost.Default(), ReadUnitsPerByte: readCostPerMB.Default() / (1024 * 1024), + WriteBatchUnits: writeBatchCost.Default(), WriteRequestUnits: writeRequestCost.Default(), WriteUnitsPerByte: writeCostPerMB.Default() / (1024 * 1024), } diff --git a/pkg/kv/kvserver/tenantrate/system_limiter.go b/pkg/kv/kvserver/tenantrate/system_limiter.go index 11778ef26e86..d89bf68da103 100644 --- a/pkg/kv/kvserver/tenantrate/system_limiter.go +++ b/pkg/kv/kvserver/tenantrate/system_limiter.go @@ -23,17 +23,20 @@ type systemLimiter struct { } func (s systemLimiter) Wait(ctx context.Context, reqInfo tenantcostmodel.RequestInfo) error { - if isWrite, writeBytes := reqInfo.IsWrite(); isWrite { - s.writeRequestsAdmitted.Inc(1) - s.writeBytesAdmitted.Inc(writeBytes) - } else { - s.readRequestsAdmitted.Inc(1) + if reqInfo.IsWrite() { + s.writeBatchesAdmitted.Inc(1) + s.writeRequestsAdmitted.Inc(reqInfo.WriteCount()) + s.writeBytesAdmitted.Inc(reqInfo.WriteBytes()) } return nil } func (s systemLimiter) RecordRead(ctx context.Context, respInfo tenantcostmodel.ResponseInfo) { - s.readBytesAdmitted.Inc(respInfo.ReadBytes()) + if respInfo.IsRead() { + s.readBatchesAdmitted.Inc(1) + s.readRequestsAdmitted.Inc(respInfo.ReadCount()) + s.readBytesAdmitted.Inc(respInfo.ReadBytes()) + } } var _ Limiter = (*systemLimiter)(nil) diff --git a/pkg/kv/kvserver/tenantrate/testdata/basic b/pkg/kv/kvserver/tenantrate/testdata/basic index 6d891611f210..6298c24a8889 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/basic +++ b/pkg/kv/kvserver/tenantrate/testdata/basic @@ -13,13 +13,14 @@ get_tenants ---- [2#2, 3#1, 5#3, system#1] -# Launch two requests on behalf of tenant 2, one on behalf of 3, and one on -# behalf of the system tenant. +# Launch three requests on behalf of tenant 2, one on behalf of 3, and one on +# behalf of the system tenant. Note that read requests do not consume quota; +# only read responses do. launch - { id: g0, tenant: 1 } - { id: g1, tenant: 2 } -- { id: g2, tenant: 2, iswrite: true } +- { id: g2, tenant: 2, writecount: 2 } - { id: g3, tenant: 3 } ---- [g0@system, g1@2, g2@2, g3@3] @@ -32,14 +33,13 @@ await ---- [] -# Launch another read and another write request on behalf of tenant 2; they -# will block because the burst limit only supports two requests. +# Launch another write request on behalf of tenant 2; it will block because the +# burst limit only supports two requests. launch -- { id: g4, tenant: 2 } -- { id: g5, tenant: 2, iswrite: true } +- { id: g4, tenant: 2, writecount: 1 } ---- -[g4@2, g5@2] +[g4@2] # Ensure that it the above request was blocked by observing the timer it creates # to wait for available quota. @@ -49,7 +49,27 @@ timers 00:00:01.000 # Observe that the "current_blocked" counter has appropriate values to indicate -# that there are two blocked request for tenant 2 and total. +# that there is one blocked request for tenant 2 and total. + +metrics +current_blocked +---- +kv_tenant_rate_limit_current_blocked 1 +kv_tenant_rate_limit_current_blocked{tenant_id="2"} 1 +kv_tenant_rate_limit_current_blocked{tenant_id="3"} 0 +kv_tenant_rate_limit_current_blocked{tenant_id="5"} 0 +kv_tenant_rate_limit_current_blocked{tenant_id="system"} 0 + +# Ensure that a read request will be blocked if there is no available quota +# left. Note that we don't actually subtract the cost of the read until the +# response, but we don't even want to attempt the read if there's no quota left. + +launch +- { id: g5, tenant: 2 } +---- +[g4@2, g5@2] + +# Observe that the "current_blocked" counter has been updated accordingly. metrics current_blocked @@ -60,20 +80,20 @@ kv_tenant_rate_limit_current_blocked{tenant_id="3"} 0 kv_tenant_rate_limit_current_blocked{tenant_id="5"} 0 kv_tenant_rate_limit_current_blocked{tenant_id="system"} 0 - # Observe that the "requests_admitted" counters has appropriate values to -# indicate that requests have been admitted. +# indicate that write requests have been admitted, but that no read requests +# have yet been admitted (since that happens during response). metrics kv_tenant_rate_limit_.*_requests_admitted ---- -kv_tenant_rate_limit_read_requests_admitted 3 -kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 1 -kv_tenant_rate_limit_read_requests_admitted{tenant_id="3"} 1 +kv_tenant_rate_limit_read_requests_admitted 0 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="3"} 0 kv_tenant_rate_limit_read_requests_admitted{tenant_id="5"} 0 -kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 1 -kv_tenant_rate_limit_write_requests_admitted 1 -kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 1 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 0 +kv_tenant_rate_limit_write_requests_admitted 2 +kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 2 kv_tenant_rate_limit_write_requests_admitted{tenant_id="3"} 0 kv_tenant_rate_limit_write_requests_admitted{tenant_id="5"} 0 kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0 @@ -91,7 +111,6 @@ metrics \{tenant_id="3"\} ---- - # Advance time to the point where there should be enough units for both # requests to go through. diff --git a/pkg/kv/kvserver/tenantrate/testdata/burst b/pkg/kv/kvserver/tenantrate/testdata/burst index 96a96bc71f9f..ac462490bf0b 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/burst +++ b/pkg/kv/kvserver/tenantrate/testdata/burst @@ -20,7 +20,7 @@ get_tenants # limit. This will not block but will put the limiter into debt. launch -- { id: g1, tenant: 2, iswrite: true, writebytes: 20 } +- { id: g1, tenant: 2, writecount: 1, writebytes: 20 } ---- [g1@2] @@ -33,7 +33,7 @@ await # This will be 2s because we're in debt 1 and the rate is 1/s. launch -- { id: g2, tenant: 2, iswrite: true, writebytes: 0 } +- { id: g2, tenant: 2, writecount: 1, writebytes: 0 } ---- [g2@2] @@ -67,7 +67,7 @@ await # is above the burst of 2, we'll need to wait for the bucket to be full. launch -- { id: g3, tenant: 2, iswrite: true, writebytes: 30 } +- { id: g3, tenant: 2, writecount: 1, writebytes: 30 } ---- [g3@2] diff --git a/pkg/kv/kvserver/tenantrate/testdata/cancel b/pkg/kv/kvserver/tenantrate/testdata/cancel index 6d04abed90b3..587745deebc7 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/cancel +++ b/pkg/kv/kvserver/tenantrate/testdata/cancel @@ -16,7 +16,7 @@ get_tenants # Launch a request to consume one unit. launch -- { id: g1, tenant: 2, iswrite: true, writebytes: 0 } +- { id: g1, tenant: 2, writecount: 1, writebytes: 0 } ---- [g1@2] @@ -28,7 +28,7 @@ await # Launch a request requiring more quota than exists. launch -- { id: g2, tenant: 2, iswrite: true, writebytes: 100 } +- { id: g2, tenant: 2, writecount: 1, writebytes: 100 } ---- [g2@2] @@ -41,7 +41,7 @@ timers # Launch another request which could be fulfilled by the existing quota. launch -- { id: g3, tenant: 2, iswrite: true, writebytes: 0 } +- { id: g3, tenant: 2, writecount: 1, writebytes: 0 } ---- [g2@2, g3@2] diff --git a/pkg/kv/kvserver/tenantrate/testdata/reads b/pkg/kv/kvserver/tenantrate/testdata/reads index c715382fe8b2..b6a8d15da690 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/reads +++ b/pkg/kv/kvserver/tenantrate/testdata/reads @@ -1,5 +1,5 @@ # This tests bursting and how requests above the burst limit put the limiter -# into debt +# into debt. init rate: 2 @@ -16,30 +16,41 @@ get_tenants [2#1, system#1] # Read the entire burst worth of bytes plus 0.4 which should put the limiter -# in debt by 0.4. Also record a system read. We'll verify both show up in metrics. +# in debt by 0.4. Also record a system read. We'll verify both show up in +# metrics. record_read -- { tenant: 2, readbytes: 34 } -- { tenant: 1, readbytes: 10 } +- { tenant: 2, readcount: 1, readbytes: 34 } +- { tenant: 1, readcount: 1, readbytes: 10 } ---- [] # Verify that the above reads made it to the metrics. metrics -kv_tenant_rate_limit_read_bytes_admitted +kv_tenant_rate_limit_read_.*_admitted ---- kv_tenant_rate_limit_read_bytes_admitted 44 kv_tenant_rate_limit_read_bytes_admitted{tenant_id="2"} 34 kv_tenant_rate_limit_read_bytes_admitted{tenant_id="system"} 10 +kv_tenant_rate_limit_read_requests_admitted 2 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 1 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 1 -# Launch a request which will block because it needs 1 unit. +# Launch a read request which will block because we're in debt. launch - { id: g1, tenant: 2 } ---- [g1@2] +metrics +current_blocked +---- +kv_tenant_rate_limit_current_blocked 1 +kv_tenant_rate_limit_current_blocked{tenant_id="2"} 1 +kv_tenant_rate_limit_current_blocked{tenant_id="system"} 0 + timers ---- 00:00:00.200 @@ -47,7 +58,7 @@ timers # Record more reads, putting the limiter further into debt. record_read -- { tenant: 2, readbytes: 16 } +- { tenant: 2, readcount: 1, readbytes: 16 } ---- [g1@2] @@ -69,12 +80,12 @@ advance timers ---- -00:00:01.000 +00:00:01.500 advance -800ms +1300ms ---- -00:00:01.001 +00:00:01.501 await - g1 diff --git a/pkg/kv/kvserver/tenantrate/testdata/update b/pkg/kv/kvserver/tenantrate/testdata/update index 6ad5178f35d7..4d61ff320831 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/update +++ b/pkg/kv/kvserver/tenantrate/testdata/update @@ -16,7 +16,7 @@ get_tenants # Launch a request that puts the limiter in debt by 2. launch -- { id: g1, tenant: 2, iswrite: true, writebytes: 50 } +- { id: g1, tenant: 2, writecount: 1, writebytes: 50 } ---- [g1@2] @@ -29,7 +29,7 @@ await # with the current debt. launch -- { id: g1, tenant: 2, iswrite: true, writebytes: 30 } +- { id: g1, tenant: 2, writecount: 1, writebytes: 30 } ---- [g1@2] diff --git a/pkg/multitenant/tenantcostmodel/model.go b/pkg/multitenant/tenantcostmodel/model.go index 882c99c1b611..7a345b74360b 100644 --- a/pkg/multitenant/tenantcostmodel/model.go +++ b/pkg/multitenant/tenantcostmodel/model.go @@ -29,23 +29,34 @@ type RU float64 // // The cost model takes into account the following activities: // -// - KV "read" and "write" operations. KV operations that read or write data -// have a base cost and a per-byte cost. Specifically, the cost of a read is: -// RUs = ReadRequest + * ReadByte -// The cost of a write is: -// RUs = WriteRequest + * WriteByte +// - KV "read" and "write" batches. KV batches that read or write data have a +// base cost, a per-request cost, and a per-byte cost. Specifically, the +// cost of a read batch is: +// RUs = KVReadBatch + +// * KVReadRequest + +// * KVReadByte +// The cost of a write batch is: +// RUs = KVWriteBatch + +// * KVWriteRequest + +// * KVWriteByte // // - CPU usage on the tenant's SQL pods. -// // - Writes to external storage services such as S3. +// - Count of bytes returned from SQL to the client (network egress). // type Config struct { + // KVReadBatch is the baseline cost of a batch of KV reads. + KVReadBatch RU + // KVReadRequest is the baseline cost of a KV read. KVReadRequest RU // KVReadByte is the per-byte cost of a KV read. KVReadByte RU + // KVWriteBatch is the baseline cost of a batch of KV writes. + KVWriteBatch RU + // KVWriteRequest is the baseline cost of a KV write. KVWriteRequest RU @@ -59,23 +70,34 @@ type Config struct { // client. PGWireEgressByte RU - // ExternalIOEgressByte is the cost of transferring one - // byte from a SQL pod to external services. + // ExternalIOEgressByte is the cost of transferring one byte from a SQL pod + // to external services. ExternalIOEgressByte RU - // ExternalIOIngressByte is the cost of transferring one - // byte from an external service into the SQL pod. + // ExternalIOIngressByte is the cost of transferring one byte from an external + // service into the SQL pod. ExternalIOIngressByte RU } // KVReadCost calculates the cost of a KV read operation. -func (c *Config) KVReadCost(bytes int64) RU { - return c.KVReadRequest + RU(bytes)*c.KVReadByte +func (c *Config) KVReadCost(count, bytes int64) RU { + return c.KVReadBatch + RU(count)*c.KVReadRequest + RU(bytes)*c.KVReadByte } // KVWriteCost calculates the cost of a KV write operation. -func (c *Config) KVWriteCost(bytes int64) RU { - return c.KVWriteRequest + RU(bytes)*c.KVWriteByte +func (c *Config) KVWriteCost(count, bytes int64) RU { + return c.KVWriteBatch + RU(count)*c.KVWriteRequest + RU(bytes)*c.KVWriteByte +} + +// PodCPUCost calculates the cost of CPU seconds consumed in the SQL pod. +func (c *Config) PodCPUCost(seconds float64) RU { + return RU(seconds) * c.PodCPUSecond +} + +// PGWireEgressCost calculates the cost of bytes leaving the SQL pod to external +// services. +func (c *Config) PGWireEgressCost(bytes int64) RU { + return RU(bytes) * c.PGWireEgressByte } // ExternalWriteCost calculates the cost of an external write operation. @@ -88,85 +110,139 @@ func (c *Config) ExternalReadCost(bytes int64) RU { return RU(bytes) * c.ExternalIOIngressByte } -// RequestCost returns the portion of the cost that can be calculated upfront: -// the per-request cost (for both reads and writes) and the per-byte write cost. +// RequestCost returns the portion of the cost that is calculated upfront: the +// per-request and per-byte write cost. func (c *Config) RequestCost(bri RequestInfo) RU { - if isWrite, writeBytes := bri.IsWrite(); isWrite { - return c.KVWriteCost(writeBytes) + if !bri.IsWrite() { + return 0 } - return c.KVReadRequest + return c.KVWriteCost(bri.writeCount, bri.writeBytes) } -// ResponseCost returns the portion of the cost that can only be calculated -// after-the-fact: the per-byte read cost. +// ResponseCost returns the portion of the cost that is calculated +// after-the-fact: the per-request and per-byte read cost. func (c *Config) ResponseCost(bri ResponseInfo) RU { - return RU(bri.ReadBytes()) * c.KVReadByte + if !bri.IsRead() { + return 0 + } + return c.KVReadCost(bri.readCount, bri.readBytes) } -// RequestInfo captures the request information that is used (together with -// the cost model) to determine the portion of the cost that can be calculated -// upfront. Specifically: whether it is a read or a write and the write size (if -// it's a write). +// RequestInfo captures the BatchRequeset information that is used (together +// with the cost model) to determine the portion of the cost that can be +// calculated upfront. Specifically: how many writes were batched together and +// their total size (if the request is a write batch). type RequestInfo struct { - // writeBytes is the write size if the request is a write, or -1 if it is a read. + // writeCount is the number of writes that were batched together. This is -1 + // if it is a read-only batch. + writeCount int64 + // writeBytes is the total size of all batched writes in the request, in + // bytes, or 0 if it is a read-only batch. writeBytes int64 } // MakeRequestInfo extracts the relevant information from a BatchRequest. func MakeRequestInfo(ba *roachpb.BatchRequest) RequestInfo { + // The cost of read-only batches is captured by MakeResponseInfo. if !ba.IsWrite() { - return RequestInfo{writeBytes: -1} + return RequestInfo{writeCount: -1} } - var writeBytes int64 + var writeCount, writeBytes int64 for i := range ba.Requests { - if swr, isSizedWrite := ba.Requests[i].GetInner().(roachpb.SizedWriteRequest); isSizedWrite { - writeBytes += swr.WriteBytes() + req := ba.Requests[i].GetInner() + + // Only count non-admin requests in the batch that write user data. Other + // requests are considered part of the "base" cost of a batch. + switch req.(type) { + case *roachpb.PutRequest, *roachpb.ConditionalPutRequest, *roachpb.IncrementRequest, + *roachpb.DeleteRequest, *roachpb.DeleteRangeRequest, *roachpb.ClearRangeRequest, + *roachpb.RevertRangeRequest, *roachpb.InitPutRequest, *roachpb.AddSSTableRequest: + writeCount++ + if swr, isSizedWrite := req.(roachpb.SizedWriteRequest); isSizedWrite { + writeBytes += swr.WriteBytes() + } } } - return RequestInfo{writeBytes: writeBytes} + return RequestInfo{writeCount: writeCount, writeBytes: writeBytes} } -// IsWrite returns whether the request is a write, and if so the write size in -// bytes. -func (bri RequestInfo) IsWrite() (isWrite bool, writeBytes int64) { - if bri.writeBytes == -1 { - return false, 0 - } - return true, bri.writeBytes +// IsWrite is true if this was a write batch rather than a read-only batch. +func (bri RequestInfo) IsWrite() bool { + return bri.writeCount != -1 +} + +// WriteCount is the number of writes that were batched together. This is -1 if +// it is a read-only batch. +func (bri RequestInfo) WriteCount() int64 { + return bri.writeCount +} + +// WriteBytes is the total size of all batched writes in the request, in bytes, +// or 0 if it is a read-only batch. +func (bri RequestInfo) WriteBytes() int64 { + return bri.writeBytes } // TestingRequestInfo creates a RequestInfo for testing purposes. -func TestingRequestInfo(isWrite bool, writeBytes int64) RequestInfo { - if !isWrite { - return RequestInfo{writeBytes: -1} - } - return RequestInfo{writeBytes: writeBytes} +func TestingRequestInfo(writeCount, writeBytes int64) RequestInfo { + return RequestInfo{writeCount: writeCount, writeBytes: writeBytes} } // ResponseInfo captures the BatchResponse information that is used (together // with the cost model) to determine the portion of the cost that can only be -// calculated after-the-fact. Specifically: the read size (if the request is a -// read). +// calculated after-the-fact. Specifically: how many reads were batched together +// and their total size (if the request is a read-only batch). type ResponseInfo struct { + // readCount is the number of reads that were batched together. This is -1 + // if it is a write batch. + readCount int64 + // readBytes is the total size of all batched reads in the response, in + // bytes, or 0 if it is a write batch. readBytes int64 } // MakeResponseInfo extracts the relevant information from a BatchResponse. -func MakeResponseInfo(br *roachpb.BatchResponse) ResponseInfo { - var readBytes int64 - for _, ru := range br.Responses { - readBytes += ru.GetInner().Header().NumBytes +func MakeResponseInfo(br *roachpb.BatchResponse, isReadOnly bool) ResponseInfo { + // The cost of non read-only batches is captured by MakeRequestInfo. + if !isReadOnly { + return ResponseInfo{readCount: -1} + } + + var readCount, readBytes int64 + for i := range br.Responses { + resp := br.Responses[i].GetInner() + + // Only count requests in the batch that read user data. Other requests + // are considered part of the "base" cost of a batch. + switch resp.(type) { + case *roachpb.GetResponse, *roachpb.ScanResponse, *roachpb.ReverseScanResponse, + *roachpb.ExportResponse: + readCount++ + readBytes += resp.Header().NumBytes + } } - return ResponseInfo{readBytes: readBytes} + return ResponseInfo{readCount: readCount, readBytes: readBytes} +} + +// IsRead is true if this was a read-only batch rather than a write batch. +func (bri ResponseInfo) IsRead() bool { + return bri.readCount != -1 +} + +// ReadCount is the number of reads that were batched together. This is -1 if it +// is a write batch. +func (bri ResponseInfo) ReadCount() int64 { + return bri.readCount } -// ReadBytes returns the bytes read, or 0 if the request was a write. +// ReadBytes is the total size of all batched reads in the response, in bytes, +// or 0 if it is a write batch. func (bri ResponseInfo) ReadBytes() int64 { return bri.readBytes } // TestingResponseInfo creates a ResponseInfo for testing purposes. -func TestingResponseInfo(readBytes int64) ResponseInfo { - return ResponseInfo{readBytes: readBytes} +func TestingResponseInfo(readCount, readBytes int64) ResponseInfo { + return ResponseInfo{readCount: readCount, readBytes: readBytes} } diff --git a/pkg/multitenant/tenantcostmodel/settings.go b/pkg/multitenant/tenantcostmodel/settings.go index b08fcfe9b034..678cbd03ba75 100644 --- a/pkg/multitenant/tenantcostmodel/settings.go +++ b/pkg/multitenant/tenantcostmodel/settings.go @@ -26,6 +26,14 @@ import ( // only the defaults are used. Ideally, the tenant would always get the values // from the host cluster. var ( + readBatchCost = settings.RegisterFloatSetting( + settings.TenantReadOnly, + "tenant_cost_model.kv_read_batch_cost", + "base cost of a read batch in Request Units", + 0.6993, + settings.PositiveFloat, + ) + readRequestCost = settings.RegisterFloatSetting( settings.TenantReadOnly, "tenant_cost_model.kv_read_request_cost", @@ -42,6 +50,14 @@ var ( settings.PositiveFloat, ) + writeBatchCost = settings.RegisterFloatSetting( + settings.TenantReadOnly, + "tenant_cost_model.kv_write_batch_cost", + "base cost of a write batch in Request Units", + 5.7733, + settings.PositiveFloat, + ) + writeRequestCost = settings.RegisterFloatSetting( settings.TenantReadOnly, "tenant_cost_model.kv_write_request_cost", @@ -96,8 +112,10 @@ var ( // List of config settings, used by SetOnChange. configSettings = [...]settings.NonMaskedSetting{ + readBatchCost, readRequestCost, readCostPerMB, + writeBatchCost, writeRequestCost, writeCostPerMB, podCPUSecondCost, @@ -112,8 +130,10 @@ const perMBToPerByte = float64(1) / (1024 * 1024) // ConfigFromSettings constructs a Config using the cluster setting values. func ConfigFromSettings(sv *settings.Values) Config { return Config{ + KVReadBatch: RU(readBatchCost.Get(sv)), KVReadRequest: RU(readRequestCost.Get(sv)), KVReadByte: RU(readCostPerMB.Get(sv) * perMBToPerByte), + KVWriteBatch: RU(writeBatchCost.Get(sv)), KVWriteRequest: RU(writeRequestCost.Get(sv)), KVWriteByte: RU(writeCostPerMB.Get(sv) * perMBToPerByte), PodCPUSecond: RU(podCPUSecondCost.Get(sv)), @@ -127,8 +147,10 @@ func ConfigFromSettings(sv *settings.Values) Config { // setting values. func DefaultConfig() Config { return Config{ + KVReadBatch: RU(readBatchCost.Default()), KVReadRequest: RU(readRequestCost.Default()), KVReadByte: RU(readCostPerMB.Default() * perMBToPerByte), + KVWriteBatch: RU(writeBatchCost.Default()), KVWriteRequest: RU(writeRequestCost.Default()), KVWriteByte: RU(writeCostPerMB.Default() * perMBToPerByte), PodCPUSecond: RU(podCPUSecondCost.Default()), diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 06cb67400d17..5eba5ae37a20 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1693,8 +1693,10 @@ var _ = (*TenantConsumption).Equal func (c *TenantConsumption) Add(other *TenantConsumption) { c.RU += other.RU c.KVRU += other.KVRU + c.ReadBatches += other.ReadBatches c.ReadRequests += other.ReadRequests c.ReadBytes += other.ReadBytes + c.WriteBatches += other.WriteBatches c.WriteRequests += other.WriteRequests c.WriteBytes += other.WriteBytes c.SQLPodsCPUSeconds += other.SQLPodsCPUSeconds @@ -1717,6 +1719,12 @@ func (c *TenantConsumption) Sub(other *TenantConsumption) { c.KVRU -= other.KVRU } + if c.ReadBatches < other.ReadBatches { + c.ReadBatches = 0 + } else { + c.ReadBatches -= other.ReadBatches + } + if c.ReadRequests < other.ReadRequests { c.ReadRequests = 0 } else { @@ -1729,6 +1737,12 @@ func (c *TenantConsumption) Sub(other *TenantConsumption) { c.ReadBytes -= other.ReadBytes } + if c.WriteBatches < other.WriteBatches { + c.WriteBatches = 0 + } else { + c.WriteBatches -= other.WriteBatches + } + if c.WriteRequests < other.WriteRequests { c.WriteRequests = 0 } else { diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 9c7efe2fd597..b423a972a295 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2869,8 +2869,10 @@ message TenantSetting { message TenantConsumption { double r_u = 1; double kv_r_u = 8 [(gogoproto.customname) = "KVRU"]; + uint64 read_batches = 11; uint64 read_requests = 2; uint64 read_bytes = 3; + uint64 write_batches = 12; uint64 write_requests = 4; uint64 write_bytes = 5; double sql_pods_cpu_seconds = 6 [(gogoproto.customname) = "SQLPodsCPUSeconds"]; diff --git a/pkg/roachpb/api_test.go b/pkg/roachpb/api_test.go index 9a76bd45eec2..c48f62213759 100644 --- a/pkg/roachpb/api_test.go +++ b/pkg/roachpb/api_test.go @@ -313,13 +313,15 @@ func TestContentionEvent_SafeFormat(t *testing.T) { func TestTenantConsumptionAddSub(t *testing.T) { a := TenantConsumption{ RU: 1, - ReadRequests: 2, - ReadBytes: 3, - WriteRequests: 4, - WriteBytes: 5, - SQLPodsCPUSeconds: 6, - PGWireEgressBytes: 7, - KVRU: 8, + ReadBatches: 2, + ReadRequests: 3, + ReadBytes: 4, + WriteBatches: 5, + WriteRequests: 6, + WriteBytes: 7, + SQLPodsCPUSeconds: 8, + PGWireEgressBytes: 9, + KVRU: 10, } var b TenantConsumption for i := 0; i < 10; i++ { @@ -327,13 +329,15 @@ func TestTenantConsumptionAddSub(t *testing.T) { } if exp := (TenantConsumption{ RU: 10, - ReadRequests: 20, - ReadBytes: 30, - WriteRequests: 40, - WriteBytes: 50, - SQLPodsCPUSeconds: 60, - PGWireEgressBytes: 70, - KVRU: 80, + ReadBatches: 20, + ReadRequests: 30, + ReadBytes: 40, + WriteBatches: 50, + WriteRequests: 60, + WriteBytes: 70, + SQLPodsCPUSeconds: 80, + PGWireEgressBytes: 90, + KVRU: 100, }); b != exp { t.Errorf("expected\n%#v\ngot\n%#v", exp, b) } @@ -342,13 +346,15 @@ func TestTenantConsumptionAddSub(t *testing.T) { c.Sub(&a) if exp := (TenantConsumption{ RU: 9, - ReadRequests: 18, - ReadBytes: 27, - WriteRequests: 36, - WriteBytes: 45, - SQLPodsCPUSeconds: 54, - PGWireEgressBytes: 63, - KVRU: 72, + ReadBatches: 18, + ReadRequests: 27, + ReadBytes: 36, + WriteBatches: 45, + WriteRequests: 54, + WriteBytes: 63, + SQLPodsCPUSeconds: 72, + PGWireEgressBytes: 81, + KVRU: 90, }); c != exp { t.Errorf("expected\n%#v\ngot\n%#v", exp, c) } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index aeb474598fce..89d5fb809fb1 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -334,12 +334,12 @@ func startTenantInternal( } externalUsageFn := func(ctx context.Context) multitenant.ExternalUsage { - userTimeMillis, _, err := status.GetCPUTime(ctx) + userTimeMillis, sysTimeMillis, err := status.GetCPUTime(ctx) if err != nil { log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err) } return multitenant.ExternalUsage{ - CPUSecs: float64(userTimeMillis) * 1e-3, + CPUSecs: float64(userTimeMillis+sysTimeMillis) * 1e-3, PGWireEgressBytes: s.pgServer.BytesOut(), } } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index fd62e84c776b..37c52d01fa0e 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -945,6 +945,18 @@ var charts = []sectionDescription{ Percentiles: false, Metrics: []string{"kv.tenant_rate_limit.num_tenants"}, }, + { + Title: "Read Batches Admitted by Rate Limiter", + Downsampler: DescribeAggregator_MAX, + Percentiles: false, + Metrics: []string{"kv.tenant_rate_limit.read_batches_admitted"}, + }, + { + Title: "Write Batches Admitted by Rate Limiter", + Downsampler: DescribeAggregator_MAX, + Percentiles: false, + Metrics: []string{"kv.tenant_rate_limit.write_batches_admitted"}, + }, { Title: "Read Requests Admitted by Rate Limiter", Downsampler: DescribeAggregator_MAX, @@ -958,16 +970,16 @@ var charts = []sectionDescription{ Metrics: []string{"kv.tenant_rate_limit.write_requests_admitted"}, }, { - Title: "Read Bytes Admitted by Rate Limiter", + Title: "Write Bytes Admitted by Rate Limiter", Downsampler: DescribeAggregator_MAX, Percentiles: false, - Metrics: []string{"kv.tenant_rate_limit.read_bytes_admitted"}, + Metrics: []string{"kv.tenant_rate_limit.write_bytes_admitted"}, }, { - Title: "Write Bytes Admitted by Rate Limiter", + Title: "Read Bytes Admitted by Rate Limiter", Downsampler: DescribeAggregator_MAX, Percentiles: false, - Metrics: []string{"kv.tenant_rate_limit.write_bytes_admitted"}, + Metrics: []string{"kv.tenant_rate_limit.read_bytes_admitted"}, }, }, },