workload/tpcc: add --isolation-level flag
Informs #100176.

This commit adds an `--isolation-level` flag to tpcc, which controls the
isolation level to run the workload transactions under. If unset, the
workload will run with the default isolation level of the database.

Release note: None
nvanbenschoten committed Nov 4, 2023
1 parent f3a3fa0 commit dd28090
Showing 6 changed files with 10 additions and 22 deletions.
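The commit message says that when the flag is unset, the workload keeps the database's default isolation level; the `tpcc.go` diff below wires the flag value into a new `workload.SetDefaultIsolationLevel(urls, w.isoLevel)` call. As a rough illustration of that idea (a hypothetical sketch, not the helper's actual implementation; only its call shape — URL slice plus level, returning an error — is taken from the diff), one way to apply a default isolation level per connection is to set the `default_transaction_isolation` session variable on each connection URL, which CockroachDB accepts as a URL query parameter:

```go
// Hypothetical sketch of applying --isolation-level to each workload URL.
// Assumes CockroachDB-style URLs, where session variables such as
// default_transaction_isolation can be passed as query parameters.
package main

import (
	"fmt"
	"net/url"
)

// setDefaultIsolationLevel leaves the URLs untouched when isoLevel is empty,
// so the database default applies; otherwise it sets the session default on
// every connection string.
func setDefaultIsolationLevel(urls []string, isoLevel string) error {
	if isoLevel == "" {
		return nil
	}
	for i, raw := range urls {
		u, err := url.Parse(raw)
		if err != nil {
			return err
		}
		q := u.Query()
		q.Set("default_transaction_isolation", isoLevel)
		u.RawQuery = q.Encode()
		urls[i] = u.String()
	}
	return nil
}

func main() {
	urls := []string{"postgres://root@localhost:26257/tpcc?sslmode=disable"}
	if err := setDefaultIsolationLevel(urls, "read_committed"); err != nil {
		panic(err)
	}
	fmt.Println(urls[0])
	// postgres://root@localhost:26257/tpcc?default_transaction_isolation=read_committed&sslmode=disable
}
```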
2 changes: 1 addition & 1 deletion pkg/workload/tpcc/delivery.go
@@ -87,7 +87,7 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
 	olDeliveryD := timeutil.Now()
 
 	err := crdbpgx.ExecuteTx(
-		ctx, del.mcp.Get(), del.config.txOpts,
+		ctx, del.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// 2.7.4.2. For each district:
 			dIDoIDPairs := make(map[int]int)
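The same one-line change is repeated in new_order.go, order_status.go, payment.go, and stock_level.go below: each transaction now starts with an empty `pgx.TxOptions{}` instead of the per-workload `txOpts`. With no `IsoLevel` set, pgx issues a plain `BEGIN`, so each transaction runs at the session's default isolation level, which is what the new flag configures. A minimal sketch of that behavior (assuming pgx v5 and a local CockroachDB cluster; not code from this commit):

```go
// Sketch: an empty pgx.TxOptions defers the isolation level to the session
// default, rather than forcing one on every BEGIN as the old txOpts could.
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Hypothetical local cluster URL, for illustration only.
	conn, err := pgx.Connect(ctx, "postgres://root@localhost:26257/tpcc?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// Empty options: plain BEGIN, so the session default isolation applies.
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback(ctx)

	var level string
	// SHOW transaction_isolation reports the level this transaction runs at.
	if err := tx.QueryRow(ctx, "SHOW transaction_isolation").Scan(&level); err != nil {
		log.Fatal(err)
	}
	log.Printf("running at isolation level: %s", level)
}
```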
2 changes: 1 addition & 1 deletion pkg/workload/tpcc/new_order.go
@@ -212,7 +212,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
 	d.oEntryD = timeutil.Now()
 
 	err := crdbpgx.ExecuteTx(
-		ctx, n.mcp.Get(), n.config.txOpts,
+		ctx, n.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// Select the district tax rate and next available order number, bumping it.
 			var dNextOID int
2 changes: 1 addition & 1 deletion pkg/workload/tpcc/order_status.go
@@ -134,7 +134,7 @@ func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) {
 	}
 
 	if err := crdbpgx.ExecuteTx(
-		ctx, o.mcp.Get(), o.config.txOpts,
+		ctx, o.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			// 2.6.2.2 explains this entire transaction.
 
2 changes: 1 addition & 1 deletion pkg/workload/tpcc/payment.go
@@ -190,7 +190,7 @@ func (p *payment) run(ctx context.Context, wID int) (interface{}, error) {
 	}
 
 	if err := crdbpgx.ExecuteTx(
-		ctx, p.mcp.Get(), p.config.txOpts,
+		ctx, p.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			var wName, dName string
 			// Update warehouse with payment
2 changes: 1 addition & 1 deletion pkg/workload/tpcc/stock_level.go
@@ -97,7 +97,7 @@ func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) {
 	}
 
 	if err := crdbpgx.ExecuteTx(
-		ctx, s.mcp.Get(), s.config.txOpts,
+		ctx, s.mcp.Get(), pgx.TxOptions{},
 		func(tx pgx.Tx) error {
 			var dNextOID int
 			if err := s.selectDNextOID.QueryRowTx(
22 changes: 5 additions & 17 deletions pkg/workload/tpcc/tpcc.go
@@ -14,7 +14,6 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
-	"net/url"
 	"strconv"
 	"strings"
 	"sync"
@@ -43,8 +42,8 @@ type tpcc struct {
 	activeWarehouses int
 	nowString        []byte
 	numConns         int
-
-	idleConns int
+	idleConns        int
+	isoLevel         string
 
 	// Used in non-uniform random data generation. cLoad is the value of C at load
 	// time. cCustomerID is the value of C for the customer id generator. cItemID
@@ -88,10 +87,6 @@ type tpcc struct {
 	// testing purposes.
 	localWarehouses bool
 
-	usePostgres  bool
-	serializable bool
-	txOpts       pgx.TxOptions
-
 	expensiveChecks bool
 
 	replicateStaticColumns bool
@@ -170,12 +165,12 @@ var tpccMeta = workload.Meta{
 		`zones`: {RuntimeOnly: true},
 		`active-warehouses`: {RuntimeOnly: true},
 		`scatter`: {RuntimeOnly: true},
-		`serializable`: {RuntimeOnly: true},
 		`split`: {RuntimeOnly: true},
 		`wait`: {RuntimeOnly: true},
 		`workers`: {RuntimeOnly: true},
 		`conns`: {RuntimeOnly: true},
 		`idle-conns`: {RuntimeOnly: true},
+		`isolation-level`: {RuntimeOnly: true},
 		`expensive-checks`: {RuntimeOnly: true, CheckConsistencyOnly: true},
 		`local-warehouses`: {RuntimeOnly: true},
 		`regions`: {RuntimeOnly: true},
@@ -203,6 +198,7 @@ var tpccMeta = workload.Meta{
 			numConnsPerWarehouse,
 		))
 		g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`)
+		g.flags.StringVar(&g.isoLevel, `isolation-level`, ``, `Isolation level to run workload transactions under [serializable, snapshot, read_committed]. If unset, the workload will run with the default isolation level of the database.`)
 		g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`)
 		g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`)
 		g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+
@@ -214,7 +210,6 @@ var tpccMeta = workload.Meta{
 		g.flags.Var(&g.multiRegionCfg.survivalGoal, "survival-goal", "Survival goal to use for multi-region setups. Allowed values: [zone, region].")
 		g.flags.IntVar(&g.activeWarehouses, `active-warehouses`, 0, `Run the load generator against a specific number of warehouses. Defaults to --warehouses'`)
 		g.flags.BoolVar(&g.scatter, `scatter`, false, `Scatter ranges`)
-		g.flags.BoolVar(&g.serializable, `serializable`, false, `Force serializable mode`)
 		g.flags.BoolVar(&g.split, `split`, false, `Split tables`)
 		g.flags.BoolVar(&g.expensiveChecks, `expensive-checks`, false, `Run expensive checks`)
 		g.flags.BoolVar(&g.separateColumnFamilies, `families`, false, `Use separate column families for dynamic and static columns`)
@@ -344,10 +339,6 @@ func (w *tpcc) Hooks() workload.Hooks {
 					w.activeWarehouses, w.warehouses*NumWorkersPerWarehouse)
 			}
 
-			if w.serializable {
-				w.txOpts = pgx.TxOptions{IsoLevel: pgx.Serializable}
-			}
-
 			w.auditor = newAuditor(w.activeWarehouses)
 
 			// Create a partitioner to help us partition the warehouses. The base-case is
@@ -755,13 +746,10 @@ func (w *tpcc) Ops(
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
-	parsedURL, err := url.Parse(urls[0])
-	if err != nil {
+	if err := workload.SetDefaultIsolationLevel(urls, w.isoLevel); err != nil {
 		return workload.QueryLoad{}, err
 	}
 
-	w.usePostgres = parsedURL.Port() == "5432"
-
 	// We can't use a single MultiConnPool because we want to implement partition
 	// affinity. Instead we have one MultiConnPool per server.
 	cfg := workload.NewMultiConnPoolCfgFromFlags(w.connFlags)
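With the pieces above in place, the expected usage is an invocation along the lines of `./workload run tpcc --warehouses=10 --isolation-level=read_committed <pgurl>` (illustrative, not taken from this commit). Omitting `--isolation-level` preserves the previous behavior of running at the database's default isolation level, and the now-removed `--serializable` flag is effectively replaced by `--isolation-level=serializable`, applied as a session default rather than on each BEGIN.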
