diff --git a/pkg/sql/as_of_test.go b/pkg/sql/as_of_test.go index b14cf7218aa4..cd7880efb4e4 100644 --- a/pkg/sql/as_of_test.go +++ b/pkg/sql/as_of_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -413,3 +414,38 @@ func TestShowTraceAsOfTime(t *testing.T) { t.Fatalf("expected to find one matching row, got %v", i) } } + +func TestAsOfResolveEnum(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true}) + defer srv.Stopper().Stop(context.Background()) + defer db.Close() + + runner := sqlutils.MakeSQLRunner(db) + var tsBeforeTypExists string + runner.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsBeforeTypExists) + runner.Exec(t, "CREATE TYPE typ AS ENUM('hi', 'hello')") + + // Use a prepared statement. + runner.ExpectErr( + t, + "type with ID [0-9]+ does not exist", + fmt.Sprintf( + "SELECT $1::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s", + tsBeforeTypExists, + ), + "hi", + ) + + // Use a simple query. + runner.ExpectErr( + t, + "type \"typ\" does not exist", + fmt.Sprintf( + "SELECT 'hi'::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s", + tsBeforeTypExists, + ), + ) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 889f9d30c0cc..c1ca02a1d8fb 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1951,13 +1951,11 @@ func (ex *connExecutor) execCmd() error { // inside a BEGIN/COMMIT transaction block (“close” meaning to commit if no // error, or roll back if error)." // In other words, Sync is treated as commit for implicit transactions. - if op, ok := ex.machine.CurState().(stateOpen); ok { - if op.ImplicitTxn.Get() { - // Note that the handling of ev in the case of Sync is massaged a bit - // later - Sync is special in that, if it encounters an error, that does - // *not *cause the session to ignore all commands until the next Sync. - ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{}) - } + if ex.implicitTxn() { + // Note that the handling of ev in the case of Sync is massaged a bit + // later - Sync is special in that, if it encounters an error, that does + // *not *cause the session to ignore all commands until the next Sync. + ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{}) } // Note that the Sync result will flush results to the network connection. res = ex.clientComm.CreateSyncResult(pos) @@ -2525,7 +2523,6 @@ func (ex *connExecutor) setTransactionModes( if err := ex.state.setHistoricalTimestamp(ctx, asOfTs); err != nil { return err } - ex.state.sqlTimestamp = asOfTs.GoTime() if rwMode == tree.UnspecifiedReadWriteMode { rwMode = tree.ReadOnly } @@ -2629,10 +2626,16 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo // statement is supposed to have a different timestamp, the evalCtx generally // shouldn't be reused across statements. func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, stmtTS time.Time) { + newTxn := txn == nil || evalCtx.Txn != txn evalCtx.TxnState = ex.getTransactionState() evalCtx.TxnReadOnly = ex.state.readOnly evalCtx.TxnImplicit = ex.implicitTxn() - evalCtx.StmtTimestamp = stmtTS + if newTxn || !ex.implicitTxn() { + // Only update the stmt timestamp if in a new txn or an explicit txn. This is because this gets + // called multiple times during an extended protocol implicit txn, but we + // want all those stages to share the same stmtTS. + evalCtx.StmtTimestamp = stmtTS + } evalCtx.TxnTimestamp = ex.state.sqlTimestamp evalCtx.Placeholders = nil evalCtx.Annotations = nil @@ -2655,7 +2658,10 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, nextMax := minTSErr.MinTimestampBound ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax) evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax - } else { + } else if newTxn { + // Otherwise, only change the historical timestamps if this is a new txn. + // This is because resetPlanner can be called multiple times for the same + // txn during the extended protocol. ex.extraTxnState.descCollection.ResetMaxTimestampBound() evalCtx.AsOfSystemTime = nil } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 7e577c57bf5a..a674d2c1b623 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -521,8 +521,12 @@ func (ex *connExecutor) execStmtInOpenState( switch s := ast.(type) { case *tree.BeginTransaction: - // BEGIN is always an error when in the Open state. It's legitimate only in - // the NoTxn state. + // BEGIN is only allowed if we are in an implicit txn that was started + // in the extended protocol. + if isExtendedProtocol && os.ImplicitTxn.Get() { + ex.sessionDataStack.PushTopClone() + return eventTxnUpgradeToExplicit{}, nil, nil + } return makeErrEvent(errTransactionInProgress) case *tree.CommitTransaction: @@ -599,57 +603,8 @@ func (ex *connExecutor) execStmtInOpenState( // For regular statements (the ones that get to this point), we // don't return any event unless an error happens. - if os.ImplicitTxn.Get() { - // If AS OF SYSTEM TIME is already set, this has to be a bounded staleness - // read with nearest_only=True during a retry. - if p.extendedEvalCtx.AsOfSystemTime == nil { - asOf, err := p.isAsOf(ctx, ast) - if err != nil { - return makeErrEvent(err) - } - if asOf != nil { - p.extendedEvalCtx.AsOfSystemTime = asOf - if !asOf.BoundedStaleness { - p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime()) - if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil { - return makeErrEvent(err) - } - } - } - } else { - if !p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness || - p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() { - return makeErrEvent(errors.AssertionFailedf( - "expected bounded_staleness set with a max_timestamp_bound", - )) - } - } - } else { - // If we're in an explicit txn, we allow AOST but only if it matches with - // the transaction's timestamp. This is useful for running AOST statements - // using the InternalExecutor inside an external transaction; one might want - // to do that to force p.avoidLeasedDescriptors to be set below. - asOf, err := p.isAsOf(ctx, ast) - if err != nil { - return makeErrEvent(err) - } - if asOf != nil { - if asOf.BoundedStaleness { - return makeErrEvent( - pgerror.Newf( - pgcode.FeatureNotSupported, - "cannot use a bounded staleness query in a transaction", - ), - ) - } - if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs { - err = pgerror.Newf(pgcode.Syntax, - "inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs) - err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME") - return makeErrEvent(err) - } - p.extendedEvalCtx.AsOfSystemTime = asOf - } + if err := ex.handleAOST(ctx, ast); err != nil { + return makeErrEvent(err) } // The first order of business is to ensure proper sequencing @@ -776,6 +731,66 @@ func (ex *connExecutor) execStmtInOpenState( return nil, nil, nil } +// handleAOST gets the AsOfSystemTime clause from the statement, and sets +// the timestamps of the transaction accordingly. +func (ex *connExecutor) handleAOST(ctx context.Context, stmt tree.Statement) error { + if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + return errors.AssertionFailedf( + "cannot handle AOST clause without a transaction", + ) + } + p := &ex.planner + asOf, err := p.isAsOf(ctx, stmt) + if err != nil { + return err + } + if asOf == nil { + return nil + } + if ex.implicitTxn() { + if p.extendedEvalCtx.AsOfSystemTime == nil { + p.extendedEvalCtx.AsOfSystemTime = asOf + if !asOf.BoundedStaleness { + p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime()) + if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil { + return err + } + } + } else if p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness { + // This has to be a bounded staleness read with nearest_only=True during + // a retry. The AOST read timestamps are expected to differ. + if p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() { + return errors.AssertionFailedf( + "expected bounded_staleness set with a max_timestamp_bound", + ) + } + } else if *p.extendedEvalCtx.AsOfSystemTime != *asOf { + return errors.AssertionFailedf( + "cannot specify AS OF SYSTEM TIME with different timestamps", + ) + } + } else { + // If we're in an explicit txn, we allow AOST but only if it matches with + // the transaction's timestamp. This is useful for running AOST statements + // using the InternalExecutor inside an external transaction; one might want + // to do that to force p.avoidLeasedDescriptors to be set below. + if asOf.BoundedStaleness { + return pgerror.Newf( + pgcode.FeatureNotSupported, + "cannot use a bounded staleness query in a transaction", + ) + } + if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs { + err = pgerror.Newf(pgcode.Syntax, + "inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs) + err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME") + return err + } + p.extendedEvalCtx.AsOfSystemTime = asOf + } + return nil +} + func formatWithPlaceholders(ast tree.Statement, evalCtx *tree.EvalContext) string { var fmtCtx *tree.FmtCtx fmtFlags := tree.FmtSimple @@ -1463,16 +1478,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode( ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner - // NB: this use of p.txn is totally bogus. The planner's txn should - // definitely be finalized at this point. We preserve it here because we - // need to make sure that the planner's txn is not made to be nil in the - // case of an error below. The planner's txn is never written to nil at - // any other point after the first prepare or exec has been run. We abuse - // this transaction in bind and some other contexts for resolving types and - // oids. Avoiding set this to nil side-steps a nil pointer panic but is still - // awful. Instead we ought to clear the planner state when we clear the reset - // the connExecutor in finishTxn. - ex.resetPlanner(ctx, p, p.txn, now) + ex.resetPlanner(ctx, p, nil, now) asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) if err != nil { return 0, time.Time{}, nil, err @@ -1527,25 +1533,39 @@ func (ex *connExecutor) execStmtInNoTxnState( *tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint: return ex.makeErrEvent(errNoTransactionInProgress, ast) default: - // NB: Implicit transactions are created with the session's default - // historical timestamp even though the statement itself might contain - // an AOST clause. In these cases the clause is evaluated and applied - // execStmtInOpenState. - noBeginStmt := (*tree.BeginTransaction)(nil) - mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt) - if err != nil { - return ex.makeErrEvent(err, s) - } - return eventStartImplicitTxn, - makeEventTxnStartPayload( - ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority), - mode, - sqlTs, - historicalTs, - ex.transitionCtx) + return ex.beginImplicitTxn(ctx, ast) } } +// beginImplicitTxn starts an implicit transaction. The fsm.Event that is +// returned does not cause the state machine to advance, so the same command +// will be executed again, but with an implicit transaction. +// Implicit transactions are created with the session's default +// historical timestamp even though the statement itself might contain +// an AOST clause. In these cases the clause is evaluated and applied +// when the command is executed again. +func (ex *connExecutor) beginImplicitTxn( + ctx context.Context, ast tree.Statement, +) (fsm.Event, fsm.EventPayload) { + // NB: Implicit transactions are created with the session's default + // historical timestamp even though the statement itself might contain + // an AOST clause. In these cases the clause is evaluated and applied + // when the command is evaluated again. + noBeginStmt := (*tree.BeginTransaction)(nil) + mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt) + if err != nil { + return ex.makeErrEvent(err, ast) + } + return eventStartImplicitTxn, + makeEventTxnStartPayload( + ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority), + mode, + sqlTs, + historicalTs, + ex.transitionCtx, + ) +} + // execStmtInAbortedState executes a statement in a txn that's in state // Aborted or RestartWait. All statements result in error events except: // - COMMIT / ROLLBACK: aborts the current transaction. diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index e9c09a98262c..700f6fb834e2 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -31,13 +31,20 @@ import ( func (ex *connExecutor) execPrepare( ctx context.Context, parseCmd PrepareStmt, ) (fsm.Event, fsm.EventPayload) { - ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "prepare stmt") - defer sp.Finish() - retErr := func(err error) (fsm.Event, fsm.EventPayload) { return ex.makeErrEvent(err, parseCmd.AST) } + // Preparing needs a transaction because it needs to retrieve db/table + // descriptors for type checking. This implicit txn will be open until + // the Sync message is handled. + if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + return ex.beginImplicitTxn(ctx, parseCmd.AST) + } + + ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "prepare stmt") + defer sp.Finish() + // The anonymous statement can be overwritten. if parseCmd.Name != "" { if _, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[parseCmd.Name]; ok { @@ -153,18 +160,8 @@ func (ex *connExecutor) prepare( return prepared, nil } - // Preparing needs a transaction because it needs to retrieve db/table - // descriptors for type checking. If we already have an open transaction for - // this planner, use it. Using the user's transaction here is critical for - // proper deadlock detection. At the time of writing, it is the case that any - // data read on behalf of this transaction is not cached for use in other - // transactions. It's critical that this fact remain true but nothing really - // enforces it. If we create a new transaction (newTxn is true), we'll need to - // finish it before we return. - var flags planFlags prepare := func(ctx context.Context, txn *kv.Txn) (err error) { - ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner if origin != PreparedStatementOriginSQL { // If the PREPARE command was issued as a SQL statement, then we @@ -173,7 +170,8 @@ func (ex *connExecutor) prepare( // instrumented the planner to collect execution statistics, and // resetting the planner here would break the assumptions of the // instrumentation. - ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) + ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime()) } if placeholderHints == nil { @@ -265,19 +263,9 @@ func (ex *connExecutor) populatePrepared( return 0, err } p.extendedEvalCtx.PrepareOnly = true - - asOf, err := p.isAsOf(ctx, stmt.AST) - if err != nil { + if err := ex.handleAOST(ctx, p.stmt.AST); err != nil { return 0, err } - if asOf != nil { - p.extendedEvalCtx.AsOfSystemTime = asOf - if !asOf.BoundedStaleness { - if err := txn.SetFixedTimestamp(ctx, asOf.Timestamp); err != nil { - return 0, err - } - } - } // PREPARE has a limited subset of statements it can be run with. Postgres // only allows SELECT, INSERT, UPDATE, DELETE and VALUES statements to be @@ -300,11 +288,27 @@ func (ex *connExecutor) populatePrepared( func (ex *connExecutor) execBind( ctx context.Context, bindCmd BindStmt, ) (fsm.Event, fsm.EventPayload) { - retErr := func(err error) (fsm.Event, fsm.EventPayload) { return eventNonRetriableErr{IsCommit: fsm.False}, eventNonRetriableErrPayload{err: err} } + ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[bindCmd.PreparedStatementName] + if !ok { + return retErr(pgerror.Newf( + pgcode.InvalidSQLStatementName, + "unknown prepared statement %q", bindCmd.PreparedStatementName)) + } + + // We need to make sure type resolution happens within a transaction. + // Otherwise, for user-defined types we won't take the correct leases and + // will get back out of date type information. + // This code path is only used by the wire-level Bind command. The + // SQL EXECUTE command (which also needs to bind and resolve types) is + // handled separately in conn_executor_exec. + if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + return ex.beginImplicitTxn(ctx, ps.AST) + } + portalName := bindCmd.PortalName // The unnamed portal can be freely overwritten. if portalName != "" { @@ -317,13 +321,6 @@ func (ex *connExecutor) execBind( ex.deletePortal(ctx, "") } - ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[bindCmd.PreparedStatementName] - if !ok { - return retErr(pgerror.Newf( - pgcode.InvalidSQLStatementName, - "unknown prepared statement %q", bindCmd.PreparedStatementName)) - } - numQArgs := uint16(len(ps.InferredTypes)) // Decode the arguments, except for internal queries for which we just verify @@ -375,19 +372,12 @@ func (ex *connExecutor) execBind( "expected %d arguments, got %d", numQArgs, len(bindCmd.Args))) } - // We need to make sure type resolution happens within a transaction. - // Otherwise, for user-defined types we won't take the correct leases and - // will get back out of date type information. However, if there are no - // user-defined types to resolve, then a transaction is not needed, so - // txn is allowed to be nil. - // This code path is only used by the wire-level Bind command. The - // SQL EXECUTE command (which also needs to bind and resolve types) is - // handled separately in conn_executor_exec. resolve := func(ctx context.Context, txn *kv.Txn) (err error) { - if txn != nil { - ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) - p := &ex.planner - ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) + p := &ex.planner + ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) + if err := ex.handleAOST(ctx, ps.AST); err != nil { + return err } for i, arg := range bindCmd.Args { @@ -420,36 +410,9 @@ func (ex *connExecutor) execBind( return nil } - needsTxn := false - for i, arg := range bindCmd.Args { - t := ps.InferredTypes[i] - // User-defined types and OID types both need a transaction. - if typ, ok := types.OidToType[t]; arg != nil && (!ok || typ.Family() == types.OidFamily) { - needsTxn = true - break - } - } - - if !needsTxn { - if err := resolve(ctx, nil /* txn */); err != nil { - return retErr(err) - } - } else if txn := ex.state.mu.txn; txn != nil && txn.IsOpen() { - // Use the existing transaction. - if err := resolve(ctx, txn); err != nil { - return retErr(err) - } - } else { - // Use a new transaction. This will handle retriable errors here rather - // than bubbling them up to the connExecutor state machine. - if err := ex.server.cfg.DB.Txn(ctx, resolve); err != nil { - return retErr(err) - } - // Bind with an implicit transaction will end up creating - // a new transaction. Once this transaction is complete, - // we can safely release the leases, otherwise we will - // incorrectly hold leases for later operations. - ex.extraTxnState.descCollection.ReleaseAll(ctx) + // Use the existing transaction. + if err := resolve(ctx, ex.state.mu.txn); err != nil { + return retErr(err) } } diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index ba0c2bc820fc..07a4bdb72d17 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -128,6 +128,8 @@ func makeEventTxnStartPayload( } } +type eventTxnUpgradeToExplicit struct{} + type eventTxnFinishCommitted struct{} type eventTxnFinishAborted struct{} @@ -191,14 +193,15 @@ type payloadWithError interface { errorCause() error } -func (eventTxnStart) Event() {} -func (eventTxnFinishCommitted) Event() {} -func (eventTxnFinishAborted) Event() {} -func (eventSavepointRollback) Event() {} -func (eventNonRetriableErr) Event() {} -func (eventRetriableErr) Event() {} -func (eventTxnRestart) Event() {} -func (eventTxnReleased) Event() {} +func (eventTxnStart) Event() {} +func (eventTxnFinishCommitted) Event() {} +func (eventTxnFinishAborted) Event() {} +func (eventSavepointRollback) Event() {} +func (eventNonRetriableErr) Event() {} +func (eventRetriableErr) Event() {} +func (eventTxnRestart) Event() {} +func (eventTxnReleased) Event() {} +func (eventTxnUpgradeToExplicit) Event() {} // Other constants. @@ -293,6 +296,17 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateNoTxn{}, Action: cleanupAndFinishOnError, }, + eventTxnUpgradeToExplicit{}: { + Next: stateOpen{ImplicitTxn: fsm.False}, + Action: func(args fsm.Args) error { + args.Extended.(*txnState).setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: noEvent}, + ) + return nil + }, + }, }, // Handle the errors in explicit txns. They move us to Aborted. stateOpen{ImplicitTxn: fsm.False}: { diff --git a/pkg/sql/pgwire/testdata/pgtest/as_of_system_time b/pkg/sql/pgwire/testdata/pgtest/as_of_system_time new file mode 100644 index 000000000000..457fda1fd55b --- /dev/null +++ b/pkg/sql/pgwire/testdata/pgtest/as_of_system_time @@ -0,0 +1,120 @@ +# This test relies on a CoockroachDB-specific feature, so everything +# is marked as crdb_only. + +send crdb_only +Query {"String": "SELECT pg_sleep(0.1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"DataRow","Values":[{"text":"t"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "CREATE TABLE tab(a INT8)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Make sure AOST is handled during Parse. Preparing the statement should +# fail since the table did not exist in the past. +send crdb_only +Parse {"Name": "s0", "Query": "SELECT * FROM tab AS OF SYSTEM TIME '-0.05s'"} +Sync +---- + +until crdb_only +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"42P01"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "SELECT pg_sleep(0.1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"DataRow","Values":[{"text":"t"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + + +send crdb_only +Query {"String": "INSERT INTO tab VALUES(1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"INSERT 0 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Make sure AOST is handled consistently during Parse/Bind/Execute. This should +# succeed, but should not be able to read the data that was added to the table. +send crdb_only +Parse {"Name": "historical_stmt", "Query": "SELECT * FROM tab AS OF SYSTEM TIME '-0.05s'"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "historical_stmt"} +Execute {"Portal": "p1"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"SELECT 0"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Make sure AOST is handled consistently during Bind/Execute. This also should +# not be able to see the data in the table, since this is a historical read. +send crdb_only +Bind {"DestinationPortal": "p2", "PreparedStatement": "historical_stmt"} +Execute {"Portal": "p2"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"SELECT 0"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "SELECT pg_sleep(0.1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"DataRow","Values":[{"text":"t"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Now, Bind/Execute again should be able to see the data. +send crdb_only +Bind {"DestinationPortal": "p3", "PreparedStatement": "historical_stmt"} +Execute {"Portal": "p3"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/pgwire/testdata/pgtest/implicit_txn b/pkg/sql/pgwire/testdata/pgtest/implicit_txn new file mode 100644 index 000000000000..a43510fbec95 --- /dev/null +++ b/pkg/sql/pgwire/testdata/pgtest/implicit_txn @@ -0,0 +1,78 @@ +# Prepare a statement that will start an explicit transaction. +send +Parse {"Name": "begin_stmt", "Query": "BEGIN"} +Sync +---- + +# At this point, TxStatus is still "idle" since the BEGIN was not +# executed yet. +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Executing should start the explicit transaction. +send +Bind {"DestinationPortal": "p1", "PreparedStatement": "begin_stmt"} +Execute {"Portal": "p1"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# Preparing another BEGIN is allowed. +send +Parse {"Name": "another_begin_stmt", "Query": "BEGIN"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# But we can't execute the other BEGIN. +send +Bind {"DestinationPortal": "p2", "PreparedStatement": "another_begin_stmt"} +Execute {"Portal": "p2"} +Sync +---- + +# Postgres allows BEGIN inside an explicit transaction, but shows a warning. +until noncrdb_only +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Severity":"WARNING","SeverityUnlocalized":"WARNING","Code":"25001","Message":"there is already a transaction in progress","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"xact.c","Line":3689,"Routine":"BeginTransactionBlock","UnknownFields":null} +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +until crdb_only +ErrorResponse +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"ErrorResponse","Code":"XXUUU"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send +Parse {"Name": "rollback_stmt", "Query": "ROLLBACK"} +Bind {"DestinationPortal": "p3", "PreparedStatement": "rollback_stmt"} +Execute {"Portal": "p3"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/pgwire/testdata/pgtest/pgjdbc b/pkg/sql/pgwire/testdata/pgtest/pgjdbc index 9ab57d6bf372..d7d2c4329197 100644 --- a/pkg/sql/pgwire/testdata/pgtest/pgjdbc +++ b/pkg/sql/pgwire/testdata/pgtest/pgjdbc @@ -72,3 +72,74 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"BEGIN"} {"Type":"CommandComplete","CommandTag":"SAVEPOINT"} {"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "DROP TABLE IF EXISTS t"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "CREATE TABLE IF NOT EXISTS t(a INT8 UNIQUE)"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} + + +# Test that a simple query in the middle of the extended protocol +# causes the earlier statement to commit. Executing the prepared statement +# again will cause a constraint error. +send +Parse {"Name": "S_4", "Query": "INSERT INTO t VALUES(1)"} +Bind {"PreparedStatement": "S_4"} +Execute +Query {"String": "SELECT 1"} +Bind {"PreparedStatement": "S_4"} +Execute +Sync +---- + +until ignore=RowDescription +CommandComplete +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 1"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"BindComplete"} +{"Type":"ErrorResponse","Code":"23505","ConstraintName":"t_a_key"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "SELECT * FROM t"} +---- + +until ignore=RowDescription +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals index df7b1a176fb7..62d483514f70 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals +++ b/pkg/sql/pgwire/testdata/pgtest/portals @@ -1143,3 +1143,30 @@ ReadyForQuery {"Type":"DataRow","Values":[{"text":"3"}]} {"Type":"CommandComplete","CommandTag":"SELECT 2"} {"Type":"ReadyForQuery","TxStatus":"I"} + +# Verify that Sync deletes all named portals, even if they haven't been +# executed. (Regression test of #71665.) +send +Parse {"Name": "s16", "Query": "SELECT 1"} +Bind {"DestinationPortal": "p16", "PreparedStatement": "s16"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Execute {"Portal": "p16"} +Sync +---- + +until +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/tests/enum_test.go b/pkg/sql/tests/enum_test.go index f92867ce3f1c..96ecca8d4b9a 100644 --- a/pkg/sql/tests/enum_test.go +++ b/pkg/sql/tests/enum_test.go @@ -133,10 +133,13 @@ func TestEnumPlaceholderWithAsOfSystemTime(t *testing.T) { require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "a")) db.Exec(t, "ALTER TYPE typ RENAME VALUE 'a' TO 'd'") db.Exec(t, "ALTER TYPE typ RENAME VALUE 'b' TO 'a'") - // The AOST does not apply to the transaction that binds 'a' to the - // placeholder. - require.Equal(t, [][]string{}, db.QueryStr(t, q, "a")) - require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "d")) + // The AOST should apply to the transaction that binds the placeholder, + // since the same implicit transaction is used for binding and executing. + require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "a")) + require.Equal(t, [][]string{}, db.QueryStr(t, q, "b")) + db.ExpectErr(t, "invalid input value for enum typ: \"d\"", q, "d") + require.Equal(t, [][]string{}, db.QueryStr(t, "SELECT k FROM tab WHERE v = $1", "a")) + require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, "SELECT k FROM tab WHERE v = $1", "d")) } // TestEnumDropValueCheckConstraint tests that check constraints containing diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 7194a67496e4..1b5437f5f19d 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -499,15 +499,10 @@ func runTTLOnRange( for { // Step 1. Fetch some rows we want to delete using a historical // SELECT query. - var expiredRowsPKs []tree.Datums - - if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - var err error - start := timeutil.Now() - expiredRowsPKs, err = selectBuilder.run(ctx, ie, txn) - metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start))) - return err - }); err != nil { + start := timeutil.Now() + expiredRowsPKs, err := selectBuilder.run(ctx, ie) + metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start))) + if err != nil { return errors.Wrapf(err, "error selecting rows to delete") } metrics.RowSelections.Inc(int64(len(expiredRowsPKs))) diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index b9e9bff20aa3..cda34ea78b30 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -150,13 +150,16 @@ func (b *selectQueryBuilder) nextQuery() (string, []interface{}) { } func (b *selectQueryBuilder) run( - ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, + ctx context.Context, ie *sql.InternalExecutor, ) ([]tree.Datums, error) { q, args := b.nextQuery() + // Use a nil txn so that the AOST clause is handled correctly. Currently, + // the internal executor will treat a passed-in txn as an explicit txn, so + // the AOST clause on the SELECT query would not be interpreted correctly. ret, err := ie.QueryBuffered( ctx, "ttl_scanner", - txn, + nil, /* txn */ q, args..., ) diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index f70d03553481..dd5f7977f103 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -286,6 +286,7 @@ func (ts *txnState) setHistoricalTimestamp( if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil { return err } + ts.sqlTimestamp = historicalTimestamp.GoTime() ts.isHistorical = true return nil } diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 6830d39b59d2..64e31bc7d849 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -47,4 +47,5 @@ digraph finite_state_machine { "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:false}" [label = "TxnUpgradeToExplicit{}"] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index 7644f9ba6be9..e04d7d82582b 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -16,6 +16,7 @@ Aborted{} TxnReleased{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} CommitWait{} handled events: NonRetriableErr{IsCommit:false} @@ -32,6 +33,7 @@ CommitWait{} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} NoTxn{} handled events: NonRetriableErr{IsCommit:false} @@ -48,6 +50,7 @@ NoTxn{} TxnFinishCommitted{} TxnReleased{} TxnRestart{} + TxnUpgradeToExplicit{} Open{ImplicitTxn:false} handled events: NonRetriableErr{IsCommit:false} @@ -64,6 +67,7 @@ Open{ImplicitTxn:false} SavepointRollback{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} Open{ImplicitTxn:true} handled events: NonRetriableErr{IsCommit:false} @@ -74,6 +78,7 @@ Open{ImplicitTxn:true} RetriableErr{CanAutoRetry:true, IsCommit:true} TxnFinishAborted{} TxnFinishCommitted{} + TxnUpgradeToExplicit{} missing events: SavepointRollback{} TxnReleased{}