Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

properly suppress begin...commit in autocommit logs #4827

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,13 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltypes.Result, error)) (reply *sqltypes.Result, err error) {
conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options)
conn, beginSQL, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options)
if err != nil {
return nil, err
}
defer qre.tsv.te.txPool.LocalConclude(qre.ctx, conn)
//
// A better future improvement would be for LocalBegin to return the set of
// executed statements to capture the isolation level setting as well.
if qre.options.GetTransactionIsolation() != querypb.ExecuteOptions_AUTOCOMMIT {
qre.logStats.AddRewrittenSQL("begin", time.Now())
if beginSQL != "" {
qre.logStats.AddRewrittenSQL(beginSQL, time.Now())
}

reply, err = f(conn)
Expand All @@ -316,11 +313,11 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltype
qre.logStats.AddRewrittenSQL("rollback", start)
return nil, err
}
err = qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager)
commitSQL, err := qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager)

// As above LocalCommit is a no-op for autocommmit so don't log anything.
if qre.options.GetTransactionIsolation() != querypb.ExecuteOptions_AUTOCOMMIT {
qre.logStats.AddRewrittenSQL("commit", start)
if commitSQL != "" {
qre.logStats.AddRewrittenSQL(commitSQL, start)
}

if err != nil {
Expand Down
50 changes: 39 additions & 11 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,15 @@ type TxPoolController interface {
// StopGently will change the state to NotServing but first wait for transactions to wrap up
StopGently()

// Begin begins a transaction, and returns the associated transaction id.
// Begin begins a transaction, and returns the associated transaction id and the
// statement(s) used to execute the begin (if any).
//
// Subsequent statements can access the connection through the transaction id.
Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error)
Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error)

// Commit commits the specified transaction.
Commit(ctx context.Context, transactionID int64, mc messageCommitter) error
// Commit commits the specified transaction, returning the statement used to execute
// the commit or "" in autocommit settings.
Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error)

// Rollback rolls back the specified transaction.
Rollback(ctx context.Context, transactionID int64) error
Expand Down Expand Up @@ -773,13 +776,25 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti
"Begin", "begin", nil,
target, options, true /* isBegin */, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tabletenv.QueryStats.Record("BEGIN", time.Now())
startTime := time.Now()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a slight behavior change -- previously the stats were incremented when the txThrottler rejects the operation, but now it's only counted if the begin actually occurs.

I think that's an improvement personally.

if tsv.txThrottler.Throttle() {
// TODO(erez): I think this should be RESOURCE_EXHAUSTED.
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled")
}
transactionID, err = tsv.teCtrl.Begin(ctx, options)
var beginSQL string
transactionID, beginSQL, err = tsv.teCtrl.Begin(ctx, options)
logStats.TransactionID = transactionID

// Record the actual statements that were executed in the logStats.
// If nothing was actually executed, don't count the operation in
// the tablet metrics, and clear out the logStats Method so that
// handlePanicAndSendLogStats doesn't log the no-op.
logStats.OriginalSQL = beginSQL
if beginSQL != "" {
tabletenv.QueryStats.Record("BEGIN", startTime)
} else {
logStats.Method = ""
}
return err
},
)
Expand All @@ -793,9 +808,21 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra
"Commit", "commit", nil,
target, nil, false /* isBegin */, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tabletenv.QueryStats.Record("COMMIT", time.Now())
startTime := time.Now()
logStats.TransactionID = transactionID
return tsv.teCtrl.Commit(ctx, transactionID, tsv.messager)

var commitSQL string
commitSQL, err = tsv.teCtrl.Commit(ctx, transactionID, tsv.messager)

// If nothing was actually executed, don't count the operation in
// the tablet metrics, and clear out the logStats Method so that
// handlePanicAndSendLogStats doesn't log the no-op.
if commitSQL != "" {
tabletenv.QueryStats.Record("COMMIT", startTime)
} else {
logStats.Method = ""
}
return err
},
)
}
Expand Down Expand Up @@ -1471,6 +1498,7 @@ func (tsv *TabletServer) handlePanicAndSendLogStats(
// Examples where we don't send the log stats:
// - ExecuteBatch() (logStats == nil)
// - beginWaitForSameRangeTransactions() (Method == "")
// - Begin / Commit in autocommit mode
if logStats != nil && logStats.Method != "" {
logStats.Send()
}
Expand Down Expand Up @@ -1860,9 +1888,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
target := tsv.target
tsv.mu.Unlock()
shr := &querypb.StreamHealthResponse{
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
}
Expand Down
14 changes: 8 additions & 6 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,22 +264,24 @@ func (te *TxEngine) AcceptReadOnly() error {
}
}

