From 9fdb39b49e95320561ea58b8cac087afec62014d Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Thu, 17 Feb 2022 10:55:29 -0500 Subject: [PATCH 1/3] sql: use an implicit txn during the extended protocol In the Postgres wire protocol, entering the extended protocol also starts an implicit transaction. This transaction is committed when the Sync message is received. Previously, we were starting 3 separate "implicit" transactions for the Prepare, Bind, and ExecPortal commands. (Implicit is in scare quotes because they were created entirely ad-hoc.) Now, the same implicit txn is used for the duration of the extended protocol. If BEGIN is executed as a prepared statement, then the implicit trasnaction is upgraded to an explicit one. This behavior matches Postgres. Release note (bug fix): Fixed a bug where the different stages of preparing, binding, and executing a prepared statement would use different implicit transactions. Now these stages all share the same implicit transaction. --- pkg/sql/conn_executor.go | 26 ++- pkg/sql/conn_executor_exec.go | 178 ++++++++++-------- pkg/sql/conn_executor_prepare.go | 46 ++--- pkg/sql/conn_fsm.go | 30 ++- .../pgwire/testdata/pgtest/as_of_system_time | 120 ++++++++++++ pkg/sql/pgwire/testdata/pgtest/implicit_txn | 78 ++++++++ pkg/sql/pgwire/testdata/pgtest/portals | 27 +++ pkg/sql/tests/enum_test.go | 11 +- pkg/sql/ttl/ttljob/ttljob.go | 13 +- pkg/sql/ttl/ttljob/ttljob_query_builder.go | 7 +- pkg/sql/txn_state.go | 1 + pkg/sql/txnstatetransitions_diagram.gv | 1 + pkg/sql/txnstatetransitions_report.txt | 5 + 13 files changed, 408 insertions(+), 135 deletions(-) create mode 100644 pkg/sql/pgwire/testdata/pgtest/as_of_system_time create mode 100644 pkg/sql/pgwire/testdata/pgtest/implicit_txn diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 889f9d30c0cc..c1ca02a1d8fb 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1951,13 +1951,11 @@ func (ex *connExecutor) execCmd() error { // inside a BEGIN/COMMIT transaction block (“close” meaning to commit if no // error, or roll back if error)." // In other words, Sync is treated as commit for implicit transactions. - if op, ok := ex.machine.CurState().(stateOpen); ok { - if op.ImplicitTxn.Get() { - // Note that the handling of ev in the case of Sync is massaged a bit - // later - Sync is special in that, if it encounters an error, that does - // *not *cause the session to ignore all commands until the next Sync. - ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{}) - } + if ex.implicitTxn() { + // Note that the handling of ev in the case of Sync is massaged a bit + // later - Sync is special in that, if it encounters an error, that does + // *not *cause the session to ignore all commands until the next Sync. + ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{}) } // Note that the Sync result will flush results to the network connection. res = ex.clientComm.CreateSyncResult(pos) @@ -2525,7 +2523,6 @@ func (ex *connExecutor) setTransactionModes( if err := ex.state.setHistoricalTimestamp(ctx, asOfTs); err != nil { return err } - ex.state.sqlTimestamp = asOfTs.GoTime() if rwMode == tree.UnspecifiedReadWriteMode { rwMode = tree.ReadOnly } @@ -2629,10 +2626,16 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo // statement is supposed to have a different timestamp, the evalCtx generally // shouldn't be reused across statements. func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, stmtTS time.Time) { + newTxn := txn == nil || evalCtx.Txn != txn evalCtx.TxnState = ex.getTransactionState() evalCtx.TxnReadOnly = ex.state.readOnly evalCtx.TxnImplicit = ex.implicitTxn() - evalCtx.StmtTimestamp = stmtTS + if newTxn || !ex.implicitTxn() { + // Only update the stmt timestamp if in a new txn or an explicit txn. This is because this gets + // called multiple times during an extended protocol implicit txn, but we + // want all those stages to share the same stmtTS. + evalCtx.StmtTimestamp = stmtTS + } evalCtx.TxnTimestamp = ex.state.sqlTimestamp evalCtx.Placeholders = nil evalCtx.Annotations = nil @@ -2655,7 +2658,10 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, nextMax := minTSErr.MinTimestampBound ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax) evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax - } else { + } else if newTxn { + // Otherwise, only change the historical timestamps if this is a new txn. + // This is because resetPlanner can be called multiple times for the same + // txn during the extended protocol. ex.extraTxnState.descCollection.ResetMaxTimestampBound() evalCtx.AsOfSystemTime = nil } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 7e577c57bf5a..a674d2c1b623 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -521,8 +521,12 @@ func (ex *connExecutor) execStmtInOpenState( switch s := ast.(type) { case *tree.BeginTransaction: - // BEGIN is always an error when in the Open state. It's legitimate only in - // the NoTxn state. + // BEGIN is only allowed if we are in an implicit txn that was started + // in the extended protocol. + if isExtendedProtocol && os.ImplicitTxn.Get() { + ex.sessionDataStack.PushTopClone() + return eventTxnUpgradeToExplicit{}, nil, nil + } return makeErrEvent(errTransactionInProgress) case *tree.CommitTransaction: @@ -599,57 +603,8 @@ func (ex *connExecutor) execStmtInOpenState( // For regular statements (the ones that get to this point), we // don't return any event unless an error happens. - if os.ImplicitTxn.Get() { - // If AS OF SYSTEM TIME is already set, this has to be a bounded staleness - // read with nearest_only=True during a retry. - if p.extendedEvalCtx.AsOfSystemTime == nil { - asOf, err := p.isAsOf(ctx, ast) - if err != nil { - return makeErrEvent(err) - } - if asOf != nil { - p.extendedEvalCtx.AsOfSystemTime = asOf - if !asOf.BoundedStaleness { - p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime()) - if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil { - return makeErrEvent(err) - } - } - } - } else { - if !p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness || - p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() { - return makeErrEvent(errors.AssertionFailedf( - "expected bounded_staleness set with a max_timestamp_bound", - )) - } - } - } else { - // If we're in an explicit txn, we allow AOST but only if it matches with - // the transaction's timestamp. This is useful for running AOST statements - // using the InternalExecutor inside an external transaction; one might want - // to do that to force p.avoidLeasedDescriptors to be set below. - asOf, err := p.isAsOf(ctx, ast) - if err != nil { - return makeErrEvent(err) - } - if asOf != nil { - if asOf.BoundedStaleness { - return makeErrEvent( - pgerror.Newf( - pgcode.FeatureNotSupported, - "cannot use a bounded staleness query in a transaction", - ), - ) - } - if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs { - err = pgerror.Newf(pgcode.Syntax, - "inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs) - err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME") - return makeErrEvent(err) - } - p.extendedEvalCtx.AsOfSystemTime = asOf - } + if err := ex.handleAOST(ctx, ast); err != nil { + return makeErrEvent(err) } // The first order of business is to ensure proper sequencing @@ -776,6 +731,66 @@ func (ex *connExecutor) execStmtInOpenState( return nil, nil, nil } +// handleAOST gets the AsOfSystemTime clause from the statement, and sets +// the timestamps of the transaction accordingly. +func (ex *connExecutor) handleAOST(ctx context.Context, stmt tree.Statement) error { + if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + return errors.AssertionFailedf( + "cannot handle AOST clause without a transaction", + ) + } + p := &ex.planner + asOf, err := p.isAsOf(ctx, stmt) + if err != nil { + return err + } + if asOf == nil { + return nil + } + if ex.implicitTxn() { + if p.extendedEvalCtx.AsOfSystemTime == nil { + p.extendedEvalCtx.AsOfSystemTime = asOf + if !asOf.BoundedStaleness { + p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime()) + if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil { + return err + } + } + } else if p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness { + // This has to be a bounded staleness read with nearest_only=True during + // a retry. The AOST read timestamps are expected to differ. + if p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() { + return errors.AssertionFailedf( + "expected bounded_staleness set with a max_timestamp_bound", + ) + } + } else if *p.extendedEvalCtx.AsOfSystemTime != *asOf { + return errors.AssertionFailedf( + "cannot specify AS OF SYSTEM TIME with different timestamps", + ) + } + } else { + // If we're in an explicit txn, we allow AOST but only if it matches with + // the transaction's timestamp. This is useful for running AOST statements + // using the InternalExecutor inside an external transaction; one might want + // to do that to force p.avoidLeasedDescriptors to be set below. + if asOf.BoundedStaleness { + return pgerror.Newf( + pgcode.FeatureNotSupported, + "cannot use a bounded staleness query in a transaction", + ) + } + if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs { + err = pgerror.Newf(pgcode.Syntax, + "inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs) + err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME") + return err + } + p.extendedEvalCtx.AsOfSystemTime = asOf + } + return nil +} + func formatWithPlaceholders(ast tree.Statement, evalCtx *tree.EvalContext) string { var fmtCtx *tree.FmtCtx fmtFlags := tree.FmtSimple @@ -1463,16 +1478,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode( ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner - // NB: this use of p.txn is totally bogus. The planner's txn should - // definitely be finalized at this point. We preserve it here because we - // need to make sure that the planner's txn is not made to be nil in the - // case of an error below. The planner's txn is never written to nil at - // any other point after the first prepare or exec has been run. We abuse - // this transaction in bind and some other contexts for resolving types and - // oids. Avoiding set this to nil side-steps a nil pointer panic but is still - // awful. Instead we ought to clear the planner state when we clear the reset - // the connExecutor in finishTxn. - ex.resetPlanner(ctx, p, p.txn, now) + ex.resetPlanner(ctx, p, nil, now) asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause) if err != nil { return 0, time.Time{}, nil, err @@ -1527,25 +1533,39 @@ func (ex *connExecutor) execStmtInNoTxnState( *tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint: return ex.makeErrEvent(errNoTransactionInProgress, ast) default: - // NB: Implicit transactions are created with the session's default - // historical timestamp even though the statement itself might contain - // an AOST clause. In these cases the clause is evaluated and applied - // execStmtInOpenState. - noBeginStmt := (*tree.BeginTransaction)(nil) - mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt) - if err != nil { - return ex.makeErrEvent(err, s) - } - return eventStartImplicitTxn, - makeEventTxnStartPayload( - ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority), - mode, - sqlTs, - historicalTs, - ex.transitionCtx) + return ex.beginImplicitTxn(ctx, ast) } } +// beginImplicitTxn starts an implicit transaction. The fsm.Event that is +// returned does not cause the state machine to advance, so the same command +// will be executed again, but with an implicit transaction. +// Implicit transactions are created with the session's default +// historical timestamp even though the statement itself might contain +// an AOST clause. In these cases the clause is evaluated and applied +// when the command is executed again. +func (ex *connExecutor) beginImplicitTxn( + ctx context.Context, ast tree.Statement, +) (fsm.Event, fsm.EventPayload) { + // NB: Implicit transactions are created with the session's default + // historical timestamp even though the statement itself might contain + // an AOST clause. In these cases the clause is evaluated and applied + // when the command is evaluated again. + noBeginStmt := (*tree.BeginTransaction)(nil) + mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt) + if err != nil { + return ex.makeErrEvent(err, ast) + } + return eventStartImplicitTxn, + makeEventTxnStartPayload( + ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority), + mode, + sqlTs, + historicalTs, + ex.transitionCtx, + ) +} + // execStmtInAbortedState executes a statement in a txn that's in state // Aborted or RestartWait. All statements result in error events except: // - COMMIT / ROLLBACK: aborts the current transaction. diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index e9c09a98262c..1233ea6e863a 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -31,13 +31,17 @@ import ( func (ex *connExecutor) execPrepare( ctx context.Context, parseCmd PrepareStmt, ) (fsm.Event, fsm.EventPayload) { - ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "prepare stmt") - defer sp.Finish() - retErr := func(err error) (fsm.Event, fsm.EventPayload) { return ex.makeErrEvent(err, parseCmd.AST) } + if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + return ex.beginImplicitTxn(ctx, parseCmd.AST) + } + + ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "prepare stmt") + defer sp.Finish() + // The anonymous statement can be overwritten. if parseCmd.Name != "" { if _, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[parseCmd.Name]; ok { @@ -173,7 +177,7 @@ func (ex *connExecutor) prepare( // instrumented the planner to collect execution statistics, and // resetting the planner here would break the assumptions of the // instrumentation. - ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) + ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime()) } if placeholderHints == nil { @@ -265,19 +269,9 @@ func (ex *connExecutor) populatePrepared( return 0, err } p.extendedEvalCtx.PrepareOnly = true - - asOf, err := p.isAsOf(ctx, stmt.AST) - if err != nil { + if err := ex.handleAOST(ctx, p.stmt.AST); err != nil { return 0, err } - if asOf != nil { - p.extendedEvalCtx.AsOfSystemTime = asOf - if !asOf.BoundedStaleness { - if err := txn.SetFixedTimestamp(ctx, asOf.Timestamp); err != nil { - return 0, err - } - } - } // PREPARE has a limited subset of statements it can be run with. Postgres // only allows SELECT, INSERT, UPDATE, DELETE and VALUES statements to be @@ -300,11 +294,21 @@ func (ex *connExecutor) populatePrepared( func (ex *connExecutor) execBind( ctx context.Context, bindCmd BindStmt, ) (fsm.Event, fsm.EventPayload) { - retErr := func(err error) (fsm.Event, fsm.EventPayload) { return eventNonRetriableErr{IsCommit: fsm.False}, eventNonRetriableErrPayload{err: err} } + ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[bindCmd.PreparedStatementName] + if !ok { + return retErr(pgerror.Newf( + pgcode.InvalidSQLStatementName, + "unknown prepared statement %q", bindCmd.PreparedStatementName)) + } + + if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { + return ex.beginImplicitTxn(ctx, ps.AST) + } + portalName := bindCmd.PortalName // The unnamed portal can be freely overwritten. if portalName != "" { @@ -317,13 +321,6 @@ func (ex *connExecutor) execBind( ex.deletePortal(ctx, "") } - ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[bindCmd.PreparedStatementName] - if !ok { - return retErr(pgerror.Newf( - pgcode.InvalidSQLStatementName, - "unknown prepared statement %q", bindCmd.PreparedStatementName)) - } - numQArgs := uint16(len(ps.InferredTypes)) // Decode the arguments, except for internal queries for which we just verify @@ -388,6 +385,9 @@ func (ex *connExecutor) execBind( ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) + if err := ex.handleAOST(ctx, ps.AST); err != nil { + return err + } } for i, arg := range bindCmd.Args { diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index ba0c2bc820fc..07a4bdb72d17 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -128,6 +128,8 @@ func makeEventTxnStartPayload( } } +type eventTxnUpgradeToExplicit struct{} + type eventTxnFinishCommitted struct{} type eventTxnFinishAborted struct{} @@ -191,14 +193,15 @@ type payloadWithError interface { errorCause() error } -func (eventTxnStart) Event() {} -func (eventTxnFinishCommitted) Event() {} -func (eventTxnFinishAborted) Event() {} -func (eventSavepointRollback) Event() {} -func (eventNonRetriableErr) Event() {} -func (eventRetriableErr) Event() {} -func (eventTxnRestart) Event() {} -func (eventTxnReleased) Event() {} +func (eventTxnStart) Event() {} +func (eventTxnFinishCommitted) Event() {} +func (eventTxnFinishAborted) Event() {} +func (eventSavepointRollback) Event() {} +func (eventNonRetriableErr) Event() {} +func (eventRetriableErr) Event() {} +func (eventTxnRestart) Event() {} +func (eventTxnReleased) Event() {} +func (eventTxnUpgradeToExplicit) Event() {} // Other constants. @@ -293,6 +296,17 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateNoTxn{}, Action: cleanupAndFinishOnError, }, + eventTxnUpgradeToExplicit{}: { + Next: stateOpen{ImplicitTxn: fsm.False}, + Action: func(args fsm.Args) error { + args.Extended.(*txnState).setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: noEvent}, + ) + return nil + }, + }, }, // Handle the errors in explicit txns. They move us to Aborted. stateOpen{ImplicitTxn: fsm.False}: { diff --git a/pkg/sql/pgwire/testdata/pgtest/as_of_system_time b/pkg/sql/pgwire/testdata/pgtest/as_of_system_time new file mode 100644 index 000000000000..457fda1fd55b --- /dev/null +++ b/pkg/sql/pgwire/testdata/pgtest/as_of_system_time @@ -0,0 +1,120 @@ +# This test relies on a CoockroachDB-specific feature, so everything +# is marked as crdb_only. + +send crdb_only +Query {"String": "SELECT pg_sleep(0.1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"DataRow","Values":[{"text":"t"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "CREATE TABLE tab(a INT8)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Make sure AOST is handled during Parse. Preparing the statement should +# fail since the table did not exist in the past. +send crdb_only +Parse {"Name": "s0", "Query": "SELECT * FROM tab AS OF SYSTEM TIME '-0.05s'"} +Sync +---- + +until crdb_only +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"42P01"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "SELECT pg_sleep(0.1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"DataRow","Values":[{"text":"t"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + + +send crdb_only +Query {"String": "INSERT INTO tab VALUES(1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"INSERT 0 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Make sure AOST is handled consistently during Parse/Bind/Execute. This should +# succeed, but should not be able to read the data that was added to the table. +send crdb_only +Parse {"Name": "historical_stmt", "Query": "SELECT * FROM tab AS OF SYSTEM TIME '-0.05s'"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "historical_stmt"} +Execute {"Portal": "p1"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"SELECT 0"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Make sure AOST is handled consistently during Bind/Execute. This also should +# not be able to see the data in the table, since this is a historical read. +send crdb_only +Bind {"DestinationPortal": "p2", "PreparedStatement": "historical_stmt"} +Execute {"Portal": "p2"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"SELECT 0"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "SELECT pg_sleep(0.1)"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"DataRow","Values":[{"text":"t"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Now, Bind/Execute again should be able to see the data. +send crdb_only +Bind {"DestinationPortal": "p3", "PreparedStatement": "historical_stmt"} +Execute {"Portal": "p3"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/pgwire/testdata/pgtest/implicit_txn b/pkg/sql/pgwire/testdata/pgtest/implicit_txn new file mode 100644 index 000000000000..a43510fbec95 --- /dev/null +++ b/pkg/sql/pgwire/testdata/pgtest/implicit_txn @@ -0,0 +1,78 @@ +# Prepare a statement that will start an explicit transaction. +send +Parse {"Name": "begin_stmt", "Query": "BEGIN"} +Sync +---- + +# At this point, TxStatus is still "idle" since the BEGIN was not +# executed yet. +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Executing should start the explicit transaction. +send +Bind {"DestinationPortal": "p1", "PreparedStatement": "begin_stmt"} +Execute {"Portal": "p1"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# Preparing another BEGIN is allowed. +send +Parse {"Name": "another_begin_stmt", "Query": "BEGIN"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# But we can't execute the other BEGIN. +send +Bind {"DestinationPortal": "p2", "PreparedStatement": "another_begin_stmt"} +Execute {"Portal": "p2"} +Sync +---- + +# Postgres allows BEGIN inside an explicit transaction, but shows a warning. +until noncrdb_only +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Severity":"WARNING","SeverityUnlocalized":"WARNING","Code":"25001","Message":"there is already a transaction in progress","Detail":"","Hint":"","Position":0,"InternalPosition":0,"InternalQuery":"","Where":"","SchemaName":"","TableName":"","ColumnName":"","DataTypeName":"","ConstraintName":"","File":"xact.c","Line":3689,"Routine":"BeginTransactionBlock","UnknownFields":null} +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +until crdb_only +ErrorResponse +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"ErrorResponse","Code":"XXUUU"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send +Parse {"Name": "rollback_stmt", "Query": "ROLLBACK"} +Bind {"DestinationPortal": "p3", "PreparedStatement": "rollback_stmt"} +Execute {"Portal": "p3"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals index df7b1a176fb7..62d483514f70 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals +++ b/pkg/sql/pgwire/testdata/pgtest/portals @@ -1143,3 +1143,30 @@ ReadyForQuery {"Type":"DataRow","Values":[{"text":"3"}]} {"Type":"CommandComplete","CommandTag":"SELECT 2"} {"Type":"ReadyForQuery","TxStatus":"I"} + +# Verify that Sync deletes all named portals, even if they haven't been +# executed. (Regression test of #71665.) +send +Parse {"Name": "s16", "Query": "SELECT 1"} +Bind {"DestinationPortal": "p16", "PreparedStatement": "s16"} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Execute {"Portal": "p16"} +Sync +---- + +until +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ReadyForQuery","TxStatus":"I"} diff --git a/pkg/sql/tests/enum_test.go b/pkg/sql/tests/enum_test.go index f92867ce3f1c..96ecca8d4b9a 100644 --- a/pkg/sql/tests/enum_test.go +++ b/pkg/sql/tests/enum_test.go @@ -133,10 +133,13 @@ func TestEnumPlaceholderWithAsOfSystemTime(t *testing.T) { require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "a")) db.Exec(t, "ALTER TYPE typ RENAME VALUE 'a' TO 'd'") db.Exec(t, "ALTER TYPE typ RENAME VALUE 'b' TO 'a'") - // The AOST does not apply to the transaction that binds 'a' to the - // placeholder. - require.Equal(t, [][]string{}, db.QueryStr(t, q, "a")) - require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "d")) + // The AOST should apply to the transaction that binds the placeholder, + // since the same implicit transaction is used for binding and executing. + require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "a")) + require.Equal(t, [][]string{}, db.QueryStr(t, q, "b")) + db.ExpectErr(t, "invalid input value for enum typ: \"d\"", q, "d") + require.Equal(t, [][]string{}, db.QueryStr(t, "SELECT k FROM tab WHERE v = $1", "a")) + require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, "SELECT k FROM tab WHERE v = $1", "d")) } // TestEnumDropValueCheckConstraint tests that check constraints containing diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 7194a67496e4..1b5437f5f19d 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -499,15 +499,10 @@ func runTTLOnRange( for { // Step 1. Fetch some rows we want to delete using a historical // SELECT query. - var expiredRowsPKs []tree.Datums - - if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - var err error - start := timeutil.Now() - expiredRowsPKs, err = selectBuilder.run(ctx, ie, txn) - metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start))) - return err - }); err != nil { + start := timeutil.Now() + expiredRowsPKs, err := selectBuilder.run(ctx, ie) + metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start))) + if err != nil { return errors.Wrapf(err, "error selecting rows to delete") } metrics.RowSelections.Inc(int64(len(expiredRowsPKs))) diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index b9e9bff20aa3..cda34ea78b30 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -150,13 +150,16 @@ func (b *selectQueryBuilder) nextQuery() (string, []interface{}) { } func (b *selectQueryBuilder) run( - ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, + ctx context.Context, ie *sql.InternalExecutor, ) ([]tree.Datums, error) { q, args := b.nextQuery() + // Use a nil txn so that the AOST clause is handled correctly. Currently, + // the internal executor will treat a passed-in txn as an explicit txn, so + // the AOST clause on the SELECT query would not be interpreted correctly. ret, err := ie.QueryBuffered( ctx, "ttl_scanner", - txn, + nil, /* txn */ q, args..., ) diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index f70d03553481..dd5f7977f103 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -286,6 +286,7 @@ func (ts *txnState) setHistoricalTimestamp( if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil { return err } + ts.sqlTimestamp = historicalTimestamp.GoTime() ts.isHistorical = true return nil } diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 6830d39b59d2..64e31bc7d849 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -47,4 +47,5 @@ digraph finite_state_machine { "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:false}" [label = "TxnUpgradeToExplicit{}"] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index 7644f9ba6be9..e04d7d82582b 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -16,6 +16,7 @@ Aborted{} TxnReleased{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} CommitWait{} handled events: NonRetriableErr{IsCommit:false} @@ -32,6 +33,7 @@ CommitWait{} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} NoTxn{} handled events: NonRetriableErr{IsCommit:false} @@ -48,6 +50,7 @@ NoTxn{} TxnFinishCommitted{} TxnReleased{} TxnRestart{} + TxnUpgradeToExplicit{} Open{ImplicitTxn:false} handled events: NonRetriableErr{IsCommit:false} @@ -64,6 +67,7 @@ Open{ImplicitTxn:false} SavepointRollback{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} Open{ImplicitTxn:true} handled events: NonRetriableErr{IsCommit:false} @@ -74,6 +78,7 @@ Open{ImplicitTxn:true} RetriableErr{CanAutoRetry:true, IsCommit:true} TxnFinishAborted{} TxnFinishCommitted{} + TxnUpgradeToExplicit{} missing events: SavepointRollback{} TxnReleased{} From 946308d2208243616d59494cb0c09f1e608b378c Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 16 Feb 2022 16:40:28 -0500 Subject: [PATCH 2/3] pgwire: add test that verifies behavior of simple queries Release note: None --- pkg/sql/as_of_test.go | 36 ++++++++++++++ pkg/sql/pgwire/testdata/pgtest/pgjdbc | 71 +++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/pkg/sql/as_of_test.go b/pkg/sql/as_of_test.go index b14cf7218aa4..cd7880efb4e4 100644 --- a/pkg/sql/as_of_test.go +++ b/pkg/sql/as_of_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -413,3 +414,38 @@ func TestShowTraceAsOfTime(t *testing.T) { t.Fatalf("expected to find one matching row, got %v", i) } } + +func TestAsOfResolveEnum(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true}) + defer srv.Stopper().Stop(context.Background()) + defer db.Close() + + runner := sqlutils.MakeSQLRunner(db) + var tsBeforeTypExists string + runner.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsBeforeTypExists) + runner.Exec(t, "CREATE TYPE typ AS ENUM('hi', 'hello')") + + // Use a prepared statement. + runner.ExpectErr( + t, + "type with ID [0-9]+ does not exist", + fmt.Sprintf( + "SELECT $1::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s", + tsBeforeTypExists, + ), + "hi", + ) + + // Use a simple query. + runner.ExpectErr( + t, + "type \"typ\" does not exist", + fmt.Sprintf( + "SELECT 'hi'::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s", + tsBeforeTypExists, + ), + ) +} diff --git a/pkg/sql/pgwire/testdata/pgtest/pgjdbc b/pkg/sql/pgwire/testdata/pgtest/pgjdbc index 9ab57d6bf372..d7d2c4329197 100644 --- a/pkg/sql/pgwire/testdata/pgtest/pgjdbc +++ b/pkg/sql/pgwire/testdata/pgtest/pgjdbc @@ -72,3 +72,74 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"BEGIN"} {"Type":"CommandComplete","CommandTag":"SAVEPOINT"} {"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "DROP TABLE IF EXISTS t"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "CREATE TABLE IF NOT EXISTS t(a INT8 UNIQUE)"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} + + +# Test that a simple query in the middle of the extended protocol +# causes the earlier statement to commit. Executing the prepared statement +# again will cause a constraint error. +send +Parse {"Name": "S_4", "Query": "INSERT INTO t VALUES(1)"} +Bind {"PreparedStatement": "S_4"} +Execute +Query {"String": "SELECT 1"} +Bind {"PreparedStatement": "S_4"} +Execute +Sync +---- + +until ignore=RowDescription +CommandComplete +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 1"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"BindComplete"} +{"Type":"ErrorResponse","Code":"23505","ConstraintName":"t_a_key"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "SELECT * FROM t"} +---- + +until ignore=RowDescription +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"I"} From 4d18a12fb68cc7bce3df28f1ce60481b9eb9f08f Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Sun, 20 Feb 2022 17:15:17 -0500 Subject: [PATCH 3/3] sql: cleanup dead code for implicit txns in prepare/bind Release note: None --- pkg/sql/conn_executor_prepare.go | 73 ++++++++------------------------ 1 file changed, 18 insertions(+), 55 deletions(-) diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index 1233ea6e863a..700f6fb834e2 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -35,6 +35,9 @@ func (ex *connExecutor) execPrepare( return ex.makeErrEvent(err, parseCmd.AST) } + // Preparing needs a transaction because it needs to retrieve db/table + // descriptors for type checking. This implicit txn will be open until + // the Sync message is handled. if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { return ex.beginImplicitTxn(ctx, parseCmd.AST) } @@ -157,18 +160,8 @@ func (ex *connExecutor) prepare( return prepared, nil } - // Preparing needs a transaction because it needs to retrieve db/table - // descriptors for type checking. If we already have an open transaction for - // this planner, use it. Using the user's transaction here is critical for - // proper deadlock detection. At the time of writing, it is the case that any - // data read on behalf of this transaction is not cached for use in other - // transactions. It's critical that this fact remain true but nothing really - // enforces it. If we create a new transaction (newTxn is true), we'll need to - // finish it before we return. - var flags planFlags prepare := func(ctx context.Context, txn *kv.Txn) (err error) { - ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner if origin != PreparedStatementOriginSQL { // If the PREPARE command was issued as a SQL statement, then we @@ -177,6 +170,7 @@ func (ex *connExecutor) prepare( // instrumented the planner to collect execution statistics, and // resetting the planner here would break the assumptions of the // instrumentation. + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime()) } @@ -305,6 +299,12 @@ func (ex *connExecutor) execBind( "unknown prepared statement %q", bindCmd.PreparedStatementName)) } + // We need to make sure type resolution happens within a transaction. + // Otherwise, for user-defined types we won't take the correct leases and + // will get back out of date type information. + // This code path is only used by the wire-level Bind command. The + // SQL EXECUTE command (which also needs to bind and resolve types) is + // handled separately in conn_executor_exec. if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { return ex.beginImplicitTxn(ctx, ps.AST) } @@ -372,22 +372,12 @@ func (ex *connExecutor) execBind( "expected %d arguments, got %d", numQArgs, len(bindCmd.Args))) } - // We need to make sure type resolution happens within a transaction. - // Otherwise, for user-defined types we won't take the correct leases and - // will get back out of date type information. However, if there are no - // user-defined types to resolve, then a transaction is not needed, so - // txn is allowed to be nil. - // This code path is only used by the wire-level Bind command. The - // SQL EXECUTE command (which also needs to bind and resolve types) is - // handled separately in conn_executor_exec. resolve := func(ctx context.Context, txn *kv.Txn) (err error) { - if txn != nil { - ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) - p := &ex.planner - ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) - if err := ex.handleAOST(ctx, ps.AST); err != nil { - return err - } + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) + p := &ex.planner + ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) + if err := ex.handleAOST(ctx, ps.AST); err != nil { + return err } for i, arg := range bindCmd.Args { @@ -420,36 +410,9 @@ func (ex *connExecutor) execBind( return nil } - needsTxn := false - for i, arg := range bindCmd.Args { - t := ps.InferredTypes[i] - // User-defined types and OID types both need a transaction. - if typ, ok := types.OidToType[t]; arg != nil && (!ok || typ.Family() == types.OidFamily) { - needsTxn = true - break - } - } - - if !needsTxn { - if err := resolve(ctx, nil /* txn */); err != nil { - return retErr(err) - } - } else if txn := ex.state.mu.txn; txn != nil && txn.IsOpen() { - // Use the existing transaction. - if err := resolve(ctx, txn); err != nil { - return retErr(err) - } - } else { - // Use a new transaction. This will handle retriable errors here rather - // than bubbling them up to the connExecutor state machine. - if err := ex.server.cfg.DB.Txn(ctx, resolve); err != nil { - return retErr(err) - } - // Bind with an implicit transaction will end up creating - // a new transaction. Once this transaction is complete, - // we can safely release the leases, otherwise we will - // incorrectly hold leases for later operations. - ex.extraTxnState.descCollection.ReleaseAll(ctx) + // Use the existing transaction. + if err := resolve(ctx, ex.state.mu.txn); err != nil { + return retErr(err) } }