Skip to content

Commit

Permalink
Merge pull request #119268 from nvanbenschoten/backport23.2-117096
Browse files Browse the repository at this point in the history
release-23.2: workload/tpcc: optionally disable txn retry loops
  • Loading branch information
nvanbenschoten authored Feb 20, 2024
2 parents 5ed123f + 6d18209 commit 2711ab9
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 15 deletions.
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"strings"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -87,8 +86,8 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
oCarrierID := rng.Intn(10) + 1
olDeliveryD := timeutil.Now()

err := crdbpgx.ExecuteTx(
ctx, del.mcp.Get(), pgx.TxOptions{},
err := del.config.executeTx(
ctx, del.mcp.Get(),
func(tx pgx.Tx) error {
// 2.7.4.2. For each district:
dIDoIDPairs := make(map[int]int)
Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"strings"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -211,8 +210,8 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {

d.oEntryD = timeutil.Now()

err := crdbpgx.ExecuteTx(
ctx, n.mcp.Get(), pgx.TxOptions{},
err := n.config.executeTx(
ctx, n.mcp.Get(),
func(tx pgx.Tx) error {
// Select the district tax rate and next available order number, bumping it.
var dNextOID int
Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/order_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -133,8 +132,8 @@ func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) {
d.cID = o.config.randCustomerID(rng)
}

if err := crdbpgx.ExecuteTx(
ctx, o.mcp.Get(), pgx.TxOptions{},
if err := o.config.executeTx(
ctx, o.mcp.Get(),
func(tx pgx.Tx) error {
// 2.6.2.2 explains this entire transaction.

Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"fmt"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -189,8 +188,8 @@ func (p *payment) run(ctx context.Context, wID int) (interface{}, error) {
d.cID = p.config.randCustomerID(rng)
}

if err := crdbpgx.ExecuteTx(
ctx, p.mcp.Get(), pgx.TxOptions{},
if err := p.config.executeTx(
ctx, p.mcp.Get(),
func(tx pgx.Tx) error {
var wName, dName string
// Update warehouse with payment
Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/stock_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tpcc
import (
"context"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -96,8 +95,8 @@ func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) {
dID: rng.Intn(10) + 1,
}

if err := crdbpgx.ExecuteTx(
ctx, s.mcp.Get(), pgx.TxOptions{},
if err := s.config.executeTx(
ctx, s.mcp.Get(),
func(tx pgx.Tx) error {
var dNextOID int
if err := s.selectDNextOID.QueryRowTx(
Expand Down
25 changes: 25 additions & 0 deletions pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -44,6 +45,7 @@ type tpcc struct {
numConns int
idleConns int
isoLevel string
txnRetries bool

// Used in non-uniform random data generation. cLoad is the value of C at load
// time. cCustomerID is the value of C for the customer id generator. cItemID
Expand Down Expand Up @@ -171,6 +173,7 @@ var tpccMeta = workload.Meta{
`conns`: {RuntimeOnly: true},
`idle-conns`: {RuntimeOnly: true},
`isolation-level`: {RuntimeOnly: true},
`txn-retries`: {RuntimeOnly: true},
`expensive-checks`: {RuntimeOnly: true, CheckConsistencyOnly: true},
`local-warehouses`: {RuntimeOnly: true},
`regions`: {RuntimeOnly: true},
Expand Down Expand Up @@ -199,6 +202,7 @@ var tpccMeta = workload.Meta{
))
g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`)
g.flags.StringVar(&g.isoLevel, `isolation-level`, ``, `Isolation level to run workload transactions under [serializable, snapshot, read_committed]. If unset, the workload will run with the default isolation level of the database.`)
g.flags.BoolVar(&g.txnRetries, `txn-retries`, true, `Run transactions in a retry loop`)
g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`)
g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`)
g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+
Expand Down Expand Up @@ -897,6 +901,27 @@ func (w *tpcc) Ops(
return ql, nil
}

// executeTx runs fn inside a transaction with retries, if enabled. On
// non-retryable failures, the transaction is aborted and rolled back; on
// success, the transaction is committed.
func (w *tpcc) executeTx(ctx context.Context, conn crdbpgx.Conn, fn func(pgx.Tx) error) error {
txOpts := pgx.TxOptions{}
if w.txnRetries {
return crdbpgx.ExecuteTx(ctx, conn, txOpts, fn)
}

tx, err := conn.BeginTx(ctx, txOpts)
if err != nil {
return err
}
err = fn(tx)
if err != nil {
_ = tx.Rollback(ctx)
return err
}
return tx.Commit(ctx)
}

func (w *tpcc) partitionAndScatter(urls []string) error {
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
Expand Down

0 comments on commit 2711ab9

Please sign in to comment.