diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index 1a52410ae896..6e38c14fa684 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" @@ -28,9 +29,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) func TestInternalExecutor(t *testing.T) { @@ -290,6 +294,91 @@ func testInternalExecutorAppNameInitialization( } } +// TestInternalExecutorTxnAbortNotSwallowed reproduces a rare bug where the +// internal executor could swallow transaction aborted errors. Specifically, an +// optimizer code path was not propagating errors, violating the contract of our +// transaction API, and causing partial split transactions that resulted in +// replica corruption errors (#32784). +// +// Note that a fix to our transaction API to eliminate this class of errors is +// proposed in #22615. +func TestInternalExecutorTxnAbortNotSwallowed(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Notify a channel whenever a HeartbeatTxn request notices that a txn has + // been aborted. + heartbeatSawAbortedTxn := make(chan uuid.UUID) + params, _ := tests.CreateTestServerParams() + params.Knobs.Store = &storage.StoreTestingKnobs{ + TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + for i, r := range ba.Requests { + if r.GetHeartbeatTxn() != nil && br.Responses[i].GetHeartbeatTxn().Txn.Status == roachpb.ABORTED { + go func() { + heartbeatSawAbortedTxn <- ba.Txn.ID + }() + } + } + return nil + }, + } + + ctx := context.Background() + s, sqlDB, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + ie := s.InternalExecutor().(*sql.InternalExecutor) + + if _, err := sqlDB.Exec("CREATE TABLE t (a INT)"); err != nil { + t.Fatal(err) + } + + // Create a new txn, and perform a write inside of it so that its txn record + // is written and the heartbeat loop is started. The particular key doesn't + // matter. + txn := client.NewTxn(ctx, kvDB, s.NodeID(), client.RootTxn) + origTxnID := txn.ID() + if err := txn.Put(ctx, "key-foo", []byte("bar")); err != nil { + t.Fatal(err) + } + + // Abort the txn directly with a PushTxnRequest. This happens in practice + // when, e.g., deadlock between two txns is detected. + txnProto := txn.GetTxnCoordMeta(ctx).Txn + if _, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{Key: txnProto.Key}, + PusheeTxn: txnProto.TxnMeta, + PusherTxn: roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority}}, + PushType: roachpb.PUSH_ABORT, + }); pErr != nil { + t.Fatal(pErr) + } + + // Wait for one of the txn's heartbeats to notice that the heartbeat has + // failed. + for txnID := range heartbeatSawAbortedTxn { + if txnID == origTxnID { + break + } + } + + // Execute a SQL statement in the txn using an internal executor. Importantly, + // we're accessing a system table, which bypasses the descriptor cache and + // forces the optimizer to perform raw KV lookups during name resolution. + _, err := ie.Exec(ctx, t.Name(), txn, "INSERT INTO system.zones VALUES ($1, $2)", 50, "") + + // Double-check that the client.Txn has "helpfully" given us a brand new + // KV txn to replace our aborted txn. (#22615) + if origTxnID == txn.ID() { + t.Fatal("test bug: txn ID did not change after executing SQL statement on aborted txn") + } + + // We now have proof that the client.Txn saw the aborted error; the internal + // executor had better have bubbled this error up so that we know to retry our + // txn from the beginning. + if !testutils.IsError(err, "TransactionAbortedError") { + t.Fatalf("expected query execution on aborted txn to fail, but got %+v", err) + } +} + // TODO(andrei): Test that descriptor leases are released by the // InternalExecutor, with and without a higher-level txn. When there is no // higher-level txn, the leases are released normally by the txn finishing. When diff --git a/pkg/sql/opt/memo/memo.go b/pkg/sql/opt/memo/memo.go index ea975be6ba73..3ef60582dc42 100644 --- a/pkg/sql/opt/memo/memo.go +++ b/pkg/sql/opt/memo/memo.go @@ -247,29 +247,28 @@ func (m *Memo) HasPlaceholders() bool { // 5. Data source privileges: current user may no longer have access to one or // more data sources. // -func (m *Memo) IsStale(ctx context.Context, evalCtx *tree.EvalContext, catalog cat.Catalog) bool { +func (m *Memo) IsStale( + ctx context.Context, evalCtx *tree.EvalContext, catalog cat.Catalog, +) (bool, error) { // Memo is stale if the current database has changed. if m.dbName != evalCtx.SessionData.Database { - return true + return true, nil } // Memo is stale if the search path has changed. if !m.searchPath.Equals(&evalCtx.SessionData.SearchPath) { - return true + return true, nil } // Memo is stale if the location has changed. if m.locName != evalCtx.GetLocation().String() { - return true + return true, nil } // Memo is stale if the fingerprint of any data source in the memo's metadata // has changed, or if the current user no longer has sufficient privilege to // access the data source. - if !m.Metadata().CheckDependencies(ctx, catalog) { - return true - } - return false + return m.Metadata().CheckDependencies(ctx, catalog) } // InternPhysicalProps adds the given physical props to the memo if they haven't diff --git a/pkg/sql/opt/metadata.go b/pkg/sql/opt/metadata.go index 9f4cb9a9e0d7..a86a1184ccd1 100644 --- a/pkg/sql/opt/metadata.go +++ b/pkg/sql/opt/metadata.go @@ -119,17 +119,17 @@ func (md *Metadata) AddDependency(ds cat.DataSource, priv privilege.Kind) { // in order to check that the fully qualified data source names still resolve to // the same version of the same data source, and that the user still has // sufficient privileges to access the data source. -func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog) bool { +func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog) (bool, error) { for dep, privs := range md.deps { ds, err := catalog.ResolveDataSource(ctx, dep.Name()) if err != nil { - return false + return false, err } if dep.ID() != ds.ID() { - return false + return false, nil } if dep.Version() != ds.Version() { - return false + return false, nil } for privs != 0 { @@ -140,7 +140,7 @@ func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog) priv := privilege.Kind(bits.TrailingZeros32(uint32(privs))) if priv != 0 { if err = catalog.CheckPrivilege(ctx, ds, priv); err != nil { - return false + return false, err } } @@ -148,7 +148,7 @@ func (md *Metadata) CheckDependencies(ctx context.Context, catalog cat.Catalog) privs &= ^(1 << priv) } } - return true + return true, nil } // AddTable indexes a new reference to a table within the query. Separate diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index e21f5e6b0b49..48c065bd4bb2 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -439,7 +439,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context, stmt Statement) (planFl // If the prepared memo has been invalidated by schema or other changes, // re-prepare it. - if stmt.Prepared.Memo.IsStale(ctx, p.EvalContext(), &catalog) { + if ok, err := stmt.Prepared.Memo.IsStale(ctx, p.EvalContext(), &catalog); err != nil { + return 0, err + } else if !ok { stmt.Prepared.Memo, err = p.prepareMemo(ctx, &catalog, stmt) if err != nil { return 0, err @@ -451,7 +453,9 @@ func (p *planner) makeOptimizerPlan(ctx context.Context, stmt Statement) (planFl // Consult the query cache. cachedData, ok := p.execCfg.QueryCache.Find(stmt.SQL) if ok { - if cachedData.Memo.IsStale(ctx, p.EvalContext(), &catalog) { + if ok, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), &catalog); err != nil { + return 0, err + } else if !ok { cachedData.Memo, err = p.prepareMemo(ctx, &catalog, stmt) if err != nil { return 0, err