53507: partialidx: reduce remaining filters of semantically equivalent expressions r=rytaft a=mgartner

#### partialidx: reduce remaining filters of semantically equivalent expressions

This commit reduces the remaining filters returned from the Implicator
in cases where expressions in the filter are semantically equivalent to
expressions in the predicate, but not syntactically equivalent.

For example:

    (a::INT > 17)
    =>
    (a::INT >= 18)

Prior to this commit, `(a > 17)` would remain as a filter to be applied
after a partial index scan. However, this filter is unnecessary, because
there are no integers between `17` and `18`. With this change,
`(a > 17)` is removed from the remaining filters.
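
As an illustration of the tightening involved, here is a minimal, self-contained
Go sketch; `intConstraint` and `normalize` are hypothetical names used only for
exposition, since the real Implicator operates on optimizer expression trees
rather than on a struct like this:

```go
package main

import "fmt"

// intConstraint is a simplified stand-in for a single comparison over an
// integer column, e.g. (a > 17) or (a >= 18).
type intConstraint struct {
	col   string
	gte   bool // true for ">=", false for ">"
	bound int64
}

// normalize rewrites "a > c" into the semantically equivalent "a >= c+1",
// so a filter and a predicate that differ only in this way can be treated
// as exact matches and the filter dropped from the remaining filters.
func normalize(c intConstraint) intConstraint {
	if !c.gte {
		return intConstraint{col: c.col, gte: true, bound: c.bound + 1}
	}
	return c
}

func main() {
	filter := intConstraint{col: "a", gte: false, bound: 17} // a > 17
	pred := intConstraint{col: "a", gte: true, bound: 18}    // a >= 18
	fmt.Println(normalize(filter) == pred)                   // true: the filter is implied exactly
}
```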

Release justification: Low risk update to new functionality, partial
indexes.

Release note (performance improvement): The optimizer reduces filters
applied after partial index scans in more cases where the filters are
implicitly applied by the partial index predicate. This could lead to
more efficient query plans in some cases.

#### partialidx: add safer set operations for tracking exact matches

This commit adds a typed set for tracking filter expressions that are
exact matches of predicate expressions. This will help prevent a common
developer mistake of not checking whether or not the set is `nil` before
manipulating it, such as the bug fixed by #53222.
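
For context, a rough sketch of the nil-safety idea behind such a typed set,
assuming a simple map-backed implementation keyed by strings (the actual set
stores optimizer expressions and its API may differ):

```go
// exprSet is a hypothetical typed set. Add lazily allocates the backing map
// via a pointer receiver, so callers never have to remember a nil check or
// an explicit make before inserting.
type exprSet struct {
	m map[string]struct{}
}

func (s *exprSet) Add(expr string) {
	if s.m == nil {
		s.m = make(map[string]struct{})
	}
	s.m[expr] = struct{}{}
}

// Contains is safe to call on an empty set: reading from a nil map is
// allowed in Go and simply reports that the element is absent.
func (s *exprSet) Contains(expr string) bool {
	_, ok := s.m[expr]
	return ok
}
```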

Benchmarks show performance improvements of the Implicator for some
cases as a result of this change.

    name                                               old time/op  new time/op  delta
    Implicator/single-exact-match-16                   81.2ns ± 1%  77.8ns ± 1%  -4.24%  (p=0.008 n=5+5)
    Implicator/single-inexact-match-16                  916ns ± 1%   914ns ± 0%    ~     (p=0.254 n=5+5)
    Implicator/range-inexact-match-16                  2.40µs ± 0%  2.46µs ± 1%  +2.56%  (p=0.008 n=5+5)
    Implicator/single-exact-match-extra-filters-16      313ns ± 0%   305ns ± 1%  -2.74%  (p=0.008 n=5+5)
    Implicator/single-inexact-match-extra-filters-16   3.85µs ± 1%  3.85µs ± 1%    ~     (p=0.952 n=5+5)
    Implicator/multi-column-and-exact-match-16         90.0ns ± 1%  84.3ns ± 1%  -6.29%  (p=0.008 n=5+5)
    Implicator/multi-column-and-inexact-match-16       1.99µs ± 0%  2.02µs ± 1%  +1.53%  (p=0.016 n=4+5)
    Implicator/multi-column-or-exact-match-16          82.3ns ± 3%  78.1ns ± 1%  -5.10%  (p=0.008 n=5+5)
    Implicator/multi-column-or-exact-match-reverse-16  1.72µs ± 1%  1.74µs ± 2%    ~     (p=0.397 n=5+5)
    Implicator/multi-column-or-inexact-match-16        2.23µs ± 2%  2.31µs ± 3%  +3.74%  (p=0.008 n=5+5)
    Implicator/and-filters-do-not-imply-pred-16        3.61µs ± 1%  3.63µs ± 4%    ~     (p=0.548 n=5+5)
    Implicator/or-filters-do-not-imply-pred-16          916ns ± 2%   905ns ± 2%    ~     (p=0.095 n=5+5)
    Implicator/many-columns-exact-match10-16            305ns ± 1%   278ns ± 1%  -8.97%  (p=0.008 n=5+5)
    Implicator/many-columns-inexact-match10-16         12.9µs ± 1%  12.9µs ± 1%    ~     (p=0.841 n=5+5)
    Implicator/many-columns-exact-match100-16          21.3µs ± 1%  19.2µs ± 2%  -9.94%  (p=0.008 n=5+5)
    Implicator/many-columns-inexact-match100-16         502µs ± 1%   503µs ± 1%    ~     (p=0.421 n=5+5)

Release justification: Low risk update to new functionality, partial
indexes.

Release note: None


53510: kv: split rate limits and metrics for read and write requests r=nvanbenschoten a=nvanbenschoten

Fixes #53483.

This commit splits the existing request rate limit into two categories, read requests and write requests. Experimentation has shown that the fixed cost of a request is dramatically different between these two categories, primarily because write requests need to go through Raft while read requests do not. By splitting the limits and metrics along this dimension, we expect to be able to more accurately model the cost of KV traffic and more effectively tune rate limits.
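
The following is a minimal sketch of the shape of that split, using
golang.org/x/time/rate token buckets as stand-ins; the names here are
illustrative and not the actual tenantrate API, which is built on a quotapool
and also meters read and write bytes (see the limiter.go diff below):

```go
package tenantsketch

import (
	"context"

	"golang.org/x/time/rate"
)

// splitLimiter keeps one token bucket per request class so that read and
// write traffic can be rate limited, measured, and tuned independently.
type splitLimiter struct {
	readRequests  *rate.Limiter
	writeRequests *rate.Limiter
}

func newSplitLimiter(readRate, writeRate rate.Limit, readBurst, writeBurst int) *splitLimiter {
	return &splitLimiter{
		readRequests:  rate.NewLimiter(readRate, readBurst),
		writeRequests: rate.NewLimiter(writeRate, writeBurst),
	}
}

// wait charges one token from the bucket matching the request class,
// blocking until a token is available or the context is canceled.
func (l *splitLimiter) wait(ctx context.Context, isWrite bool) error {
	if isWrite {
		return l.writeRequests.Wait(ctx)
	}
	return l.readRequests.Wait(ctx)
}
```

Splitting at this level is what allows the rate and burst limits, and the
admitted-request metrics, to be configured per request class, as listed below.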

In making the split, the commit replaces the existing metric:
```
kv.tenant_rate_limit.requests_admitted
```
with the following two new metrics:
```
kv.tenant_rate_limit.read_requests_admitted
kv.tenant_rate_limit.write_requests_admitted
```

The commit also replaces the existing two settings:
```
kv.tenant_rate_limiter.requests.rate_limit
kv.tenant_rate_limiter.request.burst_limit
```
with the following four new settings:
```
kv.tenant_rate_limiter.read_requests.rate_limit
kv.tenant_rate_limiter.read_requests.burst_limit
kv.tenant_rate_limiter.write_requests.rate_limit
kv.tenant_rate_limiter.write_requests.burst_limit
```

Release justification: Low-risk, high benefit change.

53517: opt: better error message for multiple arbiters with CONFLICT DO UPDATE r=mgartner a=mgartner

This commit improves the error message and adds telemetry for INSERT ON
CONFLICT DO UPDATE statements that find multiple matching arbiter
indexes.

Release justification: This is a low-risk update to new functionality,
partial indexes.

Release note: None

53544: roachtest: skip and reown tests r=nvanbenschoten,petermattis a=tbg

cc @petermattis, let me know if you're opposed to owning the disk-stalled roachtests. I saw while triaging that you and Bilal were discussing fixing the charybdefs setup and I also saw talk about disk stall work in the weekly notes.

Release justification: testing only
Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
4 people committed Aug 27, 2020
5 parents 3fd6e3b + 86140c5 + 4c9110f + c129f14 + 5343d56 commit e43acc9
Showing 25 changed files with 323 additions and 177 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/disk_stall.go
@@ -33,7 +33,7 @@ func registerDiskStalledDetection(r *testRegistry) {
"disk-stalled/log=%t,data=%t",
affectsLogDir, affectsDataDir,
),
Owner: OwnerKV,
Owner: OwnerStorage,
MinVersion: "v19.2.0",
Cluster: makeClusterSpec(1),
Run: func(ctx context.Context, t *test, c *cluster) {
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/gossip.go
@@ -271,6 +271,9 @@ func runGossipPeerings(ctx context.Context, t *test, c *cluster) {
t.l.Printf("%d: restarting node %d\n", i, node[0])
c.Stop(ctx, node)
c.Start(ctx, t, node)
// Sleep a bit to avoid hitting:
// https://github.com/cockroachdb/cockroach/issues/48005
time.Sleep(3 * time.Second)
}
}

6 changes: 4 additions & 2 deletions pkg/cmd/roachtest/jepsen.go
@@ -312,8 +312,10 @@ func registerJepsen(r *testRegistry) {
for _, nemesis := range jepsenNemeses {
nemesis := nemesis // copy for closure
spec := testSpec{
Name: fmt.Sprintf("jepsen/%s/%s", testName, nemesis.name),
Owner: OwnerKV,
Name: fmt.Sprintf("jepsen/%s/%s", testName, nemesis.name),
// We don't run jepsen on older releases due to the high rate of flakes.
MinVersion: "v20.1.0",
Owner: OwnerKV,
// The Jepsen tests do funky things to machines, like muck with the
// system clock; therefore, their clusters cannot be reused by other tests
// except the Jepsen ones themselves which reset all this state when
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tpcc.go
@@ -414,7 +414,7 @@ func registerTPCC(r *testRegistry) {
LoadWarehouses: 5000,
EstimatedMax: 3000,

MinVersion: "v19.1.0",
MinVersion: "v20.1.0",
})
registerTPCCBenchSpec(r, tpccBenchSpec{
Nodes: 9,
@@ -425,7 +425,7 @@
LoadWarehouses: 2000,
EstimatedMax: 900,

MinVersion: "v19.1.0",
MinVersion: "v20.1.0",
})
}

12 changes: 6 additions & 6 deletions pkg/kv/kvserver/client_tenant_test.go
@@ -175,20 +175,20 @@ func TestTenantRateLimiter(t *testing.T) {
// the tenant range.
tenantCtx := roachpb.NewContextForTenant(ctx, tenantID)
cfg := tenantrate.LimitConfigsFromSettings(s.ClusterSettings())
for i := 0; i < int(cfg.Requests.Burst); i++ {
for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
require.NoError(t, db.Put(ctx, mkKey(), 0))
}
// Now ensure that in the same instant the write QPS limit does affect the
// tenant. Issuing up to the burst limit of requests can happen without
// blocking.
for i := 0; i < int(cfg.Requests.Burst); i++ {
for i := 0; i < int(cfg.WriteRequests.Burst); i++ {
require.NoError(t, db.Put(tenantCtx, mkKey(), 0))
}
// Attempt to issue another request, make sure that it gets blocked by
// observing a timer.
errCh := make(chan error, 1)
go func() { errCh <- db.Put(tenantCtx, mkKey(), 0) }()
expectedTimer := t0.Add(time.Duration(float64(1/cfg.Requests.Rate) * float64(time.Second)))
expectedTimer := t0.Add(time.Duration(float64(1/cfg.WriteRequests.Rate) * float64(time.Second)))
testutils.SucceedsSoon(t, func() error {
timers := timeSource.Timers()
if len(timers) != 1 {
@@ -213,18 +213,18 @@
return string(read)
}
makeMetricStr := func(expCount int64) string {
const tenantMetricStr = `kv_tenant_rate_limit_requests_admitted{store="1",tenant_id="10"}`
const tenantMetricStr = `kv_tenant_rate_limit_write_requests_admitted{store="1",tenant_id="10"}`
return fmt.Sprintf("%s %d", tenantMetricStr, expCount)
}

// Ensure that the metric for the admitted requests is equal to the number of
// requests which we've admitted.
require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst))
require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst))

// Allow the blocked request to proceed.
timeSource.Advance(time.Second)
require.NoError(t, <-errCh)

// Ensure that it is now reflected in the metrics.
require.Contains(t, getMetrics(), makeMetricStr(cfg.Requests.Burst+1))
require.Contains(t, getMetrics(), makeMetricStr(cfg.WriteRequests.Burst+1))
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rate_limit.go
@@ -27,7 +27,7 @@ func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *roachpb.BatchRequ
if !ok || tenantID == roachpb.SystemTenantID {
return nil
}
return r.tenantLimiter.Wait(ctx, bytesWrittenFromRequest(ba))
return r.tenantLimiter.Wait(ctx, ba.IsWrite(), bytesWrittenFromRequest(ba))
}

// bytesWrittenFromBatchRequest returns an approximation of the number of bytes
12 changes: 5 additions & 7 deletions pkg/kv/kvserver/replica_send.go
@@ -65,8 +65,6 @@ func (r *Replica) sendWithRangeID(
r.maybeInitializeRaftGroup(ctx)

isReadOnly := ba.IsReadOnly()
useRaft := !isReadOnly && ba.IsWrite()

if err := r.checkBatchRequest(ba, isReadOnly); err != nil {
return nil, roachpb.NewError(err)
}
@@ -92,14 +90,14 @@

// Differentiate between read-write, read-only, and admin.
var pErr *roachpb.Error
if useRaft {
log.Event(ctx, "read-write path")
fn := (*Replica).executeWriteBatch
br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn)
} else if isReadOnly {
if isReadOnly {
log.Event(ctx, "read-only path")
fn := (*Replica).executeReadOnlyBatch
br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn)
} else if ba.IsWrite() {
log.Event(ctx, "read-write path")
fn := (*Replica).executeWriteBatch
br, pErr = r.executeBatchWithConcurrencyRetries(ctx, ba, fn)
} else if ba.IsAdmin() {
log.Event(ctx, "admin path")
br, pErr = r.executeAdminBatch(ctx, ba)
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/tenantrate/helpers_test.go
@@ -15,8 +15,10 @@ import "github.com/cockroachdb/cockroach/pkg/settings/cluster"
// OverrideSettingsWithRateLimits utilizes LimitConfigs from the values stored in the
// settings.
func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) {
requestRateLimit.Override(&settings.SV, float64(rl.Requests.Rate))
requestBurstLimit.Override(&settings.SV, rl.Requests.Burst)
readRequestRateLimit.Override(&settings.SV, float64(rl.ReadRequests.Rate))
readRequestBurstLimit.Override(&settings.SV, rl.ReadRequests.Burst)
writeRequestRateLimit.Override(&settings.SV, float64(rl.WriteRequests.Rate))
writeRequestBurstLimit.Override(&settings.SV, rl.WriteRequests.Burst)
readRateLimit.Override(&settings.SV, int64(rl.ReadBytes.Rate))
readBurstLimit.Override(&settings.SV, rl.ReadBytes.Burst)
writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate))
79 changes: 51 additions & 28 deletions pkg/kv/kvserver/tenantrate/limiter.go
@@ -48,14 +48,15 @@ import (
type Limiter interface {

// Wait acquires n quota from the limiter. This acquisition cannot be
// released. Each call to wait will consume 1 read byte, 1 request, and
// writeBytes from the token buckets. Calls to Wait will block until the
// buckets contain adequate resources. If a request attempts to write more
// than the burst limit, it will wait until the bucket is completely full
// before acquiring the requested quantity and putting the limiter in debt.
// released. Each call to wait will consume 1 read or write request
// depending on isWrite, 1 read byte, and writeBytes from the token buckets.
// Calls to Wait will block until the buckets contain adequate resources. If
// a request attempts to write more than the burst limit, it will wait until
// the bucket is completely full before acquiring the requested quantity and
// putting the limiter in debt.
//
// The only errors which should be returned are due to the context.
Wait(ctx context.Context, writeBytes int64) error
Wait(ctx context.Context, isWrite bool, writeBytes int64) error

// RecordRead subtracts the bytes read by a request from the token bucket.
// This call may push the Limiter into debt in the ReadBytes dimensions
@@ -85,28 +86,38 @@ func (rl *limiter) init(
metrics: metrics,
}
buckets := tokenBuckets{
requests: makeTokenBucket(conf.Requests),
readBytes: makeTokenBucket(conf.ReadBytes),
writeBytes: makeTokenBucket(conf.WriteBytes),
readRequests: makeTokenBucket(conf.ReadRequests),
writeRequests: makeTokenBucket(conf.WriteRequests),
readBytes: makeTokenBucket(conf.ReadBytes),
writeBytes: makeTokenBucket(conf.WriteBytes),
}
options = append(options, quotapool.OnAcquisition(func(
ctx context.Context, poolName string, r quotapool.Request, start time.Time,
) {
req := r.(*waitRequest)
if req.readRequests > 0 {
rl.metrics.readRequestsAdmitted.Inc(req.readRequests)
}
if req.writeRequests > 0 {
rl.metrics.writeRequestsAdmitted.Inc(req.writeRequests)
}
// Accounted for in limiter.RecordRead.
// if req.readBytes > 0 {
// rl.metrics.readBytesAdmitted.Inc(req.readBytes)
// }
if req.writeBytes > 0 {
rl.metrics.writeBytesAdmitted.Inc(req.writeBytes)
}
rl.metrics.requestsAdmitted.Inc(1)
}))
rl.qp = quotapool.New(tenantID.String(), &buckets, options...)
buckets.clock = rl.qp.TimeSource()
buckets.lastUpdated = buckets.clock.Now()
}

func (rl *limiter) Wait(ctx context.Context, writeBytes int64) error {
func (rl *limiter) Wait(ctx context.Context, isWrite bool, writeBytes int64) error {
rl.metrics.currentBlocked.Inc(1)
defer rl.metrics.currentBlocked.Dec(1)
r := newWaitRequest(writeBytes)
r := newWaitRequest(isWrite, writeBytes)
defer putWaitRequest(r)
if err := rl.qp.Acquire(ctx, r); err != nil {
return err
Expand All @@ -130,11 +141,12 @@ func (rl *limiter) updateLimits(limits LimitConfigs) {
// tokenBuckets is the implementation of Resource which remains in the quotapool
// for a limiter.
type tokenBuckets struct {
clock timeutil.TimeSource
lastUpdated time.Time
requests tokenBucket
readBytes tokenBucket
writeBytes tokenBucket
clock timeutil.TimeSource
lastUpdated time.Time
readRequests tokenBucket
writeRequests tokenBucket
readBytes tokenBucket
writeBytes tokenBucket
}

var _ quotapool.Resource = (*tokenBuckets)(nil)
@@ -146,7 +158,8 @@ func (rb *tokenBuckets) update() {
// TODO(ajwerner): Consider instituting a minimum update frequency to avoid
// spinning too fast on timers for tons of tiny allocations at a fast rate.
if since := now.Sub(rb.lastUpdated); since > 0 {
rb.requests.update(since)
rb.readRequests.update(since)
rb.writeRequests.update(since)
rb.readBytes.update(since)
rb.writeBytes.update(since)
rb.lastUpdated = now
@@ -166,14 +179,16 @@ func (rb *tokenBuckets) check(req *waitRequest) (fulfilled bool, tryAgainAfter t
}
}
}
check(&rb.requests, req.requests)
check(&rb.readRequests, req.readRequests)
check(&rb.writeRequests, req.writeRequests)
check(&rb.readBytes, req.readBytes)
check(&rb.writeBytes, req.writeBytes)
return fulfilled, tryAgainAfter
}

func (rb *tokenBuckets) subtract(req *waitRequest) {
rb.requests.tokens -= float64(req.requests)
rb.readRequests.tokens -= float64(req.readRequests)
rb.writeRequests.tokens -= float64(req.writeRequests)
rb.readBytes.tokens -= float64(req.readBytes)
rb.writeBytes.tokens -= float64(req.writeBytes)
}
@@ -185,7 +200,8 @@ func (rb *tokenBuckets) Merge(val interface{}) (shouldNotify bool) {
// configuration.
rb.update()

rb.requests.setConf(toAdd.Requests)
rb.readRequests.setConf(toAdd.ReadRequests)
rb.writeRequests.setConf(toAdd.WriteRequests)
rb.readBytes.setConf(toAdd.ReadBytes)
rb.writeBytes.setConf(toAdd.WriteBytes)
return true
@@ -264,9 +280,10 @@ func (t *tokenBucket) clampTokens() {

// waitRequest is used to wait for adequate resources in the tokenBuckets.
type waitRequest struct {
requests int64
writeBytes int64
readBytes int64
readRequests int64
writeRequests int64
writeBytes int64
readBytes int64
}

var waitRequestSyncPool = sync.Pool{
@@ -275,12 +292,18 @@ var waitRequestSyncPool = sync.Pool{

// newWaitRequest allocates a waitRequest from the sync.Pool.
// It should be returned with putWaitRequest.
func newWaitRequest(writeBytes int64) *waitRequest {
func newWaitRequest(isWrite bool, writeBytes int64) *waitRequest {
r := waitRequestSyncPool.Get().(*waitRequest)
*r = waitRequest{
requests: 1,
readBytes: 1,
writeBytes: writeBytes,
readRequests: 0,
writeRequests: 0,
readBytes: 1,
writeBytes: writeBytes,
}
if isWrite {
r.writeRequests = 1
} else {
r.readRequests = 1
}
return r
}
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/tenantrate/limiter_test.go
@@ -48,9 +48,9 @@ func TestCloser(t *testing.T) {
limiter := factory.GetTenant(tenant, closer)
ctx := context.Background()
// First Wait call will not block.
require.NoError(t, limiter.Wait(ctx, 1))
require.NoError(t, limiter.Wait(ctx, false, 1))
errCh := make(chan error, 1)
go func() { errCh <- limiter.Wait(ctx, 1<<30) }()
go func() { errCh <- limiter.Wait(ctx, false, 1<<30) }()
testutils.SucceedsSoon(t, func() error {
if timers := timeSource.Timers(); len(timers) != 1 {
return errors.Errorf("expected 1 timer, found %d", len(timers))
@@ -84,6 +84,7 @@ type launchState struct {
tenantID roachpb.TenantID
ctx context.Context
cancel context.CancelFunc
isWrite bool
writeBytes int64
reserveCh chan error
}
@@ -202,6 +203,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
var cmds []struct {
ID string
Tenant uint64
IsWrite bool
WriteBytes int64
}
if err := yaml.UnmarshalStrict([]byte(d.Input), &cmds); err != nil {
@@ -213,6 +215,7 @@ func (ts *testState) launch(t *testing.T, d *datadriven.TestData) string {
s.tenantID = roachpb.MakeTenantID(cmd.Tenant)
s.ctx, s.cancel = context.WithCancel(context.Background())
s.reserveCh = make(chan error, 1)
s.isWrite = cmd.IsWrite
s.writeBytes = cmd.WriteBytes
ts.running[s.id] = &s
lims := ts.tenants[s.tenantID]
@@ -221,7 +224,7 @@
}
go func() {
// We'll not worry about ever releasing tenant Limiters.
s.reserveCh <- lims[0].Wait(s.ctx, s.writeBytes)
s.reserveCh <- lims[0].Wait(s.ctx, s.isWrite, s.writeBytes)
}()
}
return ts.FormatRunning()
@@ -337,12 +340,15 @@ func (ts *testState) recordRead(t *testing.T, d *datadriven.TestData) string {
// kv_tenant_rate_limit_read_bytes_admitted 0
// kv_tenant_rate_limit_read_bytes_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_read_bytes_admitted{tenant_id="system"} 100
// kv_tenant_rate_limit_requests_admitted 0
// kv_tenant_rate_limit_requests_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_requests_admitted{tenant_id="system"} 0
// kv_tenant_rate_limit_read_requests_admitted 0
// kv_tenant_rate_limit_read_requests_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_read_requests_admitted{tenant_id="system"} 0
// kv_tenant_rate_limit_write_bytes_admitted 50
// kv_tenant_rate_limit_write_bytes_admitted{tenant_id="2"} 50
// kv_tenant_rate_limit_write_bytes_admitted{tenant_id="system"} 0
// kv_tenant_rate_limit_write_requests_admitted 0
// kv_tenant_rate_limit_write_requests_admitted{tenant_id="2"} 0
// kv_tenant_rate_limit_write_requests_admitted{tenant_id="system"} 0
//
// Or with a regular expression:
//