diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go
index 9ddcc0640918..366193dcaf66 100644
--- a/pkg/cmd/roachtest/tests/tpcc.go
+++ b/pkg/cmd/roachtest/tests/tpcc.go
@@ -96,7 +96,9 @@ type tpccOptions struct {
 	// is running, but that feels like jamming too much into the tpcc setup.
 	Start func(context.Context, test.Test, cluster.Cluster)
 	// SkipPostRunCheck, if set, skips post TPC-C run checks.
-	SkipPostRunCheck bool
+	SkipPostRunCheck bool
+	// ExpensiveChecks, if set, runs expensive post TPC-C run checks.
+	ExpensiveChecks bool
 
 	DisableDefaultScheduledBackup bool
 }
@@ -164,7 +166,8 @@ func setupTPCC(
 			return
 		}
 
-		require.NoError(t, WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), crdbNodes[0])))
+		require.NoError(t, enableIsolationLevels(ctx, t, db))
+		require.NoError(t, WaitFor3XReplication(ctx, t, db))
 
 		estimatedSetupTimeStr := ""
 		if opts.EstimatedSetupTime != 0 {
@@ -285,7 +288,8 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
 
 	if !opts.SkipPostRunCheck {
 		c.Run(ctx, workloadNode, fmt.Sprintf(
-			"./cockroach workload check tpcc --warehouses=%d {pgurl:1}", opts.Warehouses))
+			"./cockroach workload check tpcc --warehouses=%d --expensive-checks=%t {pgurl:1}",
+			opts.Warehouses, opts.ExpensiveChecks))
 	}
 
 	// Check no errors from metrics.
@@ -470,8 +474,30 @@ func registerTPCC(r registry.Registry) {
 			})
 		},
 	})
 
-	mixedHeadroomSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs())
+	r.Add(registry.TestSpec{
+		Name:              "tpcc/headroom/isolation-level=read-committed/" + headroomSpec.String(),
+		Owner:             registry.OwnerTestEng,
+		CompatibleClouds:  registry.AllExceptAWS,
+		Suites:            registry.Suites(registry.Nightly),
+		Tags:              registry.Tags(`default`),
+		Cluster:           headroomSpec,
+		Timeout:           4 * time.Hour,
+		EncryptionSupport: registry.EncryptionMetamorphic,
+		Leases:            registry.MetamorphicLeases,
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			maxWarehouses := maxSupportedTPCCWarehouses(*t.BuildVersion(), c.Cloud(), c.Spec())
+			headroomWarehouses := int(float64(maxWarehouses) * 0.7)
+			t.L().Printf("computed headroom warehouses of %d\n", headroomWarehouses)
+			runTPCC(ctx, t, c, tpccOptions{
+				Warehouses:   headroomWarehouses,
+				ExtraRunArgs: "--isolation-level=read_committed",
+				Duration:     120 * time.Minute,
+				SetupType:    usingImport,
+			})
+		},
+	})
+	mixedHeadroomSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs())
 	r.Add(registry.TestSpec{
 		// mixed-headroom is similar to w=headroom, but with an additional
 		// node and on a mixed version cluster which runs its long-running
@@ -502,13 +528,33 @@ func registerTPCC(r registry.Registry) {
 		Leases:            registry.MetamorphicLeases,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 			runTPCC(ctx, t, c, tpccOptions{
-				Warehouses:   1,
-				Duration:     10 * time.Minute,
-				ExtraRunArgs: "--wait=false",
-				SetupType:    usingImport,
+				Warehouses:      1,
+				Duration:        10 * time.Minute,
+				ExtraRunArgs:    "--wait=false",
+				SetupType:       usingImport,
+				ExpensiveChecks: true,
 			})
 		},
 	})
+	r.Add(registry.TestSpec{
+		Name:              "tpcc-nowait/isolation-level=read-committed/nodes=3/w=1",
+		Owner:             registry.OwnerTestEng,
+		Cluster:           r.MakeClusterSpec(4, spec.CPU(16)),
+		CompatibleClouds:  registry.AllExceptAWS,
+		Suites:            registry.Suites(registry.Nightly),
+		EncryptionSupport: registry.EncryptionMetamorphic,
+		Leases:            registry.MetamorphicLeases,
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			runTPCC(ctx, t, c, tpccOptions{
+				Warehouses:      1,
+				Duration:        10 * time.Minute,
+				ExtraRunArgs:    "--wait=false --isolation-level=read_committed",
+				SetupType:       usingImport,
+				ExpensiveChecks: true,
+			})
+		},
+	})
+
 	r.Add(registry.TestSpec{
 		Name:  "weekly/tpcc/headroom",
 		Owner: registry.OwnerTestEng,
diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go
index 2592215eb551..dd8cc62560e8 100644
--- a/pkg/kv/kvclient/kvstreamer/requests_provider.go
+++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -146,12 +146,36 @@ func (r singleRangeBatch) subPriority() int32 {
 }
 
 // String implements fmt.Stringer.
+//
+// Note that the implementation of this method doesn't include r.reqsKeys into
+// the output because that field is redundant with r.reqs and is likely to be
+// nil'ed out anyway.
 func (r singleRangeBatch) String() string {
+	// We try to limit the size based on the number of requests ourselves, so
+	// this is just a sane upper-bound.
+	maxBytes := 10 << 10 /* 10KiB */
+	if len(r.reqs) > 10 {
+		// To keep the size of this log message relatively small, if we have
+		// more than 10 requests, then we only include the information about the
+		// first 5 and the last 5 requests.
+		headEndIdx := 5
+		tailStartIdx := len(r.reqs) - 5
+		subIdx := "[]"
+		if len(r.subRequestIdx) > 0 {
+			subIdx = fmt.Sprintf("%v...%v", r.subRequestIdx[:headEndIdx], r.subRequestIdx[tailStartIdx:])
+		}
+		return fmt.Sprintf(
+			"{reqs:%v...%v pos:%v...%v subIdx:%s start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
+			kvpb.TruncatedRequestsString(r.reqs[:headEndIdx], maxBytes),
+			kvpb.TruncatedRequestsString(r.reqs[tailStartIdx:], maxBytes),
+			r.positions[:headEndIdx], r.positions[tailStartIdx:],
+			subIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor, r.minTargetBytes,
+		)
+	}
 	return fmt.Sprintf(
-		"{reqs:%s keys:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
-		kvpb.TruncatedRequestsString(r.reqs, 1024), r.reqsKeys, r.positions,
-		r.subRequestIdx, r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor,
-		r.minTargetBytes,
+		"{reqs:%v pos:%v subIdx:%v start:%v gets:%v reserved:%v overhead:%v minTarget:%v}",
+		kvpb.TruncatedRequestsString(r.reqs, maxBytes), r.positions, r.subRequestIdx,
+		r.isScanStarted, r.numGetsInReqs, r.reqsReservedBytes, r.overheadAccountedFor, r.minTargetBytes,
 	)
 }
diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go
index a6cec8c8d6a8..88b57e389d0b 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -98,7 +98,7 @@ type resultsBuffer interface {
 	//
 	// It is assumed that the budget's mutex is already being held.
 	//
-	// doneAddingLocked returns the naumber of results that have been added but
+	// doneAddingLocked returns the number of results that have been added but
 	// not yet returned to the client, and whether the client goroutine was woken.
 	doneAddingLocked(context.Context) (int, bool)
 
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index ef82ce64127c..2d1496924c79 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -780,9 +780,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
 // returned once all enqueued requests have been responded to.
 //
 // Calling GetResults() invalidates the results returned on the previous call.
-func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
+func (s *Streamer) GetResults(ctx context.Context) (retResults []Result, retErr error) {
 	log.VEvent(ctx, 2, "GetResults")
-	defer log.VEvent(ctx, 2, "exiting GetResults")
+	defer func() {
+		log.VEventf(ctx, 2, "exiting GetResults (%d results, err=%v)", len(retResults), retErr)
+	}()
 	for {
 		results, allComplete, err := s.results.get(ctx)
 		if len(results) > 0 || allComplete || err != nil {
@@ -1690,7 +1692,6 @@ func processSingleRangeResults(
 			get := response
 			if get.ResumeSpan != nil {
 				// This Get wasn't completed.
-				log.VEvent(ctx, 2, "incomplete Get")
 				continue
 			}
 			// This Get was completed.
@@ -1723,7 +1724,6 @@ func processSingleRangeResults(
 				// multiple ranges and the last range has no data in it - we
 				// want to be able to set scanComplete field on such an empty
 				// Result).
-				log.VEvent(ctx, 2, "incomplete Scan")
 				continue
 			}
 			result := Result{
diff --git a/pkg/sql/opt/bench/bench_test.go b/pkg/sql/opt/bench/bench_test.go
index 3e22965487e8..a5517487eb54 100644
--- a/pkg/sql/opt/bench/bench_test.go
+++ b/pkg/sql/opt/bench/bench_test.go
@@ -372,6 +372,7 @@ var queries = [...]benchQuery{
 		WHERE no_w_id = $1 AND no_d_id = $2
 		ORDER BY no_o_id ASC
 		LIMIT 1
+		FOR UPDATE
 		`,
 		args: []interface{}{10, 100},
 	},
diff --git a/pkg/sql/opt/memo/testdata/stats_quality/tpcc b/pkg/sql/opt/memo/testdata/stats_quality/tpcc
index fb24fb02df82..30b1ea0420ba 100644
--- a/pkg/sql/opt/memo/testdata/stats_quality/tpcc
+++ b/pkg/sql/opt/memo/testdata/stats_quality/tpcc
@@ -146,12 +146,14 @@ SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_05
 FROM stock
 WHERE (s_i_id, s_w_id) IN ((1000, 4), (900, 4), (1100, 4), (1500, 4), (1400, 4))
 ORDER BY s_i_id
+FOR UPDATE
 ----
 ----
 project
 ├── save-table-name: new_order_04_project_1
 ├── columns: s_quantity:3(int) s_ytd:14(int) s_order_cnt:15(int) s_remote_cnt:16(int) s_data:17(varchar) s_dist_05:8(char)  [hidden: s_i_id:1(int!null)]
 ├── cardinality: [0 - 5]
+├── volatile
 ├── stats: [rows=4.548552, distinct(1)=4.54855, null(1)=0, distinct(3)=4.43676, null(3)=0, distinct(8)=4.38572, null(8)=0, distinct(14)=0.989418, null(14)=0, distinct(15)=0.989418, null(15)=0, distinct(16)=0.989418, null(16)=0, distinct(17)=4.54833, null(17)=0]
 ├── key: (1)
 ├── fd: (1)-->(3,8,14-17)
@@ -165,7 +167,9 @@
 │    ├── [/4/1100 - /4/1100]
 │    ├── [/4/1400 - /4/1400]
 │    └── [/4/1500 - /4/1500]
+├── locking: for-update
 ├── cardinality: [0 - 5]
+├── volatile
 ├── stats: [rows=4.548552, distinct(1)=4.54855, null(1)=0, distinct(2)=1, null(2)=0, distinct(3)=4.43676, null(3)=0, distinct(8)=4.38572, null(8)=0, distinct(14)=0.989418, null(14)=0, distinct(15)=0.989418, null(15)=0, distinct(16)=0.989418, null(16)=0, distinct(17)=4.54833, null(17)=0, distinct(1,2)=4.54855, null(1,2)=0]
 │   histogram(1)=  0      0.966      0      0.966      0      0.966      0     0.82528      0     0.82528
 │                <--- 900 --- 1000 --- 1100 --- 1400 ---- 1500 -
@@ -489,12 +493,14 @@ FROM new_order
 WHERE no_w_id = 7 AND no_d_id = 6
 ORDER BY no_o_id ASC
 LIMIT 1
+FOR UPDATE
 ----
 ----
 project
 ├── save-table-name: delivery_01_project_1
 ├── columns: no_o_id:1(int!null)
 ├── cardinality: [0 - 1]
+├── volatile
 ├── stats: [rows=1, distinct(1)=0.999675, null(1)=0]
 ├── key: ()
 ├── fd: ()-->(1)
@@ -503,6 +509,8 @@
 ├── columns: no_o_id:1(int!null) no_d_id:2(int!null) no_w_id:3(int!null)
 ├── constraint: /3/2/1: [/7/6 - /7/6]
 ├── limit: 1
+├── locking: for-update
+├── volatile
 ├── stats: [rows=1, distinct(1)=0.999675, null(1)=0, distinct(2)=0.632308, null(2)=0, distinct(3)=0.632308, null(3)=0]
 ├── key: ()
 └── fd: ()-->(1-3)
diff --git a/pkg/sql/opt/xform/testdata/external/tpcc b/pkg/sql/opt/xform/testdata/external/tpcc
index a2cf4ab1f5dd..62f714bce82d 100644
--- a/pkg/sql/opt/xform/testdata/external/tpcc
+++ b/pkg/sql/opt/xform/testdata/external/tpcc
@@ -119,10 +119,12 @@ SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_05
 FROM stock
 WHERE (s_i_id, s_w_id) IN ((1000, 4), (900, 4), (1100, 4), (1500, 4), (1400, 4))
 ORDER BY s_i_id
+FOR UPDATE
 ----
 project
 ├── columns: s_quantity:3 s_ytd:14 s_order_cnt:15 s_remote_cnt:16 s_data:17 s_dist_05:8  [hidden: s_i_id:1!null]
 ├── cardinality: [0 - 5]
+├── volatile
 ├── key: (1)
 ├── fd: (1)-->(3,8,14-17)
 ├── ordering: +1
@@ -134,7 +136,9 @@
 │    ├── [/4/1100 - /4/1100]
 │    ├── [/4/1400 - /4/1400]
 │    └── [/4/1500 - /4/1500]
+├── locking: for-update
 ├── cardinality: [0 - 5]
+├── volatile
 ├── key: (1)
 ├── fd: ()-->(2), (1)-->(3,8,14-17)
 └── ordering: +1 opt(2) [actual: +1]
@@ -801,16 +805,20 @@
 FROM new_order
 WHERE no_w_id = 10 AND no_d_id = 100
 ORDER BY no_o_id ASC
 LIMIT 1
+FOR UPDATE
 ----
 project
 ├── columns: no_o_id:1!null
 ├── cardinality: [0 - 1]
+├── volatile
 ├── key: ()
 ├── fd: ()-->(1)
 └── scan new_order
      ├── columns: no_o_id:1!null no_d_id:2!null no_w_id:3!null
      ├── constraint: /3/2/1: [/10/100 - /10/100]
      ├── limit: 1
+     ├── locking: for-update
+     ├── volatile
      ├── key: ()
      └── fd: ()-->(1-3)
diff --git a/pkg/sql/opt/xform/testdata/external/tpcc-later-stats b/pkg/sql/opt/xform/testdata/external/tpcc-later-stats
index cef40abd28af..d7eb4296cf0b 100644
--- a/pkg/sql/opt/xform/testdata/external/tpcc-later-stats
+++ b/pkg/sql/opt/xform/testdata/external/tpcc-later-stats
@@ -804,16 +804,20 @@
 FROM new_order
 WHERE no_w_id = 10 AND no_d_id = 100
 ORDER BY no_o_id ASC
 LIMIT 1
+FOR UPDATE
 ----
 project
 ├── columns: no_o_id:1!null
 ├── cardinality: [0 - 1]
+├── volatile
 ├── key: ()
 ├── fd: ()-->(1)
 └── scan new_order
      ├── columns: no_o_id:1!null no_d_id:2!null no_w_id:3!null
      ├── constraint: /3/2/1: [/10/100 - /10/100]
      ├── limit: 1
+     ├── locking: for-update
+     ├── volatile
      ├── key: ()
      └── fd: ()-->(1-3)
diff --git a/pkg/sql/opt/xform/testdata/external/tpcc-no-stats b/pkg/sql/opt/xform/testdata/external/tpcc-no-stats
index 443ecae58ead..b46dabe9cbdb 100644
--- a/pkg/sql/opt/xform/testdata/external/tpcc-no-stats
+++ b/pkg/sql/opt/xform/testdata/external/tpcc-no-stats
@@ -116,10 +116,12 @@ SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_05
 FROM stock
 WHERE (s_i_id, s_w_id) IN ((1000, 4), (900, 4), (1100, 4), (1500, 4), (1400, 4))
 ORDER BY s_i_id
+FOR UPDATE
 ----
 project
 ├── columns: s_quantity:3 s_ytd:14 s_order_cnt:15 s_remote_cnt:16 s_data:17 s_dist_05:8  [hidden: s_i_id:1!null]
 ├── cardinality: [0 - 5]
+├── volatile
 ├── key: (1)
 ├── fd: (1)-->(3,8,14-17)
 ├── ordering: +1
@@ -131,7 +133,9 @@
 │    ├── [/4/1100 - /4/1100]
 │    ├── [/4/1400 - /4/1400]
 │    └── [/4/1500 - /4/1500]
+├── locking: for-update
 ├── cardinality: [0 - 5]
+├── volatile
 ├── key: (1)
 ├── fd: ()-->(2), (1)-->(3,8,14-17)
 └── ordering: +1 opt(2) [actual: +1]
@@ -802,16 +806,20 @@
 FROM new_order
 WHERE no_w_id = 10 AND no_d_id = 100
 ORDER BY no_o_id ASC
 LIMIT 1
+FOR UPDATE
 ----
 project
 ├── columns: no_o_id:1!null
 ├── cardinality: [0 - 1]
+├── volatile
 ├── key: ()
 ├── fd: ()-->(1)
 └── scan new_order
      ├── columns: no_o_id:1!null no_d_id:2!null no_w_id:3!null
      ├── constraint: /3/2/1: [/10/100 - /10/100]
      ├── limit: 1
+     ├── locking: for-update
+     ├── volatile
      ├── key: ()
      └── fd: ()-->(1-3)
diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go
index 8e506e5fd24c..e06a68376e28 100644
--- a/pkg/sql/parser/parse_test.go
+++ b/pkg/sql/parser/parse_test.go
@@ -736,7 +736,7 @@ func BenchmarkParse(b *testing.B) {
 		},
 		{
 			"tpcc-delivery",
-			`SELECT no_o_id FROM new_order WHERE no_w_id = $1 AND no_d_id = $2 ORDER BY no_o_id ASC LIMIT 1`,
+			`SELECT no_o_id FROM new_order WHERE no_w_id = $1 AND no_d_id = $2 ORDER BY no_o_id ASC LIMIT 1 FOR UPDATE`,
 		},
 		{
 			"account",
diff --git a/pkg/workload/tpcc/delivery.go b/pkg/workload/tpcc/delivery.go
index f9eda6ce3ba1..36ce23dc852d 100644
--- a/pkg/workload/tpcc/delivery.go
+++ b/pkg/workload/tpcc/delivery.go
@@ -63,7 +63,8 @@ func createDelivery(
 		FROM new_order
 		WHERE no_w_id = $1 AND no_d_id = $2
 		ORDER BY no_o_id ASC
-		LIMIT 1`,
+		LIMIT 1
+		FOR UPDATE`,
 	)
 
 	del.sumAmount = del.sr.Define(`
@@ -87,7 +88,7 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
 	olDeliveryD := timeutil.Now()
 
 	err := crdbpgx.ExecuteTx(
-		ctx, del.mcp.Get(), del.config.txOpts,
+		ctx, del.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// 2.7.4.2. For each district:
 			dIDoIDPairs := make(map[int]int)
diff --git a/pkg/workload/tpcc/new_order.go b/pkg/workload/tpcc/new_order.go
index 5152a2841a61..36dbde4b4381 100644
--- a/pkg/workload/tpcc/new_order.go
+++ b/pkg/workload/tpcc/new_order.go
@@ -212,7 +212,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
 	d.oEntryD = timeutil.Now()
 
 	err := crdbpgx.ExecuteTx(
-		ctx, n.mcp.Get(), n.config.txOpts,
+		ctx, n.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// Select the district tax rate and next available order number, bumping it.
 			var dNextOID int
@@ -302,7 +302,8 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
 				SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_%02[1]d
 				FROM stock
 				WHERE (s_i_id, s_w_id) IN (%[2]s)
-				ORDER BY s_i_id`,
+				ORDER BY s_i_id
+				FOR UPDATE`,
 				d.dID, strings.Join(stockIDs, ", "),
 			),
 		)
diff --git a/pkg/workload/tpcc/order_status.go b/pkg/workload/tpcc/order_status.go
index d8c02745d7b1..54e1234c103a 100644
--- a/pkg/workload/tpcc/order_status.go
+++ b/pkg/workload/tpcc/order_status.go
@@ -134,7 +134,7 @@ func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) {
 	}
 
 	if err := crdbpgx.ExecuteTx(
-		ctx, o.mcp.Get(), o.config.txOpts,
+		ctx, o.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// 2.6.2.2 explains this entire transaction.
 
diff --git a/pkg/workload/tpcc/payment.go b/pkg/workload/tpcc/payment.go
index 38dfdca65cf5..a83ce3134c45 100644
--- a/pkg/workload/tpcc/payment.go
+++ b/pkg/workload/tpcc/payment.go
@@ -190,7 +190,7 @@ func (p *payment) run(ctx context.Context, wID int) (interface{}, error) {
 	}
 
 	if err := crdbpgx.ExecuteTx(
-		ctx, p.mcp.Get(), p.config.txOpts,
+		ctx, p.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			var wName, dName string
 			// Update warehouse with payment
diff --git a/pkg/workload/tpcc/stock_level.go b/pkg/workload/tpcc/stock_level.go
index bd83e92de662..9e873423ff30 100644
--- a/pkg/workload/tpcc/stock_level.go
+++ b/pkg/workload/tpcc/stock_level.go
@@ -68,19 +68,15 @@ func createStockLevel(
 
 	// Count the number of recently sold items that have a stock level below
 	// the threshold.
-	// TODO(radu): we use count(DISTINCT s_i_id) because DISTINCT inside
-	// aggregates was not supported by the optimizer. This can be cleaned up.
 	s.countRecentlySold = s.sr.Define(`
-		SELECT count(*) FROM (
-			SELECT DISTINCT s_i_id
-			FROM order_line
-			JOIN stock
-			ON s_i_id=ol_i_id AND s_w_id=ol_w_id
-			WHERE ol_w_id = $1
-				AND ol_d_id = $2
-				AND ol_o_id BETWEEN $3 - 20 AND $3 - 1
-				AND s_quantity < $4
-		)`,
+		SELECT count(DISTINCT s_i_id)
+		FROM order_line
+		JOIN stock
+		ON s_w_id = $1 AND s_i_id = ol_i_id
+		WHERE ol_w_id = $1
+			AND ol_d_id = $2
+			AND ol_o_id BETWEEN $3 - 20 AND $3 - 1
+			AND s_quantity < $4`,
 	)
 
 	if err := s.sr.Init(ctx, "stock-level", mcp); err != nil {
@@ -101,7 +97,7 @@ func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) {
 	}
 
 	if err := crdbpgx.ExecuteTx(
-		ctx, s.mcp.Get(), s.config.txOpts,
+		ctx, s.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			var dNextOID int
 			if err := s.selectDNextOID.QueryRowTx(
diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go
index 14a90c75aa7b..3e1262dfcc56 100644
--- a/pkg/workload/tpcc/tpcc.go
+++ b/pkg/workload/tpcc/tpcc.go
@@ -14,7 +14,6 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
-	"net/url"
 	"strconv"
 	"strings"
 	"sync"
@@ -43,8 +42,8 @@ type tpcc struct {
 	activeWarehouses int
 	nowString        []byte
 	numConns         int
-
-	idleConns int
+	idleConns        int
+	isoLevel         string
 
 	// Used in non-uniform random data generation. cLoad is the value of C at load
 	// time. cCustomerID is the value of C for the customer id generator. cItemID
@@ -88,10 +87,6 @@ type tpcc struct {
 	// testing purposes.
 	localWarehouses bool
 
-	usePostgres  bool
-	serializable bool
-	txOpts       pgx.TxOptions
-
 	expensiveChecks bool
 
 	replicateStaticColumns bool
@@ -170,12 +165,12 @@ var tpccMeta = workload.Meta{
 		`zones`:             {RuntimeOnly: true},
 		`active-warehouses`: {RuntimeOnly: true},
 		`scatter`:           {RuntimeOnly: true},
-		`serializable`:      {RuntimeOnly: true},
 		`split`:             {RuntimeOnly: true},
 		`wait`:              {RuntimeOnly: true},
 		`workers`:           {RuntimeOnly: true},
 		`conns`:             {RuntimeOnly: true},
 		`idle-conns`:        {RuntimeOnly: true},
+		`isolation-level`:   {RuntimeOnly: true},
 		`expensive-checks`:  {RuntimeOnly: true, CheckConsistencyOnly: true},
 		`local-warehouses`:  {RuntimeOnly: true},
 		`regions`:           {RuntimeOnly: true},
@@ -203,6 +198,7 @@ var tpccMeta = workload.Meta{
 			numConnsPerWarehouse,
 		))
 		g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`)
+		g.flags.StringVar(&g.isoLevel, `isolation-level`, ``, `Isolation level to run workload transactions under [serializable, snapshot, read_committed]. If unset, the workload will run with the default isolation level of the database.`)
 		g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`)
 		g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`)
 		g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+
@@ -214,7 +210,6 @@ var tpccMeta = workload.Meta{
 		g.flags.Var(&g.multiRegionCfg.survivalGoal, "survival-goal", "Survival goal to use for multi-region setups. Allowed values: [zone, region].")
 		g.flags.IntVar(&g.activeWarehouses, `active-warehouses`, 0, `Run the load generator against a specific number of warehouses. Defaults to --warehouses'`)
 		g.flags.BoolVar(&g.scatter, `scatter`, false, `Scatter ranges`)
-		g.flags.BoolVar(&g.serializable, `serializable`, false, `Force serializable mode`)
 		g.flags.BoolVar(&g.split, `split`, false, `Split tables`)
 		g.flags.BoolVar(&g.expensiveChecks, `expensive-checks`, false, `Run expensive checks`)
 		g.flags.BoolVar(&g.separateColumnFamilies, `families`, false, `Use separate column families for dynamic and static columns`)
@@ -344,10 +339,6 @@ func (w *tpcc) Hooks() workload.Hooks {
 					w.activeWarehouses, w.warehouses*NumWorkersPerWarehouse)
 			}
 
-			if w.serializable {
-				w.txOpts = pgx.TxOptions{IsoLevel: pgx.Serializable}
-			}
-
 			w.auditor = newAuditor(w.activeWarehouses)
 
 			// Create a partitioner to help us partition the warehouses. The base-case is
@@ -755,13 +746,10 @@ func (w *tpcc) Ops(
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
-	parsedURL, err := url.Parse(urls[0])
-	if err != nil {
+	if err := workload.SetDefaultIsolationLevel(urls, w.isoLevel); err != nil {
 		return workload.QueryLoad{}, err
 	}
 
-	w.usePostgres = parsedURL.Port() == "5432"
-
 	// We can't use a single MultiConnPool because we want to implement partition
 	// affinity. Instead we have one MultiConnPool per server.
 	cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags)
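
The new singleRangeBatch.String() above keeps log lines bounded by printing only the first five and the last five requests of a large batch. A minimal standalone sketch of that head/tail truncation pattern (illustrative names only, not the actual kvstreamer code):

package main

import (
	"fmt"
	"strings"
)

// formatHeadTail renders a slice for logging, keeping only the first and last
// `edge` elements once the slice grows past 2*edge entries. The helper and its
// names are hypothetical; they only mirror the truncation idea used by
// singleRangeBatch.String() in the patch above.
func formatHeadTail(elems []string, edge int) string {
	if len(elems) <= 2*edge {
		return fmt.Sprintf("[%s]", strings.Join(elems, " "))
	}
	head := strings.Join(elems[:edge], " ")
	tail := strings.Join(elems[len(elems)-edge:], " ")
	return fmt.Sprintf("[%s ... %s] (%d total)", head, tail, len(elems))
}

func main() {
	reqs := make([]string, 12)
	for i := range reqs {
		reqs[i] = fmt.Sprintf("Get(k%d)", i)
	}
	// With 12 requests and edge=5, only the first 5 and last 5 are printed.
	fmt.Println(formatHeadTail(reqs, 5))
}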