Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113719: kvstreamer: adjust recently added tracing r=yuzefovich a=yuzefovich

This commit makes some minor adjustments to the recently added tracing in the streamer:
- `singleRangeBatch.String()` now has a more sane behavior when it contains many requests (previously, we would truncate the requests but would keep everything else, now we include the full information only about the first 5 and the last 5 "sub-requests")
- that method also no longer includes `r.reqsKeys` because this field is redundant with `r.reqs` and is likely to be empty anyway
- the "exit" message in `GetResults` now specifies the number of results and the error if present
- redundant "incomplete Get" and "incomplete Scan" messages are removed (they add very little additional information - the number of incomplete Gets is already printed elsewhere, plus the KV layer already specifies whether each Get / Scan request resulted in a "resume span" meaning it was incomplete).

Epic: None

Release note: None

113834: workload/tpcc: support Read Committed isolation r=nvanbenschoten a=nvanbenschoten

Closes #100176.

This PR consists of a series of commits which together add support for Read Committed isolation to the TPC-C workload and then use it to add new roachtest variants.

See individual commits, including an interesting change to explicit row-level locking in TPC-C transactions to avoid concurrency anomalies.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Nov 15, 2023
3 parents c15e481 + 1055bb9 + b9bdc43 commit e724061
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 54 deletions.
62 changes: 54 additions & 8 deletions pkg/cmd/roachtest/tests/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ type tpccOptions struct {
// is running, but that feels like jamming too much into the tpcc setup.
Start func(context.Context, test.Test, cluster.Cluster)
// SkipPostRunCheck, if set, skips post TPC-C run checks.
SkipPostRunCheck bool
SkipPostRunCheck bool
// ExpensiveChecks, if set, runs expensive post TPC-C run checks.
ExpensiveChecks bool
DisableDefaultScheduledBackup bool
}

Expand Down Expand Up @@ -164,7 +166,8 @@ func setupTPCC(
return
}

require.NoError(t, WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0])))
require.NoError(t, enableIsolationLevels(ctx, t, db))
require.NoError(t, WaitFor3XReplication(ctx, t, db))

estimatedSetupTimeStr := ""
if opts.EstimatedSetupTime != 0 {
Expand Down Expand Up @@ -285,7 +288,8 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio

if !opts.SkipPostRunCheck {
c.Run(ctx, workloadNode, fmt.Sprintf(
"./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses))
"./cockroach workload check tpcc --warehouses=%d --expensive-checks=%t {pgurl:1}",
opts.Warehouses, opts.ExpensiveChecks))
}

// Check no errors from metrics.
Expand Down Expand Up @@ -470,8 +474,30 @@ func registerTPCC(r registry.Registry) {
})
},
})
mixedHeadroomSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs())
r.Add(registry.TestSpec{
Name: "tpcc/headroom/isolation-level=read-committed/" + headroomSpec.String(),
Owner: registry.OwnerTestEng,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Tags: registry.Tags(`default`),
Cluster: headroomSpec,
Timeout: 4 * time.Hour,
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
maxWarehouses := maxSupportedTPCCWarehouses(*t.BuildVersion(), c.Cloud(), c.Spec())
headroomWarehouses := int(float64(maxWarehouses) * 0.7)
t.L().Printf("computed headroom warehouses of %d\n", headroomWarehouses)
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
ExtraRunArgs: "--isolation-level=read_committed",
Duration: 120 * time.Minute,
SetupType: usingImport,
})
},
})

mixedHeadroomSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs())
r.Add(registry.TestSpec{
// mixed-headroom is similar to w=headroom, but with an additional
// node and on a mixed version cluster which runs its long-running
Expand Down Expand Up @@ -502,13 +528,33 @@ func registerTPCC(r registry.Registry) {
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCC(ctx, t, c, tpccOptions{
Warehouses: 1,
Duration: 10 * time.Minute,
ExtraRunArgs: "--wait=false",
SetupType: usingImport,
Warehouses: 1,
Duration: 10 * time.Minute,
ExtraRunArgs: "--wait=false",
SetupType: usingImport,
ExpensiveChecks: true,
})
},
})
r.Add(registry.TestSpec{
Name: "tpcc-nowait/isolation-level=read-committed/nodes=3/w=1",
Owner: registry.OwnerTestEng,
Cluster: r.MakeClusterSpec(4, spec.CPU(16)),
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCC(ctx, t, c, tpccOptions{
Warehouses: 1,
Duration: 10 * time.Minute,
ExtraRunArgs: "--wait=false --isolation-level=read_committed",
SetupType: usingImport,
ExpensiveChecks: true,
})
},
})

