Skip to content

Commit

Permalink
sql/opt: avoid swallowing TransactionAbortedErrors
Browse files Browse the repository at this point in the history
An optimizer code path could, in rare cases, fail to propagate a
transaction aborted error. This proved disastrous as, due to a footgun
in our transaction API (cockroachdb#22615), swallowing a transaction aborted error
results in proceeding with a brand new transaction that has no knowledge
of the earlier operations performed on the original transaction.

This presented as a rare and confusing bug in splits, as the split
transaction uses an internal executor. The internal executor would
occasionally silently return a new transaction that only had half of the
necessary operations performed on it, and committing that partial
transaction would result in a "range does not match splits" error.

Fixes cockroachdb#32784.

Release note: None
  • Loading branch information
benesch committed Dec 21, 2018
1 parent 5ea8933 commit 1649565
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 16 deletions.
89 changes: 89 additions & 0 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func TestInternalExecutor(t *testing.T) {
Expand Down Expand Up @@ -290,6 +294,91 @@ func testInternalExecutorAppNameInitialization(
}
}

// TestInternalExecutorTxnAbortNotSwallowed reproduces a rare bug where the
// internal executor could swallow transaction aborted errors. Specifically, an
// optimizer code path was not propagating errors, violating the contract of our
// transaction API, and causing partial split transactions that resulted in
// replica corruption errors (#32784).
//
// Note that a fix to our transaction API to eliminate this class of errors is
// proposed in #22615.
func TestInternalExecutorTxnAbortNotSwallowed(t *testing.T) {
defer leaktest.AfterTest(t)()

// Notify a channel whenever a HeartbeatTxn request notices that a txn has
// been aborted.
heartbeatSawAbortedTxn := make(chan uuid.UUID)
params, _ := tests.CreateTestServerParams()
params.Knobs.Store = &storage.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for i, r := range ba.Requests {
if r.GetHeartbeatTxn() != nil && br.Responses[i].GetHeartbeatTxn().Txn.Status == roachpb.ABORTED {
go func() {
heartbeatSawAbortedTxn <- ba.Txn.ID
}()
}
}
return nil
},
}

ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
ie := s.InternalExecutor().(*sql.InternalExecutor)

if _, err := sqlDB.Exec("CREATE TABLE t (a INT)"); err != nil {
t.Fatal(err)
}

// Create a new txn, and perform a write inside of it so that its txn record
// is written and the heartbeat loop is started. The particular key doesn't
// matter.
txn := client.NewTxn(ctx, kvDB, s.NodeID(), client.RootTxn)
origTxnID := txn.ID()
if err := txn.Put(ctx, "key-foo", []byte("bar")); err != nil {
t.Fatal(err)
}

// Abort the txn directly with a PushTxnRequest. This happens in practice
// when, e.g., deadlock between two txns is detected.
txnProto := txn.GetTxnCoordMeta(ctx).Txn
if _, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txnProto.Key},
PusheeTxn: txnProto.TxnMeta,
PusherTxn: roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority}},
PushType: roachpb.PUSH_ABORT,
}); pErr != nil {
t.Fatal(pErr)
}

// Wait for one of the txn's heartbeats to notice that the heartbeat has
// failed.
for txnID := range heartbeatSawAbortedTxn {
if txnID == origTxnID {
break
}
}

// Execute a SQL statement in the txn using an internal executor. Importantly,
// we're accessing a system table, which bypasses the descriptor cache and
// forces the optimizer to perform raw KV lookups during name resolution.
_, err := ie.Exec(ctx, t.Name(), txn, "INSERT INTO system.zones VALUES ($1, $2)", 50, "")

// Double-check that the client.Txn has "helpfully" given us a brand new
// KV txn to replace our aborted txn. (#22615)
if origTxnID == txn.ID() {
t.Fatal("test bug: txn ID did not change after executing SQL statement on aborted txn")
}

// We now have proof that the client.Txn saw the aborted error; the internal
// executor had better have bubbled this error up so that we know to retry our
// txn from the beginning.
if !testutils.IsError(err, "TransactionAbortedError") {
t.Fatalf("expected query execution on aborted txn to fail, but got %+v", err)
}
}

// TODO(andrei): Test that descriptor leases are released by the
// InternalExecutor, with and without a higher-level txn. When there is no
// higher-level txn, the leases are released normally by the txn finishing. When
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/opt/memo/memo.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,29 +247,28 @@ func (m *Memo) HasPlaceholders() bool {
// 5. Data source privileges: current user may no longer have access to one or
// more data sources.
//
func (m *Memo) IsStale(ctx context.Context, evalCtx *tree.EvalContext, catalog cat.Catalog) bool {
func (m *Memo) IsStale(
ctx context.Context, evalCtx *tree.EvalContext, catalog cat.Catalog,
) (bool, error) {
// Memo is stale if the current database has changed.
if m.dbName != evalCtx.SessionData.Database {
return true
return true, nil
}

// Memo is stale if the search path has changed.
if !m.searchPath.Equals(&evalCtx.SessionData.SearchPath) {
return true
return true, nil
}

// Memo is stale if the location has changed.
if m.locName != evalCtx.GetLocation().String() {
return true
return true, nil
}

// Memo is stale if the fingerprint of any data source in the memo's metadata
// has changed, or if the current user no longer has sufficient privilege to
// access the data source.
if !m.Metadata().CheckDependencies(ctx, catalog) {
return true
}
return false
return m.Metadata().CheckDependencies(ctx, catalog)
}

// InternPhysicalProps adds the given physical props to the memo if they haven't
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/opt/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ func (md *Metadata) AddDependency(ds cat.DataSource, priv privilege.Kind) {
// in order to check that the fully qualified data source names still resolve to
// the same version of the same data source, and that the user still has
// sufficient privileges to access the data source.
func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog) bool {
func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog) (bool, error) {
for dep, privs := range md.deps {
ds, err := catalog.ResolveDataSource(ctx, dep.Name())
if err != nil {
return false
return false, err
}
if dep.ID() != ds.ID() {
return false
return false, nil
}
if dep.Version() != ds.Version() {
return false
return false, nil
}

for privs != 0 {
Expand All @@ -140,15 +140,15 @@ func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog)
priv := privilege.Kind(bits.TrailingZeros32(uint32(privs)))
if priv != 0 {
if err = catalog.CheckPrivilege(ctx, ds, priv); err != nil {
return false
return false, err
}
}

// Set the just-handled privilege bit to zero and look for next.
privs &= ^(1 << priv)
}
}
return true
return true, nil
}

// AddTable indexes a new reference to a table within the query. Separate
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context, stmt Statement) (planFl

// If the prepared memo has been invalidated by schema or other changes,
// re-prepare it.
if stmt.Prepared.Memo.IsStale(ctx, p.EvalContext(), &catalog) {
if ok, err := stmt.Prepared.Memo.IsStale(ctx, p.EvalContext(), &catalog); err != nil {
return 0, err
} else if !ok {
stmt.Prepared.Memo, err = p.prepareMemo(ctx, &catalog, stmt)
if err != nil {
return 0, err
Expand All @@ -451,7 +453,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context, stmt Statement) (planFl
// Consult the query cache.
cachedData, ok := p.execCfg.QueryCache.Find(stmt.SQL)
if ok {
if cachedData.Memo.IsStale(ctx, p.EvalContext(), &catalog) {
if ok, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), &catalog); err != nil {
return 0, err
} else if !ok {
cachedData.Memo, err = p.prepareMemo(ctx, &catalog, stmt)
if err != nil {
return 0, err
Expand Down

0 comments on commit 1649565

Please sign in to comment.