Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: workload/tpcc: optionally disable txn retry loops #119268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"strings"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -87,8 +86,8 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
oCarrierID := rng.Intn(10) + 1
olDeliveryD := timeutil.Now()

err := crdbpgx.ExecuteTx(
ctx, del.mcp.Get(), pgx.TxOptions{},
err := del.config.executeTx(
ctx, del.mcp.Get(),
func(tx pgx.Tx) error {
// 2.7.4.2. For each district:
dIDoIDPairs := make(map[int]int)
Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"strings"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -211,8 +210,8 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {

d.oEntryD = timeutil.Now()

err := crdbpgx.ExecuteTx(
ctx, n.mcp.Get(), pgx.TxOptions{},
err := n.config.executeTx(
ctx, n.mcp.Get(),
func(tx pgx.Tx) error {
// Select the district tax rate and next available order number, bumping it.
var dNextOID int
Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/order_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -133,8 +132,8 @@ func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) {
d.cID = o.config.randCustomerID(rng)
}

if err := crdbpgx.ExecuteTx(
ctx, o.mcp.Get(), pgx.TxOptions{},
if err := o.config.executeTx(
ctx, o.mcp.Get(),
func(tx pgx.Tx) error {
// 2.6.2.2 explains this entire transaction.

Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"fmt"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -189,8 +188,8 @@ func (p *payment) run(ctx context.Context, wID int) (interface{}, error) {
d.cID = p.config.randCustomerID(rng)
}

if err := crdbpgx.ExecuteTx(
ctx, p.mcp.Get(), pgx.TxOptions{},
if err := p.config.executeTx(
ctx, p.mcp.Get(),
func(tx pgx.Tx) error {
var wName, dName string
// Update warehouse with payment
Expand Down
5 changes: 2 additions & 3 deletions pkg/workload/tpcc/stock_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tpcc
import (
"context"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -96,8 +95,8 @@ func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) {
dID: rng.Intn(10) + 1,
}

if err := crdbpgx.ExecuteTx(
ctx, s.mcp.Get(), pgx.TxOptions{},
if err := s.config.executeTx(
ctx, s.mcp.Get(),
func(tx pgx.Tx) error {
var dNextOID int
if err := s.selectDNextOID.QueryRowTx(
Expand Down
25 changes: 25 additions & 0 deletions pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

crdbpgx "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -44,6 +45,7 @@ type tpcc struct {
numConns int
idleConns int
isoLevel string
txnRetries bool

// Used in non-uniform random data generation. cLoad is the value of C at load
// time. cCustomerID is the value of C for the customer id generator. cItemID
Expand Down Expand Up @@ -171,6 +173,7 @@ var tpccMeta = workload.Meta{
`conns`: {RuntimeOnly: true},
`idle-conns`: {RuntimeOnly: true},
`isolation-level`: {RuntimeOnly: true},
`txn-retries`: {RuntimeOnly: true},
`expensive-checks`: {RuntimeOnly: true, CheckConsistencyOnly: true},
`local-warehouses`: {RuntimeOnly: true},
`regions`: {RuntimeOnly: true},
Expand Down Expand Up @@ -199,6 +202,7 @@ var tpccMeta = workload.Meta{
))
g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`)
g.flags.StringVar(&g.isoLevel, `isolation-level`, ``, `Isolation level to run workload transactions under [serializable, snapshot, read_committed]. If unset, the workload will run with the default isolation level of the database.`)
g.flags.BoolVar(&g.txnRetries, `txn-retries`, true, `Run transactions in a retry loop`)
g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`)
g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`)
g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+
Expand Down Expand Up @@ -897,6 +901,27 @@ func (w *tpcc) Ops(
return ql, nil
}

// executeTx runs fn inside a transaction with retries, if enabled. On
// non-retryable failures, the transaction is aborted and rolled back; on
// success, the transaction is committed.
func (w *tpcc) executeTx(ctx context.Context, conn crdbpgx.Conn, fn func(pgx.Tx) error) error {
txOpts := pgx.TxOptions{}
if w.txnRetries {
return crdbpgx.ExecuteTx(ctx, conn, txOpts, fn)
}

tx, err := conn.BeginTx(ctx, txOpts)
if err != nil {
return err
}
err = fn(tx)
if err != nil {
_ = tx.Rollback(ctx)
return err
}
return tx.Commit(ctx)
}

func (w *tpcc) partitionAndScatter(urls []string) error {
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
Expand Down