// Begin begins a transaction, and returns the associated transaction id.
// Begin begins a transaction, and returns the associated transaction id and the
// statement(s) used to execute the begin (if any).
//
// Subsequent statements can access the connection through the transaction id.
func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) {
func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) {
te.stateLock.Lock()

canOpenTransactions := te.state == AcceptingReadOnly || te.state == AcceptingReadAndWrite
if !canOpenTransactions {
// We are not in a state where we can start new transactions. Abort.
te.stateLock.Unlock()
return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state)
return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state)
}

isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY
if te.state == AcceptingReadOnly && isWriteTransaction {
te.stateLock.Unlock()
return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state")
return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state")
}

// By Add() to beginRequests, we block others from initiating state
Expand All @@ -292,7 +294,7 @@ func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions)
}

// Commit commits the specified transaction.
func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error {
func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) {
return te.txPool.Commit(ctx, transactionID, mc)
}

Expand Down Expand Up @@ -466,7 +468,7 @@ outer:
if txid > maxid {
maxid = txid
}
conn, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
conn, _, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
allErr.RecordError(err)
continue
Expand Down
15 changes: 9 additions & 6 deletions go/vt/vttablet/tabletserver/tx_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ func TestTxEngineClose(t *testing.T) {

// Normal close with timeout wait.
te.open()
c, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
c, beginSQL, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
t.Fatal(err)
}
if beginSQL != "begin" {
t.Errorf("beginSQL: %q, want 'begin'", beginSQL)
}
c.Recycle()
start = time.Now()
te.close(false)
Expand All @@ -68,7 +71,7 @@ func TestTxEngineClose(t *testing.T) {

// Immediate close.
te.open()
c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
t.Fatal(err)
}
Expand All @@ -82,7 +85,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period.
te.shutdownGracePeriod = 250 * time.Millisecond
te.open()
c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
t.Fatal(err)
}
Expand All @@ -99,7 +102,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period, but pool gets empty early.
te.shutdownGracePeriod = 250 * time.Millisecond
te.open()
c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
t.Fatal(err)
}
Expand All @@ -123,7 +126,7 @@ func TestTxEngineClose(t *testing.T) {

// Immediate close, but connection is in use.
te.open()
c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -492,6 +495,6 @@ func startTransaction(te *TxEngine, writeTransaction bool) error {
} else {
options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY
}
_, err := te.Begin(context.Background(), options)
_, _, err := te.Begin(context.Background(), options)
return err
}
31 changes: 17 additions & 14 deletions go/vt/vttablet/tabletserver/tx_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err)
}

localConn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
localConn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
if err != nil {
return err
}
Expand All @@ -79,7 +79,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
return err
}

err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager)
_, err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager)
if err != nil {
return err
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
txe.markFailed(ctx, dtid)
return err
}
err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager)
_, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager)
if err != nil {
txe.markFailed(ctx, dtid)
return err
Expand All @@ -130,7 +130,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
tabletenv.InternalErrors.Add("TwopcCommit", 1)
txe.te.preparedPool.SetFailed(dtid)
conn, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
conn, _, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{})
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
Expand All @@ -142,7 +142,7 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
return
}

if err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil {
if _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer tabletenv.QueryStats.Record("ROLLBACK_PREPARED", time.Now())
conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
if err != nil {
goto returnConn
}
Expand All @@ -181,7 +181,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error {
goto returnConn
}

err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
_, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)

returnConn:
if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil {
Expand All @@ -200,7 +200,7 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer tabletenv.QueryStats.Record("CREATE_TRANSACTION", time.Now())
conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
if err != nil {
return err
}
Expand All @@ -210,7 +210,8 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta
if err != nil {
return err
}
return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
_, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
return err
}

// StartCommit atomically commits the transaction along with the
Expand All @@ -232,7 +233,8 @@ func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error {
if err != nil {
return err
}
return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
_, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
return err
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand All @@ -248,7 +250,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error {
txe.te.txPool.Rollback(txe.ctx, transactionID)
}

conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
if err != nil {
return err
}
Expand All @@ -259,7 +261,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error {
return err
}

err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
_, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
if err != nil {
return err
}
Expand All @@ -275,7 +277,7 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error {
}
defer tabletenv.QueryStats.Record("RESOLVE", time.Now())

conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{})
if err != nil {
return err
}
Expand All @@ -285,7 +287,8 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error {
if err != nil {
return err
}
return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
_, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager)
return err
}

// ReadTransaction returns the metadata for the sepcified dtid.
Expand Down
Loading