r.Add(registry.TestSpec{
Name: "weekly/tpcc/headroom",
Owner: registry.OwnerTestEng,
Expand Down
32 changes: 28 additions & 4 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,36 @@ func (r singleRangeBatch) subPriority() int32 {
}

// String implements fmt.Stringer.
//
// Note that the implementation of this method doesn't include r.reqsKeys into
// the output because that field is redundant with r.reqs and is likely to be
// nil'ed out anyway.
func (r singleRangeBatch) String() string {
// We try to limit the size based on the number of requests ourselves, so
// this is just a sane upper-bound.
maxBytes := 10 << 10 /* 10KiB */
if len(r.reqs) > 10 {
// To keep the size of this log message relatively small, if we have
// more than 10 requests, then we only include the information about the
// first 5 and the last 5 requests.
headEndIdx := 5
tailStartIdx := len(r.reqs) - 5
subIdx := "[]"
if len(r.subRequestIdx) > 0 {
subIdx = fmt.Sprintf("%v...%v", r.subRequestIdx[:headEndIdx], r.subRequestIdx[tailStartIdx:])
}
return fmt.Sprintf(
"{reqs:%v...%v pos:%v...%v subIdx:%s start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs[:headEndIdx], maxBytes),
kvpb.TruncatedRequestsString(r.reqs[tailStartIdx:], maxBytes),
r.positions[:headEndIdx], r.positions[tailStartIdx:],
subIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor, r.minTargetBytes,
)
}
return fmt.Sprintf(
"{reqs:%s keys:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs, 1024), r.reqsKeys, r.positions,
r.subRequestIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor,
r.minTargetBytes,
"{reqs:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
kvpb.TruncatedRequestsString(r.reqs, maxBytes), r.positions, r.subRequestIdx,
r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor, r.minTargetBytes,
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type resultsBuffer interface {
//
// It is assumed that the budget's mutex is already being held.
//
// doneAddingLocked returns the naumber of results that have been added but
// doneAddingLocked returns the number of results that have been added but
// not yet returned to the client, and whether the client goroutine was woken.
doneAddingLocked(context.Context) (int, bool)

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
// returned once all enqueued requests have been responded to.
//
// Calling GetResults() invalidates the results returned on the previous call.
func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
func (s *Streamer) GetResults(ctx context.Context) (retResults []Result, retErr error) {
log.VEvent(ctx, 2, "GetResults")
defer log.VEvent(ctx, 2, "exiting GetResults")
defer func() {
log.VEventf(ctx, 2, "exiting GetResults (%d results, err=%v)", len(retResults), retErr)
}()
for {
results, allComplete, err := s.results.get(ctx)
if len(results) > 0 || allComplete || err != nil {
Expand Down Expand Up @@ -1690,7 +1692,6 @@ func processSingleRangeResults(
get := response
if get.ResumeSpan != nil {
// This Get wasn't completed.
log.VEvent(ctx, 2, "incomplete Get")
continue
}
// This Get was completed.
Expand Down Expand Up @@ -1723,7 +1724,6 @@ func processSingleRangeResults(
// multiple ranges and the last range has no data in it - we
// want to be able to set scanComplete field on such an empty
// Result).
log.VEvent(ctx, 2, "incomplete Scan")
continue
}
result := Result{
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/bench/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ var queries = [...]benchQuery{
WHERE no_w_id = $1 AND no_d_id = $2
ORDER BY no_o_id ASC
LIMIT 1
FOR UPDATE
`,
args: []interface{}{10, 100},
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/opt/memo/testdata/stats_quality/tpcc
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,14 @@ SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_05
FROM stock
WHERE (s_i_id, s_w_id) IN ((1000, 4), (900, 4), (1100, 4), (1500, 4), (1400, 4))
ORDER BY s_i_id
FOR UPDATE
----
----
project
├── save-table-name: new_order_04_project_1
├── columns: s_quantity:3(int) s_ytd:14(int) s_order_cnt:15(int) s_remote_cnt:16(int) s_data:17(varchar) s_dist_05:8(char) [hidden: s_i_id:1(int!null)]
├── cardinality: [0 - 5]
├── volatile
├── stats: [rows=4.548552, distinct(1)=4.54855, null(1)=0, distinct(3)=4.43676, null(3)=0, distinct(8)=4.38572, null(8)=0, distinct(14)=0.989418, null(14)=0, distinct(15)=0.989418, null(15)=0, distinct(16)=0.989418, null(16)=0, distinct(17)=4.54833, null(17)=0]
├── key: (1)
├── fd: (1)-->(3,8,14-17)
Expand All @@ -165,7 +167,9 @@ project
│ ├── [/4/1100 - /4/1100]
│ ├── [/4/1400 - /4/1400]
│ └── [/4/1500 - /4/1500]
├── locking: for-update
├── cardinality: [0 - 5]
├── volatile
├── stats: [rows=4.548552, distinct(1)=4.54855, null(1)=0, distinct(2)=1, null(2)=0, distinct(3)=4.43676, null(3)=0, distinct(8)=4.38572, null(8)=0, distinct(14)=0.989418, null(14)=0, distinct(15)=0.989418, null(15)=0, distinct(16)=0.989418, null(16)=0, distinct(17)=4.54833, null(17)=0, distinct(1,2)=4.54855, null(1,2)=0]
│ histogram(1)= 0 0.966 0 0.966 0 0.966 0 0.82528 0 0.82528
│ <--- 900 --- 1000 --- 1100 --- 1400 ---- 1500 -
Expand Down Expand Up @@ -489,12 +493,14 @@ FROM new_order
WHERE no_w_id = 7 AND no_d_id = 6
ORDER BY no_o_id ASC
LIMIT 1
FOR UPDATE
----
----
project
├── save-table-name: delivery_01_project_1
├── columns: no_o_id:1(int!null)
├── cardinality: [0 - 1]
├── volatile
├── stats: [rows=1, distinct(1)=0.999675, null(1)=0]
├── key: ()
├── fd: ()-->(1)
Expand All @@ -503,6 +509,8 @@ project
├── columns: no_o_id:1(int!null) no_d_id:2(int!null) no_w_id:3(int!null)
├── constraint: /3/2/1: [/7/6 - /7/6]
├── limit: 1
├── locking: for-update
├── volatile
├── stats: [rows=1, distinct(1)=0.999675, null(1)=0, distinct(2)=0.632308, null(2)=0, distinct(3)=0.632308, null(3)=0]
├── key: ()
└── fd: ()-->(1-3)
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/opt/xform/testdata/external/tpcc
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_05
FROM stock
WHERE (s_i_id, s_w_id) IN ((1000, 4), (900, 4), (1100, 4), (1500, 4), (1400, 4))
ORDER BY s_i_id
FOR UPDATE
----
project
├── columns: s_quantity:3 s_ytd:14 s_order_cnt:15 s_remote_cnt:16 s_data:17 s_dist_05:8 [hidden: s_i_id:1!null]
├── cardinality: [0 - 5]
├── volatile
├── key: (1)
├── fd: (1)-->(3,8,14-17)
├── ordering: +1
Expand All @@ -134,7 +136,9 @@ project
│ ├── [/4/1100 - /4/1100]
│ ├── [/4/1400 - /4/1400]
│ └── [/4/1500 - /4/1500]
├── locking: for-update
├── cardinality: [0 - 5]
├── volatile
├── key: (1)
├── fd: ()-->(2), (1)-->(3,8,14-17)
└── ordering: +1 opt(2) [actual: +1]
Expand Down Expand Up @@ -801,16 +805,20 @@ FROM new_order
WHERE no_w_id = 10 AND no_d_id = 100
ORDER BY no_o_id ASC
LIMIT 1
FOR UPDATE
----
project
├── columns: no_o_id:1!null
├── cardinality: [0 - 1]
├── volatile
├── key: ()
├── fd: ()-->(1)
└── scan new_order
├── columns: no_o_id:1!null no_d_id:2!null no_w_id:3!null
├── constraint: /3/2/1: [/10/100 - /10/100]
├── limit: 1
├── locking: for-update
├── volatile
├── key: ()
└── fd: ()-->(1-3)

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt/xform/testdata/external/tpcc-later-stats
Original file line number Diff line number Diff line change
Expand Up @@ -804,16 +804,20 @@ FROM new_order
WHERE no_w_id = 10 AND no_d_id = 100
ORDER BY no_o_id ASC
LIMIT 1
FOR UPDATE
----
project
├── columns: no_o_id:1!null
├── cardinality: [0 - 1]
├── volatile
├── key: ()
├── fd: ()-->(1)
└── scan new_order
├── columns: no_o_id:1!null no_d_id:2!null no_w_id:3!null
├── constraint: /3/2/1: [/10/100 - /10/100]
├── limit: 1
├── locking: for-update
├── volatile
├── key: ()
└── fd: ()-->(1-3)

Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/opt/xform/testdata/external/tpcc-no-stats
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@ SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_05
FROM stock
WHERE (s_i_id, s_w_id) IN ((1000, 4), (900, 4), (1100, 4), (1500, 4), (1400, 4))
ORDER BY s_i_id
FOR UPDATE
----
project
├── columns: s_quantity:3 s_ytd:14 s_order_cnt:15 s_remote_cnt:16 s_data:17 s_dist_05:8 [hidden: s_i_id:1!null]
├── cardinality: [0 - 5]
├── volatile
├── key: (1)
├── fd: (1)-->(3,8,14-17)
├── ordering: +1
Expand All @@ -131,7 +133,9 @@ project
│ ├── [/4/1100 - /4/1100]
│ ├── [/4/1400 - /4/1400]
│ └── [/4/1500 - /4/1500]
├── locking: for-update
├── cardinality: [0 - 5]
├── volatile
├── key: (1)
├── fd: ()-->(2), (1)-->(3,8,14-17)
└── ordering: +1 opt(2) [actual: +1]
Expand Down Expand Up @@ -802,16 +806,20 @@ FROM new_order
WHERE no_w_id = 10 AND no_d_id = 100
ORDER BY no_o_id ASC
LIMIT 1
FOR UPDATE
----
project
├── columns: no_o_id:1!null
├── cardinality: [0 - 1]
├── volatile
├── key: ()
├── fd: ()-->(1)
└── scan new_order
├── columns: no_o_id:1!null no_d_id:2!null no_w_id:3!null
├── constraint: /3/2/1: [/10/100 - /10/100]
├── limit: 1
├── locking: for-update
├── volatile
├── key: ()
└── fd: ()-->(1-3)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func BenchmarkParse(b *testing.B) {
},
{
"tpcc-delivery",
`SELECT no_o_id FROM new_order WHERE no_w_id = $1 AND no_d_id = $2 ORDER BY no_o_id ASC LIMIT 1`,
`SELECT no_o_id FROM new_order WHERE no_w_id = $1 AND no_d_id = $2 ORDER BY no_o_id ASC LIMIT 1 FOR UPDATE`,
},
{
"account",
Expand Down
5 changes: 3 additions & 2 deletions pkg/workload/tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func createDelivery(
FROM new_order
WHERE no_w_id = $1 AND no_d_id = $2
ORDER BY no_o_id ASC
LIMIT 1`,
LIMIT 1
FOR UPDATE`,
)

del.sumAmount = del.sr.Define(`
Expand All @@ -87,7 +88,7 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
olDeliveryD := timeutil.Now()

err := crdbpgx.ExecuteTx(
ctx, del.mcp.Get(), del.config.txOpts,
ctx, del.mcp.Get(), pgx.TxOptions{},
func(tx pgx.Tx) error {
// 2.7.4.2. For each district:
dIDoIDPairs := make(map[int]int)
Expand Down
5 changes: 3 additions & 2 deletions pkg/workload/tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
d.oEntryD = timeutil.Now()

err := crdbpgx.ExecuteTx(
ctx, n.mcp.Get(), n.config.txOpts,
ctx, n.mcp.Get(), pgx.TxOptions{},
func(tx pgx.Tx) error {
// Select the district tax rate and next available order number, bumping it.
var dNextOID int
Expand Down Expand Up @@ -302,7 +302,8 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_%02[1]d
FROM stock
WHERE (s_i_id, s_w_id) IN (%[2]s)
ORDER BY s_i_id`,
ORDER BY s_i_id
FOR UPDATE`,
d.dID, strings.Join(stockIDs, ", "),
),
)
Expand Down
Loading

0 comments on commit e724061

Please sign in to comment.