diff --git a/pkg/workload/tpcc/delivery.go b/pkg/workload/tpcc/delivery.go index 36ce23dc852d..31409385f344 100644 --- a/pkg/workload/tpcc/delivery.go +++ b/pkg/workload/tpcc/delivery.go @@ -16,7 +16,6 @@ import ( "fmt" "strings" - crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" @@ -87,8 +86,8 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) { oCarrierID := rng.Intn(10) + 1 olDeliveryD := timeutil.Now() - err := crdbpgx.ExecuteTx( - ctx, del.mcp.Get(), pgx.TxOptions{}, + err := del.config.executeTx( + ctx, del.mcp.Get(), func(tx pgx.Tx) error { // 2.7.4.2. For each district: dIDoIDPairs := make(map[int]int) diff --git a/pkg/workload/tpcc/new_order.go b/pkg/workload/tpcc/new_order.go index 36dbde4b4381..9233e8f56a0c 100644 --- a/pkg/workload/tpcc/new_order.go +++ b/pkg/workload/tpcc/new_order.go @@ -17,7 +17,6 @@ import ( "strings" "time" - crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" @@ -211,8 +210,8 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) { d.oEntryD = timeutil.Now() - err := crdbpgx.ExecuteTx( - ctx, n.mcp.Get(), pgx.TxOptions{}, + err := n.config.executeTx( + ctx, n.mcp.Get(), func(tx pgx.Tx) error { // Select the district tax rate and next available order number, bumping it. var dNextOID int diff --git a/pkg/workload/tpcc/order_status.go b/pkg/workload/tpcc/order_status.go index 54e1234c103a..ab3d63253394 100644 --- a/pkg/workload/tpcc/order_status.go +++ b/pkg/workload/tpcc/order_status.go @@ -14,7 +14,6 @@ import ( "context" "time" - crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" @@ -133,8 +132,8 @@ func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) { d.cID = o.config.randCustomerID(rng) } - if err := crdbpgx.ExecuteTx( - ctx, o.mcp.Get(), pgx.TxOptions{}, + if err := o.config.executeTx( + ctx, o.mcp.Get(), func(tx pgx.Tx) error { // 2.6.2.2 explains this entire transaction. diff --git a/pkg/workload/tpcc/payment.go b/pkg/workload/tpcc/payment.go index a83ce3134c45..0ee865d5daad 100644 --- a/pkg/workload/tpcc/payment.go +++ b/pkg/workload/tpcc/payment.go @@ -15,7 +15,6 @@ import ( "fmt" "time" - crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" @@ -189,8 +188,8 @@ func (p *payment) run(ctx context.Context, wID int) (interface{}, error) { d.cID = p.config.randCustomerID(rng) } - if err := crdbpgx.ExecuteTx( - ctx, p.mcp.Get(), pgx.TxOptions{}, + if err := p.config.executeTx( + ctx, p.mcp.Get(), func(tx pgx.Tx) error { var wName, dName string // Update warehouse with payment diff --git a/pkg/workload/tpcc/stock_level.go b/pkg/workload/tpcc/stock_level.go index 9e873423ff30..68957e16a54c 100644 --- a/pkg/workload/tpcc/stock_level.go +++ b/pkg/workload/tpcc/stock_level.go @@ -13,7 +13,6 @@ package tpcc import ( "context" - crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/jackc/pgx/v5" @@ -96,8 +95,8 @@ func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) { dID: rng.Intn(10) + 1, } - if err := crdbpgx.ExecuteTx( - ctx, s.mcp.Get(), pgx.TxOptions{}, + if err := s.config.executeTx( + ctx, s.mcp.Get(), func(tx pgx.Tx) error { var dNextOID int if err := s.selectDNextOID.QueryRowTx( diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index 3e1262dfcc56..9b889579ad0e 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -19,6 +19,7 @@ import ( "sync" "time" + crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -44,6 +45,7 @@ type tpcc struct { numConns int idleConns int isoLevel string + txnRetries bool // Used in non-uniform random data generation. cLoad is the value of C at load // time. cCustomerID is the value of C for the customer id generator. cItemID @@ -171,6 +173,7 @@ var tpccMeta = workload.Meta{ `conns`: {RuntimeOnly: true}, `idle-conns`: {RuntimeOnly: true}, `isolation-level`: {RuntimeOnly: true}, + `txn-retries`: {RuntimeOnly: true}, `expensive-checks`: {RuntimeOnly: true, CheckConsistencyOnly: true}, `local-warehouses`: {RuntimeOnly: true}, `regions`: {RuntimeOnly: true}, @@ -199,6 +202,7 @@ var tpccMeta = workload.Meta{ )) g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`) g.flags.StringVar(&g.isoLevel, `isolation-level`, ``, `Isolation level to run workload transactions under [serializable, snapshot, read_committed]. If unset, the workload will run with the default isolation level of the database.`) + g.flags.BoolVar(&g.txnRetries, `txn-retries`, true, `Run transactions in a retry loop`) g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`) g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`) g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+ @@ -897,6 +901,27 @@ func (w *tpcc) Ops( return ql, nil } +// executeTx runs fn inside a transaction with retries, if enabled. On +// non-retryable failures, the transaction is aborted and rolled back; on +// success, the transaction is committed. +func (w *tpcc) executeTx(ctx context.Context, conn crdbpgx.Conn, fn func(pgx.Tx) error) error { + txOpts := pgx.TxOptions{} + if w.txnRetries { + return crdbpgx.ExecuteTx(ctx, conn, txOpts, fn) + } + + tx, err := conn.BeginTx(ctx, txOpts) + if err != nil { + return err + } + err = fn(tx) + if err != nil { + _ = tx.Rollback(ctx) + return err + } + return tx.Commit(ctx) +} + func (w *tpcc) partitionAndScatter(urls []string) error { db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `)) if err != nil {