From dd28090fa3cdc735d9c8c7bb63ed62d58ea3291c Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Sat, 4 Nov 2023 17:31:34 +0000
Subject: [PATCH] workload/tpcc: add --isolation-level flag

Informs #100176.

This commit adds an `--isolation-level` flag to tpcc, which controls the
isolation level that the workload's transactions run under. If unset, the
workload runs with the database's default isolation level.

Release note: None
---
 pkg/workload/tpcc/delivery.go     |  2 +-
 pkg/workload/tpcc/new_order.go    |  2 +-
 pkg/workload/tpcc/order_status.go |  2 +-
 pkg/workload/tpcc/payment.go      |  2 +-
 pkg/workload/tpcc/stock_level.go  |  2 +-
 pkg/workload/tpcc/tpcc.go         | 22 +++++-----------------
 6 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/pkg/workload/tpcc/delivery.go b/pkg/workload/tpcc/delivery.go
index f9eda6ce3ba1..5ba00a0c759a 100644
--- a/pkg/workload/tpcc/delivery.go
+++ b/pkg/workload/tpcc/delivery.go
@@ -87,7 +87,7 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
 	olDeliveryD := timeutil.Now()
 	err := crdbpgx.ExecuteTx(
-		ctx, del.mcp.Get(), del.config.txOpts,
+		ctx, del.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// 2.7.4.2. For each district:
 			dIDoIDPairs := make(map[int]int)
diff --git a/pkg/workload/tpcc/new_order.go b/pkg/workload/tpcc/new_order.go
index 5152a2841a61..3fb101439f3d 100644
--- a/pkg/workload/tpcc/new_order.go
+++ b/pkg/workload/tpcc/new_order.go
@@ -212,7 +212,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
 	d.oEntryD = timeutil.Now()
 	err := crdbpgx.ExecuteTx(
-		ctx, n.mcp.Get(), n.config.txOpts,
+		ctx, n.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// Select the district tax rate and next available order number, bumping it.
 			var dNextOID int
diff --git a/pkg/workload/tpcc/order_status.go b/pkg/workload/tpcc/order_status.go
index d8c02745d7b1..54e1234c103a 100644
--- a/pkg/workload/tpcc/order_status.go
+++ b/pkg/workload/tpcc/order_status.go
@@ -134,7 +134,7 @@ func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) {
 	}

 	if err := crdbpgx.ExecuteTx(
-		ctx, o.mcp.Get(), o.config.txOpts,
+		ctx, o.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// 2.6.2.2 explains this entire transaction.
diff --git a/pkg/workload/tpcc/payment.go b/pkg/workload/tpcc/payment.go
index 38dfdca65cf5..a83ce3134c45 100644
--- a/pkg/workload/tpcc/payment.go
+++ b/pkg/workload/tpcc/payment.go
@@ -190,7 +190,7 @@ func (p *payment) run(ctx context.Context, wID int) (interface{}, error) {
 	}

 	if err := crdbpgx.ExecuteTx(
-		ctx, p.mcp.Get(), p.config.txOpts,
+		ctx, p.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			var wName, dName string
 			// Update warehouse with payment
diff --git a/pkg/workload/tpcc/stock_level.go b/pkg/workload/tpcc/stock_level.go
index 2049a6bbb91d..9e873423ff30 100644
--- a/pkg/workload/tpcc/stock_level.go
+++ b/pkg/workload/tpcc/stock_level.go
@@ -97,7 +97,7 @@ func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) {
 	}

 	if err := crdbpgx.ExecuteTx(
-		ctx, s.mcp.Get(), s.config.txOpts,
+		ctx, s.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			var dNextOID int
 			if err := s.selectDNextOID.QueryRowTx(
diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go
index 14a90c75aa7b..3e1262dfcc56 100644
--- a/pkg/workload/tpcc/tpcc.go
+++ b/pkg/workload/tpcc/tpcc.go
@@ -14,7 +14,6 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
-	"net/url"
 	"strconv"
 	"strings"
 	"sync"
@@ -43,8 +42,8 @@ type tpcc struct {
 	activeWarehouses int
 	nowString        []byte
 	numConns         int
-
-	idleConns int
+	idleConns        int
+	isoLevel         string

 	// Used in non-uniform random data generation. cLoad is the value of C at load
 	// time. cCustomerID is the value of C for the customer id generator. cItemID
@@ -88,10 +87,6 @@ type tpcc struct {
 	// testing purposes.
 	localWarehouses bool

-	usePostgres  bool
-	serializable bool
-	txOpts       pgx.TxOptions
-
 	expensiveChecks bool

 	replicateStaticColumns bool
@@ -170,12 +165,12 @@ var tpccMeta = workload.Meta{
 			`zones`:             {RuntimeOnly: true},
 			`active-warehouses`: {RuntimeOnly: true},
 			`scatter`:           {RuntimeOnly: true},
-			`serializable`:      {RuntimeOnly: true},
 			`split`:             {RuntimeOnly: true},
 			`wait`:              {RuntimeOnly: true},
 			`workers`:           {RuntimeOnly: true},
 			`conns`:             {RuntimeOnly: true},
 			`idle-conns`:        {RuntimeOnly: true},
+			`isolation-level`:   {RuntimeOnly: true},
 			`expensive-checks`:  {RuntimeOnly: true, CheckConsistencyOnly: true},
 			`local-warehouses`:  {RuntimeOnly: true},
 			`regions`:           {RuntimeOnly: true},
@@ -203,6 +198,7 @@ var tpccMeta = workload.Meta{
 			numConnsPerWarehouse,
 		))
 		g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`)
+		g.flags.StringVar(&g.isoLevel, `isolation-level`, ``, `Isolation level to run workload transactions under [serializable, snapshot, read_committed]. If unset, the workload will run with the default isolation level of the database.`)
 		g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`)
 		g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`)
 		g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+
@@ -214,7 +210,6 @@ var tpccMeta = workload.Meta{
 		g.flags.Var(&g.multiRegionCfg.survivalGoal, "survival-goal", "Survival goal to use for multi-region setups. Allowed values: [zone, region].")
 		g.flags.IntVar(&g.activeWarehouses, `active-warehouses`, 0, `Run the load generator against a specific number of warehouses. Defaults to --warehouses'`)
 		g.flags.BoolVar(&g.scatter, `scatter`, false, `Scatter ranges`)
-		g.flags.BoolVar(&g.serializable, `serializable`, false, `Force serializable mode`)
 		g.flags.BoolVar(&g.split, `split`, false, `Split tables`)
 		g.flags.BoolVar(&g.expensiveChecks, `expensive-checks`, false, `Run expensive checks`)
 		g.flags.BoolVar(&g.separateColumnFamilies, `families`, false, `Use separate column families for dynamic and static columns`)
@@ -344,10 +339,6 @@ func (w *tpcc) Hooks() workload.Hooks {
 					w.activeWarehouses, w.warehouses*NumWorkersPerWarehouse)
 			}

-			if w.serializable {
-				w.txOpts = pgx.TxOptions{IsoLevel: pgx.Serializable}
-			}
-
 			w.auditor = newAuditor(w.activeWarehouses)

 			// Create a partitioner to help us partition the warehouses. The base-case is
@@ -755,13 +746,10 @@ func (w *tpcc) Ops(
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
-	parsedURL, err := url.Parse(urls[0])
-	if err != nil {
+	if err := workload.SetDefaultIsolationLevel(urls, w.isoLevel); err != nil {
 		return workload.QueryLoad{}, err
 	}
-	w.usePostgres = parsedURL.Port() == "5432"
-
 	// We can't use a single MultiConnPool because we want to implement partition
 	// affinity. Instead we have one MultiConnPool per server.
 	cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags)
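
A note on workload.SetDefaultIsolationLevel, which Ops now calls with the connection
URLs and the new --isolation-level value: the helper's body is not part of this patch.
The sketch below is a rough, hypothetical illustration of one way such a helper could
work, not the actual implementation. The idea is to push the requested level into the
connection URLs as a session default (pgx treats unrecognized URL query parameters as
runtime parameters sent to the server), so every connection, and therefore every
transaction, picks it up; that is what allows the per-transaction txOpts/pgx.TxOptions
plumbing above to be deleted. All names below are illustrative.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// setDefaultIsolationLevel rewrites each connection URL so that sessions
// opened from it run at the requested isolation level. An empty level leaves
// the URLs untouched, matching the flag's "unset means database default"
// behavior.
func setDefaultIsolationLevel(urls []string, level string) error {
	if level == "" {
		return nil
	}
	// The session variable expects spaces, e.g. "read committed".
	value := strings.ReplaceAll(level, "_", " ")
	for i, s := range urls {
		u, err := url.Parse(s)
		if err != nil {
			return err
		}
		q := u.Query()
		// Unrecognized URL parameters are forwarded to the server as runtime
		// parameters, so this behaves like SET default_transaction_isolation
		// for every new connection opened from the URL.
		q.Set("default_transaction_isolation", value)
		u.RawQuery = q.Encode()
		urls[i] = u.String()
	}
	return nil
}

func main() {
	urls := []string{"postgres://root@localhost:26257/tpcc?sslmode=disable"}
	if err := setDefaultIsolationLevel(urls, "read_committed"); err != nil {
		panic(err)
	}
	fmt.Println(urls[0])
}

With the flag wired up, a run would look something like:
cockroach workload run tpcc --warehouses=10 --isolation-level=read_committed 'postgres://root@localhost:26257?sslmode=disable'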