diff --git a/pkg/cmd/roachtest/disk_stall.go b/pkg/cmd/roachtest/disk_stall.go index 2e151b42a196..2d5a65b95b57 100644 --- a/pkg/cmd/roachtest/disk_stall.go +++ b/pkg/cmd/roachtest/disk_stall.go @@ -33,7 +33,7 @@ func registerDiskStalledDetection(r *testRegistry) { "disk-stalled/log=%t,data=%t", affectsLogDir, affectsDataDir, ), - Owner: OwnerKV, + Owner: OwnerStorage, MinVersion: "v19.2.0", Cluster: makeClusterSpec(1), Run: func(ctx context.Context, t *test, c *cluster) { diff --git a/pkg/cmd/roachtest/gossip.go b/pkg/cmd/roachtest/gossip.go index 1df5077c3965..988e36754f31 100644 --- a/pkg/cmd/roachtest/gossip.go +++ b/pkg/cmd/roachtest/gossip.go @@ -271,6 +271,9 @@ func runGossipPeerings(ctx context.Context, t *test, c *cluster) { t.l.Printf("%d: restarting node %d\n", i, node[0]) c.Stop(ctx, node) c.Start(ctx, t, node) + // Sleep a bit to avoid hitting: + // https://github.com/cockroachdb/cockroach/issues/48005 + time.Sleep(3 * time.Second) } } diff --git a/pkg/cmd/roachtest/jepsen.go b/pkg/cmd/roachtest/jepsen.go index 871fc85f5311..a8fc3c003457 100644 --- a/pkg/cmd/roachtest/jepsen.go +++ b/pkg/cmd/roachtest/jepsen.go @@ -312,8 +312,10 @@ func registerJepsen(r *testRegistry) { for _, nemesis := range jepsenNemeses { nemesis := nemesis // copy for closure spec := testSpec{ - Name: fmt.Sprintf("jepsen/%s/%s", testName, nemesis.name), - Owner: OwnerKV, + Name: fmt.Sprintf("jepsen/%s/%s", testName, nemesis.name), + // We don't run jepsen on older releases due to the high rate of flakes. + MinVersion: "v20.1.0", + Owner: OwnerKV, // The Jepsen tests do funky things to machines, like muck with the // system clock; therefore, their clusters cannot be reused other tests // except the Jepsen ones themselves which reset all this state when diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go index b82ae6431227..bc42e55aa3d8 100644 --- a/pkg/cmd/roachtest/tpcc.go +++ b/pkg/cmd/roachtest/tpcc.go @@ -414,7 +414,7 @@ func registerTPCC(r *testRegistry) { LoadWarehouses: 5000, EstimatedMax: 3000, - MinVersion: "v19.1.0", + MinVersion: "v20.1.0", }) registerTPCCBenchSpec(r, tpccBenchSpec{ Nodes: 9, @@ -425,7 +425,7 @@ func registerTPCC(r *testRegistry) { LoadWarehouses: 2000, EstimatedMax: 900, - MinVersion: "v19.1.0", + MinVersion: "v20.1.0", }) } diff --git a/pkg/kv/kvserver/client_tenant_test.go b/pkg/kv/kvserver/client_tenant_test.go index 2ed7fd887d9e..e824ff1eb521 100644 --- a/pkg/kv/kvserver/client_tenant_test.go +++ b/pkg/kv/kvserver/client_tenant_test.go @@ -175,20 +175,20 @@ func TestTenantRateLimiter(t *testing.T) { // the tenant range. tenantCtx := roachpb.NewContextForTenant(ctx, tenantID) cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings()) - for i := 0; i < int(cfg.Requests.Burst); i++ { + for i := 0; i < int(cfg.WriteRequests.Burst); i++ { require.NoError(t, db.Put(ctx, mkKey(), 0)) } // Now ensure that in the same instant the write QPS limit does affect the // tenant. Issuing up to the burst limit of requests can happen without // blocking. - for i := 0; i < int(cfg.Requests.Burst); i++ { + for i := 0; i < int(cfg.WriteRequests.Burst); i++ { require.NoError(t, db.Put(tenantCtx, mkKey(), 0)) } // Attempt to issue another request, make sure that it gets blocked by // observing a timer. errCh := make(chan error, 1) go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }() - expectedTimer := t0.Add(time.Duration(float64(1/cfg.Requests.Rate) * float64(time.Second))) + expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second))) testutils.SucceedsSoon(t, func() error { timers := timeSource.Timers() if len(timers) != 1 { @@ -213,18 +213,18 @@ func TestTenantRateLimiter(t *testing.T) { return string(read) } makeMetricStr := func(expCount int64) string { - const tenantMetricStr = `kv_tenant_rate_limit_requests_admitted{store="1",tenant_id="10"}` + const tenantMetricStr = `kv_tenant_rate_limit_write_requests_admitted{store="1",tenant_id="10"}` return fmt.Sprintf("%s %d", tenantMetricStr, expCount) } // Ensure that the metric for the admitted requests is equal to the number of // requests which we've admitted. - require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst)) + require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst)) // Allow the blocked request to proceed. timeSource.Advance(time.Second) require.NoError(t, <-errCh) // Ensure that it is now reflected in the metrics. - require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst+1)) + require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1)) } diff --git a/pkg/kv/kvserver/replica_rate_limit.go b/pkg/kv/kvserver/replica_rate_limit.go index b53149271f26..8e5efa7848ef 100644 --- a/pkg/kv/kvserver/replica_rate_limit.go +++ b/pkg/kv/kvserver/replica_rate_limit.go @@ -27,7 +27,7 @@ func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *roachpb.BatchRequ if !ok || tenantID == roachpb.SystemTenantID { return nil } - return r.tenantLimiter.Wait(ctx, bytesWrittenFromRequest(ba)) + return r.tenantLimiter.Wait(ctx, ba.IsWrite(), bytesWrittenFromRequest(ba)) } // bytesWrittenFromBatchRequest returns an approximation of the number of bytes diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 509ca9d21a90..1d0fdd65b7b0 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -65,8 +65,6 @@ func (r *Replica) sendWithRangeID( r.maybeInitializeRaftGroup(ctx) isReadOnly := ba.IsReadOnly() - useRaft := !isReadOnly && ba.IsWrite() - if err := r.checkBatchRequest(ba, isReadOnly); err != nil { return nil, roachpb.NewError(err) } @@ -92,14 +90,14 @@ func (r *Replica) sendWithRangeID( // Differentiate between read-write, read-only, and admin. var pErr *roachpb.Error - if useRaft { - log.Event(ctx, "read-write path") - fn := (*Replica).executeWriteBatch - br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn) - } else if isReadOnly { + if isReadOnly { log.Event(ctx, "read-only path") fn := (*Replica).executeReadOnlyBatch br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn) + } else if ba.IsWrite() { + log.Event(ctx, "read-write path") + fn := (*Replica).executeWriteBatch + br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn) } else if ba.IsAdmin() { log.Event(ctx, "admin path") br, pErr = r.executeAdminBatch(ctx, ba) diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go index b8a02b4ecbb2..81f29ec0ff94 100644 --- a/pkg/kv/kvserver/tenantrate/helpers_test.go +++ b/pkg/kv/kvserver/tenantrate/helpers_test.go @@ -15,8 +15,10 @@ import "github.com/cockroachdb/cockroach/pkg/settings/cluster" // OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the // settings. func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) { - requestRateLimit.Override(&settings.SV, float64(rl.Requests.Rate)) - requestBurstLimit.Override(&settings.SV, rl.Requests.Burst) + readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate)) + readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst) + writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate)) + writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst) readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate)) readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst) writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate)) diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index a6fc7e14a2ec..5276840e0f22 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -48,14 +48,15 @@ import ( type Limiter interface { // Wait acquires n quota from the limiter. This acquisition cannot be - // released. Each call to wait will consume 1 read byte, 1 request, and - // writeBytes from the token buckets. Calls to Wait will block until the - // buckets contain adequate resources. If a request attempts to write more - // than the burst limit, it will wait until the bucket is completely full - // before acquiring the requested quantity and putting the limiter in debt. + // released. Each call to wait will consume 1 read or write request + // depending on isWrite, 1 read byte, and writeBytes from the token buckets. + // Calls to Wait will block until the buckets contain adequate resources. If + // a request attempts to write more than the burst limit, it will wait until + // the bucket is completely full before acquiring the requested quantity and + // putting the limiter in debt. // // The only errors which should be returned are due to the context. - Wait(ctx context.Context, writeBytes int64) error + Wait(ctx context.Context, isWrite bool, writeBytes int64) error // RecordRead subtracts the bytes read by a request from the token bucket. // This call may push the Limiter into debt in the ReadBytes dimensions @@ -85,28 +86,38 @@ func (rl *limiter) init( metrics: metrics, } buckets := tokenBuckets{ - requests: makeTokenBucket(conf.Requests), - readBytes: makeTokenBucket(conf.ReadBytes), - writeBytes: makeTokenBucket(conf.WriteBytes), + readRequests: makeTokenBucket(conf.ReadRequests), + writeRequests: makeTokenBucket(conf.WriteRequests), + readBytes: makeTokenBucket(conf.ReadBytes), + writeBytes: makeTokenBucket(conf.WriteBytes), } options = append(options, quotapool.OnAcquisition(func( ctx context.Context, poolName string, r quotapool.Request, start time.Time, ) { req := r.(*waitRequest) + if req.readRequests > 0 { + rl.metrics.readRequestsAdmitted.Inc(req.readRequests) + } + if req.writeRequests > 0 { + rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests) + } + // Accounted for in limiter.RecordRead. + // if req.readBytes > 0 { + // rl.metrics.readBytesAdmitted.Inc(req.readBytes) + // } if req.writeBytes > 0 { rl.metrics.writeBytesAdmitted.Inc(req.writeBytes) } - rl.metrics.requestsAdmitted.Inc(1) })) rl.qp = quotapool.New(tenantID.String(), &buckets, options...) buckets.clock = rl.qp.TimeSource() buckets.lastUpdated = buckets.clock.Now() } -func (rl *limiter) Wait(ctx context.Context, writeBytes int64) error { +func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error { rl.metrics.currentBlocked.Inc(1) defer rl.metrics.currentBlocked.Dec(1) - r := newWaitRequest(writeBytes) + r := newWaitRequest(isWrite, writeBytes) defer putWaitRequest(r) if err := rl.qp.Acquire(ctx, r); err != nil { return err @@ -130,11 +141,12 @@ func (rl *limiter) updateLimits(limits LimitConfigs) { // tokenBuckets is the implementation of Resource which remains in the quotapool // for a limiter. type tokenBuckets struct { - clock timeutil.TimeSource - lastUpdated time.Time - requests tokenBucket - readBytes tokenBucket - writeBytes tokenBucket + clock timeutil.TimeSource + lastUpdated time.Time + readRequests tokenBucket + writeRequests tokenBucket + readBytes tokenBucket + writeBytes tokenBucket } var _ quotapool.Resource = (*tokenBuckets)(nil) @@ -146,7 +158,8 @@ func (rb *tokenBuckets) update() { // TODO(ajwerner): Consider instituting a minimum update frequency to avoid // spinning too fast on timers for tons of tiny allocations at a fast rate. if since := now.Sub(rb.lastUpdated); since > 0 { - rb.requests.update(since) + rb.readRequests.update(since) + rb.writeRequests.update(since) rb.readBytes.update(since) rb.writeBytes.update(since) rb.lastUpdated = now @@ -166,14 +179,16 @@ func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter t } } } - check(&rb.requests, req.requests) + check(&rb.readRequests, req.readRequests) + check(&rb.writeRequests, req.writeRequests) check(&rb.readBytes, req.readBytes) check(&rb.writeBytes, req.writeBytes) return fulfilled, tryAgainAfter } func (rb *tokenBuckets) subtract(req *waitRequest) { - rb.requests.tokens -= float64(req.requests) + rb.readRequests.tokens -= float64(req.readRequests) + rb.writeRequests.tokens -= float64(req.writeRequests) rb.readBytes.tokens -= float64(req.readBytes) rb.writeBytes.tokens -= float64(req.writeBytes) } @@ -185,7 +200,8 @@ func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) { // configuration. rb.update() - rb.requests.setConf(toAdd.Requests) + rb.readRequests.setConf(toAdd.ReadRequests) + rb.writeRequests.setConf(toAdd.WriteRequests) rb.readBytes.setConf(toAdd.ReadBytes) rb.writeBytes.setConf(toAdd.WriteBytes) return true @@ -264,9 +280,10 @@ func (t *tokenBucket) clampTokens() { // waitRequest is used to wait for adequate resources in the tokenBuckets. type waitRequest struct { - requests int64 - writeBytes int64 - readBytes int64 + readRequests int64 + writeRequests int64 + writeBytes int64 + readBytes int64 } var waitRequestSyncPool = sync.Pool{ @@ -275,12 +292,18 @@ var waitRequestSyncPool = sync.Pool{ // newWaitRequest allocates a waitRequest from the sync.Pool. // It should be returned with putWaitRequest. -func newWaitRequest(writeBytes int64) *waitRequest { +func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest { r := waitRequestSyncPool.Get().(*waitRequest) *r = waitRequest{ - requests: 1, - readBytes: 1, - writeBytes: writeBytes, + readRequests: 0, + writeRequests: 0, + readBytes: 1, + writeBytes: writeBytes, + } + if isWrite { + r.writeRequests = 1 + } else { + r.readRequests = 1 } return r } diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go index 20648b4ab107..2b46c9d3f225 100644 --- a/pkg/kv/kvserver/tenantrate/limiter_test.go +++ b/pkg/kv/kvserver/tenantrate/limiter_test.go @@ -48,9 +48,9 @@ func TestCloser(t *testing.T) { limiter := factory.GetTenant(tenant, closer) ctx := context.Background() // First Wait call will not block. - require.NoError(t, limiter.Wait(ctx, 1)) + require.NoError(t, limiter.Wait(ctx, false, 1)) errCh := make(chan error, 1) - go func() { errCh <- limiter.Wait(ctx, 1<<30) }() + go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }() testutils.SucceedsSoon(t, func() error { if timers := timeSource.Timers(); len(timers) != 1 { return errors.Errorf("expected 1 timer, found %d", len(timers)) @@ -84,6 +84,7 @@ type launchState struct { tenantID roachpb.TenantID ctx context.Context cancel context.CancelFunc + isWrite bool writeBytes int64 reserveCh chan error } @@ -202,6 +203,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string { var cmds []struct { ID string Tenant uint64 + IsWrite bool WriteBytes int64 } if err := yaml.UnmarshalStrict([]byte(d.Input), &cmds); err != nil { @@ -213,6 +215,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string { s.tenantID = roachpb.MakeTenantID(cmd.Tenant) s.ctx, s.cancel = context.WithCancel(context.Background()) s.reserveCh = make(chan error, 1) + s.isWrite = cmd.IsWrite s.writeBytes = cmd.WriteBytes ts.running[s.id] = &s lims := ts.tenants[s.tenantID] @@ -221,7 +224,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string { } go func() { // We'll not worry about ever releasing tenant Limiters. - s.reserveCh <- lims[0].Wait(s.ctx, s.writeBytes) + s.reserveCh <- lims[0].Wait(s.ctx, s.isWrite, s.writeBytes) }() } return ts.FormatRunning() @@ -337,12 +340,15 @@ func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string { // kv_tenant_rate_limit_read_bytes_admitted 0 // kv_tenant_rate_limit_read_bytes_admitted{tenant_id="2"} 0 // kv_tenant_rate_limit_read_bytes_admitted{tenant_id="system"} 100 -// kv_tenant_rate_limit_requests_admitted 0 -// kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 0 -// kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 0 +// kv_tenant_rate_limit_read_requests_admitted 0 +// kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0 +// kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 0 // kv_tenant_rate_limit_write_bytes_admitted 50 // kv_tenant_rate_limit_write_bytes_admitted{tenant_id="2"} 50 // kv_tenant_rate_limit_write_bytes_admitted{tenant_id="system"} 0 +// kv_tenant_rate_limit_write_requests_admitted 0 +// kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 0 +// kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0 // // Or with a regular expression: // diff --git a/pkg/kv/kvserver/tenantrate/metrics.go b/pkg/kv/kvserver/tenantrate/metrics.go index b16344c64c2a..8de30d104112 100644 --- a/pkg/kv/kvserver/tenantrate/metrics.go +++ b/pkg/kv/kvserver/tenantrate/metrics.go @@ -18,11 +18,12 @@ import ( // Metrics is a metric.Struct for the LimiterFactory. type Metrics struct { - Tenants *metric.Gauge - CurrentBlocked *aggmetric.AggGauge - RequestsAdmitted *aggmetric.AggCounter - WriteBytesAdmitted *aggmetric.AggCounter - ReadBytesAdmitted *aggmetric.AggCounter + Tenants *metric.Gauge + CurrentBlocked *aggmetric.AggGauge + ReadRequestsAdmitted *aggmetric.AggCounter + WriteRequestsAdmitted *aggmetric.AggCounter + ReadBytesAdmitted *aggmetric.AggCounter + WriteBytesAdmitted *aggmetric.AggCounter } var _ metric.Struct = (*Metrics)(nil) @@ -40,17 +41,17 @@ var ( Measurement: "Requests", Unit: metric.Unit_COUNT, } - metaRequestsAdmitted = metric.Metadata{ - Name: "kv.tenant_rate_limit.requests_admitted", - Help: "Number of requests admitted by the rate limiter", + metaReadRequestsAdmitted = metric.Metadata{ + Name: "kv.tenant_rate_limit.read_requests_admitted", + Help: "Number of read requests admitted by the rate limiter", Measurement: "Requests", Unit: metric.Unit_COUNT, } - metaWriteBytesAdmitted = metric.Metadata{ - Name: "kv.tenant_rate_limit.write_bytes_admitted", - Help: "Number of write bytes admitted by the rate limiter", - Measurement: "Bytes", - Unit: metric.Unit_BYTES, + metaWriteRequestsAdmitted = metric.Metadata{ + Name: "kv.tenant_rate_limit.write_requests_admitted", + Help: "Number of write requests admitted by the rate limiter", + Measurement: "Requests", + Unit: metric.Unit_COUNT, } metaReadBytesAdmitted = metric.Metadata{ Name: "kv.tenant_rate_limit.read_bytes_admitted", @@ -58,6 +59,12 @@ var ( Measurement: "Bytes", Unit: metric.Unit_BYTES, } + metaWriteBytesAdmitted = metric.Metadata{ + Name: "kv.tenant_rate_limit.write_bytes_admitted", + Help: "Number of write bytes admitted by the rate limiter", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } ) // TenantIDLabel is the label used with metrics associated with a tenant. @@ -67,11 +74,12 @@ const TenantIDLabel = "tenant_id" func makeMetrics() Metrics { b := aggmetric.MakeBuilder(TenantIDLabel) return Metrics{ - Tenants: metric.NewGauge(metaTenants), - CurrentBlocked: b.Gauge(metaCurrentBlocked), - RequestsAdmitted: b.Counter(metaRequestsAdmitted), - WriteBytesAdmitted: b.Counter(metaWriteBytesAdmitted), - ReadBytesAdmitted: b.Counter(metaReadBytesAdmitted), + Tenants: metric.NewGauge(metaTenants), + CurrentBlocked: b.Gauge(metaCurrentBlocked), + ReadRequestsAdmitted: b.Counter(metaReadRequestsAdmitted), + WriteRequestsAdmitted: b.Counter(metaWriteRequestsAdmitted), + ReadBytesAdmitted: b.Counter(metaReadBytesAdmitted), + WriteBytesAdmitted: b.Counter(metaWriteBytesAdmitted), } } @@ -80,25 +88,28 @@ func (m *Metrics) MetricStruct() {} // tenantMetrics represent metrics for an individual tenant. type tenantMetrics struct { - currentBlocked *aggmetric.Gauge - requestsAdmitted *aggmetric.Counter - writeBytesAdmitted *aggmetric.Counter - readBytesAdmitted *aggmetric.Counter + currentBlocked *aggmetric.Gauge + readRequestsAdmitted *aggmetric.Counter + writeRequestsAdmitted *aggmetric.Counter + readBytesAdmitted *aggmetric.Counter + writeBytesAdmitted *aggmetric.Counter } func (m *Metrics) tenantMetrics(tenantID roachpb.TenantID) tenantMetrics { tid := tenantID.String() return tenantMetrics{ - currentBlocked: m.CurrentBlocked.AddChild(tid), - requestsAdmitted: m.RequestsAdmitted.AddChild(tid), - writeBytesAdmitted: m.WriteBytesAdmitted.AddChild(tid), - readBytesAdmitted: m.ReadBytesAdmitted.AddChild(tid), + currentBlocked: m.CurrentBlocked.AddChild(tid), + readRequestsAdmitted: m.ReadRequestsAdmitted.AddChild(tid), + writeRequestsAdmitted: m.WriteRequestsAdmitted.AddChild(tid), + readBytesAdmitted: m.ReadBytesAdmitted.AddChild(tid), + writeBytesAdmitted: m.WriteBytesAdmitted.AddChild(tid), } } func (tm *tenantMetrics) destroy() { tm.currentBlocked.Destroy() - tm.requestsAdmitted.Destroy() - tm.writeBytesAdmitted.Destroy() + tm.readRequestsAdmitted.Destroy() + tm.writeRequestsAdmitted.Destroy() tm.readBytesAdmitted.Destroy() + tm.writeBytesAdmitted.Destroy() } diff --git a/pkg/kv/kvserver/tenantrate/settings.go b/pkg/kv/kvserver/tenantrate/settings.go index 50ac9c4b4ffc..33903a075e58 100644 --- a/pkg/kv/kvserver/tenantrate/settings.go +++ b/pkg/kv/kvserver/tenantrate/settings.go @@ -28,18 +28,23 @@ type LimitConfig struct { // It is exported for convenience and testing. // The values are derived from cluster settings. type LimitConfigs struct { - Requests LimitConfig - ReadBytes LimitConfig - WriteBytes LimitConfig + ReadRequests LimitConfig + WriteRequests LimitConfig + ReadBytes LimitConfig + WriteBytes LimitConfig } // LimitConfigsFromSettings constructs LimitConfigs from the values stored in // the settings. func LimitConfigsFromSettings(settings *cluster.Settings) LimitConfigs { return LimitConfigs{ - Requests: LimitConfig{ - Rate: Limit(requestRateLimit.Get(&settings.SV)), - Burst: requestBurstLimit.Get(&settings.SV), + ReadRequests: LimitConfig{ + Rate: Limit(readRequestRateLimit.Get(&settings.SV)), + Burst: readRequestBurstLimit.Get(&settings.SV), + }, + WriteRequests: LimitConfig{ + Rate: Limit(writeRequestRateLimit.Get(&settings.SV)), + Burst: writeRequestBurstLimit.Get(&settings.SV), }, ReadBytes: LimitConfig{ Rate: Limit(readRateLimit.Get(&settings.SV)), @@ -53,14 +58,24 @@ func LimitConfigsFromSettings(settings *cluster.Settings) LimitConfigs { } var ( - requestRateLimit = settings.RegisterPositiveFloatSetting( - "kv.tenant_rate_limiter.requests.rate_limit", - "per-tenant request rate limit in requests per second", + readRequestRateLimit = settings.RegisterPositiveFloatSetting( + "kv.tenant_rate_limiter.read_requests.rate_limit", + "per-tenant read request rate limit in requests per second", + 128) + + readRequestBurstLimit = settings.RegisterPositiveIntSetting( + "kv.tenant_rate_limiter.read_requests.burst_limit", + "per-tenant read request burst limit in requests", + 512) + + writeRequestRateLimit = settings.RegisterPositiveFloatSetting( + "kv.tenant_rate_limiter.write_requests.rate_limit", + "per-tenant write request rate limit in requests per second", 128) - requestBurstLimit = settings.RegisterPositiveIntSetting( - "kv.tenant_rate_limiter.request.burst_limit", - "per-tenant request burst limit in requests", + writeRequestBurstLimit = settings.RegisterPositiveIntSetting( + "kv.tenant_rate_limiter.write_requests.burst_limit", + "per-tenant write request burst limit in requests", 512) readRateLimit = settings.RegisterByteSizeSetting( @@ -86,8 +101,10 @@ var ( // settingsSetOnChangeFuncs are the functions used to register the factory to // be notified of changes to any of the settings which configure it. settingsSetOnChangeFuncs = [...]func(*settings.Values, func()){ - requestRateLimit.SetOnChange, - requestBurstLimit.SetOnChange, + readRequestRateLimit.SetOnChange, + readRequestBurstLimit.SetOnChange, + writeRequestRateLimit.SetOnChange, + writeRequestBurstLimit.SetOnChange, readRateLimit.SetOnChange, readBurstLimit.SetOnChange, writeRateLimit.SetOnChange, diff --git a/pkg/kv/kvserver/tenantrate/system_limiter.go b/pkg/kv/kvserver/tenantrate/system_limiter.go index e065f8da01f7..ad8e48e60891 100644 --- a/pkg/kv/kvserver/tenantrate/system_limiter.go +++ b/pkg/kv/kvserver/tenantrate/system_limiter.go @@ -18,9 +18,13 @@ type systemLimiter struct { tenantMetrics } -func (s systemLimiter) Wait(ctx context.Context, writeBytes int64) error { +func (s systemLimiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error { + if isWrite { + s.writeRequestsAdmitted.Inc(1) + } else { + s.readRequestsAdmitted.Inc(1) + } s.writeBytesAdmitted.Inc(writeBytes) - s.requestsAdmitted.Inc(1) return nil } diff --git a/pkg/kv/kvserver/tenantrate/testdata/basic b/pkg/kv/kvserver/tenantrate/testdata/basic index b428f36bb9aa..77a8db5de8d8 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/basic +++ b/pkg/kv/kvserver/tenantrate/testdata/basic @@ -1,7 +1,8 @@ init -requests: { rate: 1, burst: 2 } -readbytes: { rate: 1024, burst: 2048 } -writebytes: { rate: 1024, burst: 2048 } +readrequests: { rate: 1, burst: 2 } +writerequests: { rate: 1, burst: 2 } +readbytes: { rate: 1024, burst: 2048 } +writebytes: { rate: 1024, burst: 2048 } ---- 00:00:00.000 @@ -12,31 +13,35 @@ get_tenants ---- [2#2, 3#1, 5#3, system#1] -# Launch two requests on behalf of tenant 2 and one on behalf of 3. +# Launch four requests on behalf of tenant 2, one on behalf of 3, and one on +# behalf of the system tenant. launch - { id: g0, tenant: 1 } - { id: g1, tenant: 2 } - { id: g2, tenant: 2 } -- { id: g3, tenant: 3 } +- { id: g3, tenant: 2, iswrite: true } +- { id: g4, tenant: 2, iswrite: true } +- { id: g5, tenant: 3 } ---- -[g0@system, g1@2, g2@2, g3@3] +[g0@system, g1@2, g2@2, g3@2, g4@2, g5@3] # Ensure that none of the above requests get blocked because they use less # than the configured burst for their respective limiters. await -[g0, g1, g2, g3] +[g0, g1, g2, g3, g4, g5] ---- [] -# Launch another request on behalf of tenant 2, it will block due to the request -# rate limit. +# Launch another read and another write request on behalf of tenant 2, it will +# block due to the request rate limit. launch -- { id: g4, tenant: 2 } +- { id: g6, tenant: 2 } +- { id: g7, tenant: 2, iswrite: true } ---- -[g4@2] +[g6@2, g7@2] # Ensure that it the above request was blocked by observing the timer it creates # to wait for available quota. @@ -46,29 +51,34 @@ timers 00:00:01.000 # Observe that the "current_blocked" counter has appropriate values to indicate -# that there is one blocked request for tenant 2 and total. +# that there are two blocked request for tenant 2 and total. metrics current_blocked ---- -kv_tenant_rate_limit_current_blocked 1 -kv_tenant_rate_limit_current_blocked{tenant_id="2"} 1 +kv_tenant_rate_limit_current_blocked 2 +kv_tenant_rate_limit_current_blocked{tenant_id="2"} 2 kv_tenant_rate_limit_current_blocked{tenant_id="3"} 0 kv_tenant_rate_limit_current_blocked{tenant_id="5"} 0 kv_tenant_rate_limit_current_blocked{tenant_id="system"} 0 -# Observe that the "requests_admitted" counter has appropriate values to +# Observe that the "requests_admitted" counters has appropriate values to # indicate that requests have been admitted. metrics -kv_tenant_rate_limit_requests_admitted +kv_tenant_rate_limit_.*_requests_admitted ---- -kv_tenant_rate_limit_requests_admitted 4 -kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 2 -kv_tenant_rate_limit_requests_admitted{tenant_id="3"} 1 -kv_tenant_rate_limit_requests_admitted{tenant_id="5"} 0 -kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 1 +kv_tenant_rate_limit_read_requests_admitted 4 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 2 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="3"} 1 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="5"} 0 +kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 1 +kv_tenant_rate_limit_write_requests_admitted 2 +kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 2 +kv_tenant_rate_limit_write_requests_admitted{tenant_id="3"} 0 +kv_tenant_rate_limit_write_requests_admitted{tenant_id="5"} 0 +kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0 # Release the tenant 3 rate limiter. @@ -91,9 +101,9 @@ advance ---- 00:00:01.001 -# Observe that the blocked request is now unblocked. +# Observe that the blocked requests are now unblocked. await -- g4 +[g6, g7] ---- [] diff --git a/pkg/kv/kvserver/tenantrate/testdata/burst b/pkg/kv/kvserver/tenantrate/testdata/burst index 6624677e9b00..9ed29bd12113 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/burst +++ b/pkg/kv/kvserver/tenantrate/testdata/burst @@ -2,9 +2,10 @@ # into debt. init -requests: { rate: 1, burst: 2 } -readbytes: { rate: 1024, burst: 2048 } -writebytes: { rate: 10, burst: 20 } +readrequests: { rate: 1, burst: 2 } +writerequests: { rate: 1, burst: 2 } +readbytes: { rate: 1024, burst: 2048 } +writebytes: { rate: 10, burst: 20 } ---- 00:00:00.000 @@ -19,7 +20,7 @@ get_tenants # limit. This will not block but will put the limiter into debt. launch -- { id: g1, tenant: 2, writebytes: 30 } +- { id: g1, tenant: 2, iswrite: true, writebytes: 30 } ---- [g1@2] @@ -33,7 +34,7 @@ await # 10/s. launch -- { id: g1, tenant: 2, writebytes: 10 } +- { id: g1, tenant: 2, iswrite: true, writebytes: 10 } ---- [g1@2] @@ -67,7 +68,7 @@ await # is above the burst of 20, we'll need to wait for the bucket to be full. launch -- { id: g1, tenant: 2, writebytes: 30 } +- { id: g1, tenant: 2, iswrite: true, writebytes: 30 } ---- [g1@2] diff --git a/pkg/kv/kvserver/tenantrate/testdata/cancel b/pkg/kv/kvserver/tenantrate/testdata/cancel index 6cc28b14da76..77ce14d13d5f 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/cancel +++ b/pkg/kv/kvserver/tenantrate/testdata/cancel @@ -1,9 +1,10 @@ # This tests cancellation and unblocking subsequent requests. init -requests: { rate: 1, burst: 2 } -readbytes: { rate: 1024, burst: 2048 } -writebytes: { rate: 1024, burst: 2048 } +readrequests: { rate: 1, burst: 2 } +writerequests: { rate: 1, burst: 2 } +readbytes: { rate: 1024, burst: 2048 } +writebytes: { rate: 1024, burst: 2048 } ---- 00:00:00.000 @@ -15,7 +16,7 @@ get_tenants # Launch a request to consume half of the 1024 capacity. launch -- { id: g1, tenant: 2, writebytes: 1024 } +- { id: g1, tenant: 2, iswrite: true, writebytes: 1024 } ---- [g1@2] @@ -27,7 +28,7 @@ await # Launch a request requiring more quota than exists. launch -- { id: g2, tenant: 2, writebytes: 1536 } +- { id: g2, tenant: 2, iswrite: true, writebytes: 1536 } ---- [g2@2] @@ -40,7 +41,7 @@ timers # Launch another request which could be fulfilled by the existing quota. launch -- { id: g3, tenant: 2, writebytes: 1024 } +- { id: g3, tenant: 2, iswrite: true, writebytes: 1024 } ---- [g2@2, g3@2] diff --git a/pkg/kv/kvserver/tenantrate/testdata/reads b/pkg/kv/kvserver/tenantrate/testdata/reads index e9b686ffb8dd..9f99d85c5c17 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/reads +++ b/pkg/kv/kvserver/tenantrate/testdata/reads @@ -2,8 +2,8 @@ # into debt init -requests: { rate: 1, burst: 2 } -readbytes: { rate: 10, burst: 100 } +readrequests: { rate: 1, burst: 2 } +readbytes: { rate: 10, burst: 100 } ---- 00:00:00.000 diff --git a/pkg/kv/kvserver/tenantrate/testdata/update b/pkg/kv/kvserver/tenantrate/testdata/update index 88dda006b8fa..fefc8e88364e 100644 --- a/pkg/kv/kvserver/tenantrate/testdata/update +++ b/pkg/kv/kvserver/tenantrate/testdata/update @@ -1,9 +1,10 @@ # Test updating the configuration of the rate limiter. init -requests: { rate: 1, burst: 2 } -readbytes: { rate: 1024, burst: 2048 } -writebytes: { rate: 10, burst: 20 } +readrequests: { rate: 1, burst: 2 } +writerequests: { rate: 1, burst: 2 } +readbytes: { rate: 1024, burst: 2048 } +writebytes: { rate: 10, burst: 20 } ---- 00:00:00.000 @@ -15,7 +16,7 @@ get_tenants # Launch a request that puts the limiter in debt by 10. launch -- { id: g1, tenant: 2, writebytes: 30 } +- { id: g1, tenant: 2, iswrite: true, writebytes: 30 } ---- [g1@2] @@ -28,7 +29,7 @@ await # with the current debt. launch -- { id: g1, tenant: 2, writebytes: 20 } +- { id: g1, tenant: 2, iswrite: true, writebytes: 20 } ---- [g1@2] diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index b14529e5bf27..17e00d2d0e9c 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1348,8 +1348,10 @@ func (t *logicTest) setup(cfg testClusterConfig, serverArgs TestServerArgs) { // Increase tenant rate limits for faster tests. conn := t.cluster.ServerConn(0) for _, settingName := range []string{ - "kv.tenant_rate_limiter.requests.rate_limit", - "kv.tenant_rate_limiter.request.burst_limit", + "kv.tenant_rate_limiter.read_requests.rate_limit", + "kv.tenant_rate_limiter.read_requests.burst_limit", + "kv.tenant_rate_limiter.write_requests.rate_limit", + "kv.tenant_rate_limiter.write_requests.burst_limit", } { if _, err := conn.Exec( fmt.Sprintf("SET CLUSTER SETTING %s = %d", settingName, 100000), diff --git a/pkg/sql/logictest/testdata/logic_test/partial_index b/pkg/sql/logictest/testdata/logic_test/partial_index index 175896212ffe..21013ae481ed 100644 --- a/pkg/sql/logictest/testdata/logic_test/partial_index +++ b/pkg/sql/logictest/testdata/logic_test/partial_index @@ -900,7 +900,7 @@ statement ok CREATE UNIQUE INDEX i2 ON u (a) WHERE b < 0 # Error if the arbiter predicate does not imply the partial index predicate. -statement error pgcode 42P10 there are multiple unique or exclusion constraints matching the ON CONFLICT specification +statement error pgcode 0A000 there are multiple unique or exclusion constraints matching the ON CONFLICT specification INSERT INTO u VALUES (1, 1) ON CONFLICT (a) WHERE b < 0 AND b > 0 DO UPDATE SET b = 10 statement ok diff --git a/pkg/sql/opt/optbuilder/insert.go b/pkg/sql/opt/optbuilder/insert.go index 455924e1520a..22a0e3630524 100644 --- a/pkg/sql/opt/optbuilder/insert.go +++ b/pkg/sql/opt/optbuilder/insert.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/errors" ) @@ -830,7 +831,7 @@ func (mb *mutationBuilder) buildInputForUpsert( // TODO(mgartner): Add support for multiple arbiter indexes, similar to // buildInputForDoNothing. if arbiterIndexes.Len() > 1 { - panic(pgerror.Newf(pgcode.InvalidColumnReference, + panic(unimplemented.NewWithIssue(53170, "there are multiple unique or exclusion constraints matching the ON CONFLICT specification")) } diff --git a/pkg/sql/opt/optbuilder/testdata/upsert b/pkg/sql/opt/optbuilder/testdata/upsert index 219bc6496e1d..2f34df5fadd3 100644 --- a/pkg/sql/opt/optbuilder/testdata/upsert +++ b/pkg/sql/opt/optbuilder/testdata/upsert @@ -2381,7 +2381,7 @@ DROP INDEX u4 build INSERT INTO unique_partial_indexes VALUES (1, 1, 'bar') ON CONFLICT (b) WHERE c = 'foo' AND c = 'bar' DO UPDATE SET b = 10 ---- -error (42P10): there are multiple unique or exclusion constraints matching the ON CONFLICT specification +error (0A000): unimplemented: there are multiple unique or exclusion constraints matching the ON CONFLICT specification build INSERT INTO unique_partial_indexes VALUES (1, 1, 'bar') ON CONFLICT (b) WHERE c = 'foo' DO UPDATE SET b = 10 diff --git a/pkg/sql/opt/partialidx/implicator.go b/pkg/sql/opt/partialidx/implicator.go index 789044136575..27f9c6c9466b 100644 --- a/pkg/sql/opt/partialidx/implicator.go +++ b/pkg/sql/opt/partialidx/implicator.go @@ -113,8 +113,8 @@ import ( // remaining filters. It would be difficult to support this case without // breaking the other cases prevented by each of the three rules. // -// A set of opt.Expr keeps track of exact matches encountered while exploring -// the filters and predicate expressions. If implication is proven, the filters +// An exprSet keeps track of exact matches encountered while exploring the +// filters and predicate expressions. If implication is proven, the filters // expression is traversed and the expressions in the opt.Expr set are removed. // While proving implication this set is not passed to recursive calls when a // disjunction is encountered in the predicate (rule #2), and disjunctions in @@ -176,9 +176,9 @@ func (im *Implicator) FiltersImplyPredicate( // If no exact match was found, recursively check the sub-expressions of the // filters and predicate. Use exactMatches to keep track of expressions in - // filters that exactly matches expressions in pred, so that the can be + // filters that exactly match expressions in pred, so that they can be // removed from the remaining filters. - exactMatches := make(map[opt.Expr]struct{}) + exactMatches := make(exprSet) if im.scalarExprImpliesPredicate(&filters, &pred, exactMatches) { remainingFilters = im.simplifyFiltersExpr(filters, exactMatches) return remainingFilters, true @@ -254,14 +254,12 @@ func (im *Implicator) filtersImplyPredicateFastPath( // Also note that exactMatches is optional, and nil can be passed when it is not // necessary to keep track of exactly matching expressions. func (im *Implicator) scalarExprImpliesPredicate( - e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, + e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches exprSet, ) bool { // If the expressions are an exact match, then e implies pred. if e == pred { - if exactMatches != nil { - exactMatches[e] = struct{}{} - } + exactMatches.add(e) return true } @@ -287,7 +285,7 @@ func (im *Implicator) scalarExprImpliesPredicate( // filtersExprImpliesPredicate returns true if the FiltersExpr e implies the // ScalarExpr pred. func (im *Implicator) filtersExprImpliesPredicate( - e *memo.FiltersExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, + e *memo.FiltersExpr, pred opt.ScalarExpr, exactMatches exprSet, ) bool { switch pt := pred.(type) { case *memo.FiltersExpr: @@ -351,7 +349,7 @@ func (im *Implicator) filtersExprImpliesPredicate( // passed to filtersExprImpliesPredicate to prevent duplicating logic for both // types of conjunctions. func (im *Implicator) andExprImpliesPredicate( - e *memo.AndExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, + e *memo.AndExpr, pred opt.ScalarExpr, exactMatches exprSet, ) bool { f := make(memo.FiltersExpr, 2) f[0] = memo.FiltersItem{Condition: e.Left} @@ -419,7 +417,7 @@ func (im *Implicator) orExprImpliesPredicate(e *memo.OrExpr, pred opt.ScalarExpr // ScalarExpr pred. The atom e cannot be an AndExpr, OrExpr, RangeExpr, or // FiltersExpr. func (im *Implicator) atomImpliesPredicate( - e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, + e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches exprSet, ) bool { switch pt := pred.(type) { case *memo.FiltersExpr: @@ -456,14 +454,14 @@ func (im *Implicator) atomImpliesPredicate( } // atomImpliesAtom returns true if the predicate atom expression, pred, contains -// atom expression a, meaning that all values for variables in which e evaluates +// atom expression e, meaning that all values for variables in which e evaluates // to true, pred also evaluates to true. // // Constraints are used to prove containment because they make it easy to assess // if one expression contains another, handling many types of expressions // including comparison operators, IN operators, and tuples. func (im *Implicator) atomImpliesAtom( - e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, + e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches exprSet, ) bool { // Check for containment of comparison expressions with two variables, like // a = b. @@ -512,7 +510,29 @@ func (im *Implicator) atomImpliesAtom( return false } - return predConstraint.Contains(im.evalCtx, eConstraint) + // If predConstraint contains eConstraint, then eConstraint implies + // predConstraint. + if predConstraint.Contains(im.evalCtx, eConstraint) { + // If the constraints contain each other, then they are semantically + // equal and the filter atom can be removed from the remaining filters. + // For example: + // + // (a::INT > 17) + // => + // (a::INT >= 18) + // + // (a > 17) is not the same expression as (a >= 18) syntactically, but + // they are semantically equivalent because there are no integers + // between 17 and 18. Therefore, there is no need to apply (a > 17) as a + // filter after the partial index scan. + exactMatches.addIf(e, func() bool { + return eConstraint.Columns.IsPrefixOf(&predConstraint.Columns) && + eConstraint.Contains(im.evalCtx, predConstraint) + }) + return true + } + + return false } // twoVarComparisonImpliesTwoVarComparison returns true if pred contains e, @@ -524,7 +544,7 @@ func (im *Implicator) atomImpliesAtom( // values of a and b that satisfy the first expression also satisfy the second // expression. func (im *Implicator) twoVarComparisonImpliesTwoVarComparison( - e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, + e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches exprSet, ) (containment bool, ok bool) { if !isTwoVarComparison(e) || !isTwoVarComparison(pred) { return false, false @@ -585,9 +605,9 @@ func (im *Implicator) twoVarComparisonImpliesTwoVarComparison( // pred's operator, then e is an exact match to pred and it should be // removed from the remaining filters. For example, (a > b) and // (b < a) both individually imply (a > b) with no remaining filters. - if exactMatches != nil && (e.Op() == pred.Op() || commutedOp(e.Op()) == pred.Op()) { - exactMatches[e] = struct{}{} - } + exactMatches.addIf(e, func() bool { + return e.Op() == pred.Op() || commutedOp(e.Op()) == pred.Op() + }) return true, true } @@ -634,14 +654,14 @@ func (im *Implicator) warmCache(filters memo.FiltersExpr) { // is omitted from the returned FiltersItem. If not, the FiltersItem is // recursively searched. See simplifyScalarExpr for more details. func (im *Implicator) simplifyFiltersExpr( - e memo.FiltersExpr, exactMatches map[opt.Expr]struct{}, + e memo.FiltersExpr, exactMatches exprSet, ) memo.FiltersExpr { filters := make(memo.FiltersExpr, 0, len(e)) for i := range e { // If an entire FiltersItem exists in exactMatches, don't add it to the // output filters. - if _, ok := exactMatches[e[i].Condition]; ok { + if exactMatches.contains(e[i].Condition) { continue } @@ -667,9 +687,7 @@ func (im *Implicator) simplifyFiltersExpr( // // Also note that we do not attempt to traverse OrExprs. See // FiltersImplyPredicate (rule #3) for more details. -func (im *Implicator) simplifyScalarExpr( - e opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, -) opt.ScalarExpr { +func (im *Implicator) simplifyScalarExpr(e opt.ScalarExpr, exactMatches exprSet) opt.ScalarExpr { switch t := e.(type) { case *memo.RangeExpr: @@ -677,8 +695,8 @@ func (im *Implicator) simplifyScalarExpr( return im.f.ConstructRange(and) case *memo.AndExpr: - _, leftIsExactMatch := exactMatches[t.Left] - _, rightIsExactMatch := exactMatches[t.Right] + leftIsExactMatch := exactMatches.contains(t.Left) + rightIsExactMatch := exactMatches.contains(t.Right) if leftIsExactMatch && rightIsExactMatch { return memo.TrueSingleton } @@ -734,3 +752,35 @@ func isTwoVarComparison(e opt.ScalarExpr) bool { e.Child(0).Op() == opt.VariableOp && e.Child(1).Op() == opt.VariableOp } + +// exprSet represents a set of opt.Expr. It prevents nil pointer exceptions when +// tracking exact expression matches during implication. The nil value is passed +// in several places to avoid adding child expressions to the exact matches set, +// such as when traversing an OrExpr. +type exprSet map[opt.Expr]struct{} + +// add adds an expression to the set if the set is non-nil. +func (s exprSet) add(e opt.Expr) { + if s != nil { + s[e] = struct{}{} + } +} + +// addIf adds an expression to the set if the set is non-nil and the given +// function returns true. addIf short-circuits and does not call the function if +// the set is nil. +func (s exprSet) addIf(e opt.Expr, fn func() bool) { + if s != nil && fn() { + s[e] = struct{}{} + } +} + +// contains returns true if the set is non-nil and the given expression exists +// in the set. +func (s exprSet) contains(e opt.Expr) bool { + if s != nil { + _, ok := s[e] + return ok + } + return false +} diff --git a/pkg/sql/opt/partialidx/testdata/implicator/atom b/pkg/sql/opt/partialidx/testdata/implicator/atom index ed43dc2ccc15..0e0817af4d4e 100644 --- a/pkg/sql/opt/partialidx/testdata/implicator/atom +++ b/pkg/sql/opt/partialidx/testdata/implicator/atom @@ -64,7 +64,7 @@ predtest vars=(bool) @1 = true ---- true -└── remaining filters: @1 +└── remaining filters: none predtest vars=(bool) @1 @@ -72,7 +72,7 @@ predtest vars=(bool) @1 IN (true) ---- true -└── remaining filters: @1 +└── remaining filters: none predtest vars=(bool) NOT @1 @@ -179,6 +179,14 @@ predtest vars=(int) true └── remaining filters: @1 > 10 +predtest vars=(int) +@1 > 17 +=> +@1 >= 18 +---- +true +└── remaining filters: none + predtest vars=(int, int) @1 > @2 => diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 2a55daeed517..c8d544e3b492 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -743,16 +743,16 @@ var charts = []sectionDescription{ Metrics: []string{"kv.tenant_rate_limit.num_tenants"}, }, { - Title: "Requests Admitted by Rate Limiter", + Title: "Read Requests Admitted by Rate Limiter", Downsampler: DescribeAggregator_MAX, Percentiles: false, - Metrics: []string{"kv.tenant_rate_limit.requests_admitted"}, + Metrics: []string{"kv.tenant_rate_limit.read_requests_admitted"}, }, { - Title: "Write Bytes Admitted by Rate Limiter", + Title: "Write Requests Admitted by Rate Limiter", Downsampler: DescribeAggregator_MAX, Percentiles: false, - Metrics: []string{"kv.tenant_rate_limit.write_bytes_admitted"}, + Metrics: []string{"kv.tenant_rate_limit.write_requests_admitted"}, }, { Title: "Read Bytes Admitted by Rate Limiter", @@ -760,6 +760,12 @@ var charts = []sectionDescription{ Percentiles: false, Metrics: []string{"kv.tenant_rate_limit.read_bytes_admitted"}, }, + { + Title: "Write Bytes Admitted by Rate Limiter", + Downsampler: DescribeAggregator_MAX, + Percentiles: false, + Metrics: []string{"kv.tenant_rate_limit.write_bytes_admitted"}, + }, }, }, {