Skip to content

Commit

Permalink
Merge #45966
Browse files Browse the repository at this point in the history
45966: sql: fix internal executor usage in leaf txn r=knz,andreimatei a=andreimatei

The internal executor was setting a priority on a leaf txn, which is not
allowed. It was supposed to be a no-op since the priority was the same
as the existing one, but it crashed nonetheless. This patch makes the
no-op even more no-op.

Fixes #45924

Release note: None

Release justification: crash fix

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Sep 18, 2020
2 parents 176c08c + bd255e5 commit 9787fcd
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 29 deletions.
8 changes: 0 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,10 +1122,6 @@ func (tc *TxnCoordSender) PrepareRetryableError(ctx context.Context, msg string)

// Step is part of the TxnSender interface.
func (tc *TxnCoordSender) Step(ctx context.Context) error {
if tc.typ != kv.RootTxn {
return errors.WithContextTags(
errors.AssertionFailedf("cannot call Step() in leaf txn"), ctx)
}
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.interceptorAlloc.txnSeqNumAllocator.stepLocked(ctx)
Expand All @@ -1135,10 +1131,6 @@ func (tc *TxnCoordSender) Step(ctx context.Context) error {
func (tc *TxnCoordSender) ConfigureStepping(
ctx context.Context, mode kv.SteppingMode,
) (prevMode kv.SteppingMode) {
if tc.typ != kv.RootTxn {
panic(errors.WithContextTags(
errors.AssertionFailedf("cannot call ConfigureStepping() in leaf txn"), ctx))
}
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.interceptorAlloc.txnSeqNumAllocator.configureSteppingLocked(mode)
Expand Down
12 changes: 0 additions & 12 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,10 +1199,6 @@ func (txn *Txn) ManualRestart(ctx context.Context, ts hlc.Timestamp) {
// operation (usually, but not exclusively, by a high-priority txn with
// conflicting writes).
func (txn *Txn) IsSerializablePushAndRefreshNotPossible() bool {
if txn.typ != RootTxn {
panic(
errors.AssertionFailedf("IsSerializablePushAndRefreshNotPossible() called on leaf txn"))
}
return txn.mu.sender.IsSerializablePushAndRefreshNotPossible()
}

Expand Down Expand Up @@ -1246,10 +1242,6 @@ func (txn *Txn) Active() bool {
// In step-wise execution, reads operate at a snapshot established at
// the last step, instead of the latest write if not yet enabled.
func (txn *Txn) Step(ctx context.Context) error {
if txn.typ != RootTxn {
return errors.WithContextTags(
errors.AssertionFailedf("txn.Step() only allowed in RootTxn"), ctx)
}
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.Step(ctx)
Expand All @@ -1258,10 +1250,6 @@ func (txn *Txn) Step(ctx context.Context) error {
// ConfigureStepping configures step-wise execution in the
// transaction.
func (txn *Txn) ConfigureStepping(ctx context.Context, mode SteppingMode) (prevMode SteppingMode) {
if txn.typ != RootTxn {
panic(errors.WithContextTags(
errors.AssertionFailedf("txn.ConfigureStepping() only allowed in RootTxn"), ctx))
}
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.ConfigureStepping(ctx, mode)
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,11 @@ func (s *Server) newConnExecutorWithTxn(
appStats *appStats,
) *connExecutor {
ex := s.newConnExecutor(ctx, sd, sdDefaults, stmtBuf, clientComm, memMetrics, srvMetrics, appStats)
if txn.Type() == kv.LeafTxn {
// If the txn is a leaf txn it is not allowed to perform mutations. For
// sanity, set read only on the session.
ex.dataMutator.SetReadOnly(true)
}

// The new transaction stuff below requires active monitors and traces, so
// we need to activate the executor now.
Expand All @@ -713,7 +718,7 @@ func (s *Server) newConnExecutorWithTxn(
explicitTxn,
txn.ReadTimestamp().GoTime(),
nil, /* historicalTimestamp */
txn.UserPriority(),
roachpb.UnspecifiedUserPriority,
tree.ReadWrite,
txn,
ex.transitionCtx)
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -460,6 +462,26 @@ func testInternalExecutorAppNameInitialization(
}
}

func TestInternalExecutorInLeafTxnDoesNotPanic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

rootTxn := kvDB.NewTxn(ctx, "root-txn")

ltis := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, kvDB, roachpb.NodeID(1), &ltis)

ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err := ie.QueryEx(
ctx, "leaf-query", leafTxn, sessiondata.InternalExecutorOverride{User: security.RootUser}, "SELECT 1",
)
require.NoError(t, err)
}

// TODO(andrei): Test that descriptor leases are released by the
// InternalExecutor, with and without a higher-level txn. When there is no
// higher-level txn, the leases are released normally by the txn finishing. When
Expand Down
24 changes: 16 additions & 8 deletions pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,12 @@ const (
// sqlTimestamp: The timestamp to report for current_timestamp(), now() etc.
// historicalTimestamp: If non-nil indicates that the transaction is historical
// and should be fixed to this timestamp.
// priority: The transaction's priority.
// priority: The transaction's priority. Pass roachpb.UnspecifiedUserPriority if the txn arg is
// not nil.
// readOnly: The read-only character of the new txn.
// txn: If not nil, this txn will be used instead of creating a new txn. If so,
// all the other arguments need to correspond to the attributes of this txn.
// all the other arguments need to correspond to the attributes of this txn
// (unless otherwise specified).
// tranCtx: A bag of extra execution context.
func (ts *txnState) resetForNewSQLTxn(
connCtx context.Context,
Expand Down Expand Up @@ -206,17 +208,20 @@ func (ts *txnState) resetForNewSQLTxn(
if txn == nil {
ts.mu.txn = kv.NewTxnWithSteppingEnabled(ts.Ctx, tranCtx.db, tranCtx.nodeIDOrZero)
ts.mu.txn.SetDebugName(opName)
if err := ts.setPriorityLocked(priority); err != nil {
panic(err)
}
} else {
if priority != roachpb.UnspecifiedUserPriority {
panic(errors.AssertionFailedf("unexpected priority when using an existing txn: %s", priority))
}
ts.mu.txn = txn
}
ts.mu.txnStart = timeutil.Now()
ts.mu.Unlock()
if historicalTimestamp != nil {
ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp)
}
if err := ts.setPriority(priority); err != nil {
panic(err)
}
if err := ts.setReadOnlyMode(readOnly); err != nil {
panic(err)
}
Expand Down Expand Up @@ -299,9 +304,12 @@ func (ts *txnState) getReadTimestamp() hlc.Timestamp {

func (ts *txnState) setPriority(userPriority roachpb.UserPriority) error {
ts.mu.Lock()
err := ts.mu.txn.SetUserPriority(userPriority)
ts.mu.Unlock()
if err != nil {
defer ts.mu.Unlock()
return ts.setPriorityLocked(userPriority)
}

func (ts *txnState) setPriorityLocked(userPriority roachpb.UserPriority) error {
if err := ts.mu.txn.SetUserPriority(userPriority); err != nil {
return err
}
ts.priority = userPriority
Expand Down

0 comments on commit 9787fcd

Please sign in to comment.