Skip to content

Commit

Permalink
Merge #76792 #76928
Browse files Browse the repository at this point in the history
76792: sql: use an implicit txn during the extended protocol r=otan a=rafiss

fixes #71665

In the Postgres wire protocol, entering the extended protocol also
starts an implicit transaction. This transaction is committed when the
Sync message is received.

Previously, we were starting 3 separate "implicit" transactions for the
Prepare, Bind, and ExecPortal commands. (Implicit is in scare quotes
because they were created entirely ad-hoc.)

Now, the same implicit txn is used for the duration of the extended
protocol.

If BEGIN is executed as a prepared statement, then the implicit
trasnaction is upgraded to an explicit one. This behavior matches
Postgres.

Release note (bug fix): Fixed a bug where the different stages of
preparing, binding, and executing a prepared statement would use
different implicit transactions. Now these stages all share the same
implicit transaction.

76928: clusterversion,storage: revert pebblev2 table format bump r=erikgrinaker a=nicktrav

Partial revert of 9ecc619.

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Nick Travers <[email protected]>
  • Loading branch information
3 people committed Feb 23, 2022
3 parents 715520b + 4d18a12 + 70ee669 commit 75f0936
Show file tree
Hide file tree
Showing 19 changed files with 537 additions and 221 deletions.
20 changes: 2 additions & 18 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,6 @@ const (
// EnableLeaseHolderRemoval enables removing a leaseholder and transferring the lease
// during joint configuration, including to VOTER_INCOMING replicas.
EnableLeaseHolderRemoval
// EnsurePebbleFormatVersionRangeKeys is the first step of a two-part
// migration that bumps Pebble's format major version to a version that
// supports range keys.
EnsurePebbleFormatVersionRangeKeys
// EnablePebbleFormatVersionRangeKeys is the second of a two-part migration
// and is used as the feature gate for use of range keys. Any node at this
// version is guaranteed to reside in a cluster where all nodes support range
// keys at the Pebble layer.
EnablePebbleFormatVersionRangeKeys
// BackupResolutionInJob defaults to resolving backup destinations during the
// execution of a backup job rather than during planning.
BackupResolutionInJob
Expand Down Expand Up @@ -483,15 +474,8 @@ var versionsSingleton = keyedVersions{
Key: EnableLeaseHolderRemoval,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 70},
},
{
Key: EnsurePebbleFormatVersionRangeKeys,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 72},
},
{
Key: EnablePebbleFormatVersionRangeKeys,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 74},
},

// Internal: 72 was reverted (EnsurePebbleFormatVersionRangeKeys)
// Internal: 74 was reverted (EnablePebbleFormatVersionRangeKeys)
{
Key: BackupResolutionInJob,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 76},
Expand Down
10 changes: 4 additions & 6 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions pkg/sql/as_of_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand Down Expand Up @@ -413,3 +414,38 @@ func TestShowTraceAsOfTime(t *testing.T) {
t.Fatalf("expected to find one matching row, got %v", i)
}
}

func TestAsOfResolveEnum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true})
defer srv.Stopper().Stop(context.Background())
defer db.Close()

runner := sqlutils.MakeSQLRunner(db)
var tsBeforeTypExists string
runner.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsBeforeTypExists)
runner.Exec(t, "CREATE TYPE typ AS ENUM('hi', 'hello')")

// Use a prepared statement.
runner.ExpectErr(
t,
"type with ID [0-9]+ does not exist",
fmt.Sprintf(
"SELECT $1::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s",
tsBeforeTypExists,
),
"hi",
)

// Use a simple query.
runner.ExpectErr(
t,
"type \"typ\" does not exist",
fmt.Sprintf(
"SELECT 'hi'::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s",
tsBeforeTypExists,
),
)
}
26 changes: 16 additions & 10 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,13 +1951,11 @@ func (ex *connExecutor) execCmd() error {
// inside a BEGIN/COMMIT transaction block (“close” meaning to commit if no
// error, or roll back if error)."
// In other words, Sync is treated as commit for implicit transactions.
if op, ok := ex.machine.CurState().(stateOpen); ok {
if op.ImplicitTxn.Get() {
// Note that the handling of ev in the case of Sync is massaged a bit
// later - Sync is special in that, if it encounters an error, that does
// *not *cause the session to ignore all commands until the next Sync.
ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{})
}
if ex.implicitTxn() {
// Note that the handling of ev in the case of Sync is massaged a bit
// later - Sync is special in that, if it encounters an error, that does
// *not *cause the session to ignore all commands until the next Sync.
ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{})
}
// Note that the Sync result will flush results to the network connection.
res = ex.clientComm.CreateSyncResult(pos)
Expand Down Expand Up @@ -2525,7 +2523,6 @@ func (ex *connExecutor) setTransactionModes(
if err := ex.state.setHistoricalTimestamp(ctx, asOfTs); err != nil {
return err
}
ex.state.sqlTimestamp = asOfTs.GoTime()
if rwMode == tree.UnspecifiedReadWriteMode {
rwMode = tree.ReadOnly
}
Expand Down Expand Up @@ -2629,10 +2626,16 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
// statement is supposed to have a different timestamp, the evalCtx generally
// shouldn't be reused across statements.
func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, stmtTS time.Time) {
newTxn := txn == nil || evalCtx.Txn != txn
evalCtx.TxnState = ex.getTransactionState()
evalCtx.TxnReadOnly = ex.state.readOnly
evalCtx.TxnImplicit = ex.implicitTxn()
evalCtx.StmtTimestamp = stmtTS
if newTxn || !ex.implicitTxn() {
// Only update the stmt timestamp if in a new txn or an explicit txn. This is because this gets
// called multiple times during an extended protocol implicit txn, but we
// want all those stages to share the same stmtTS.
evalCtx.StmtTimestamp = stmtTS
}
evalCtx.TxnTimestamp = ex.state.sqlTimestamp
evalCtx.Placeholders = nil
evalCtx.Annotations = nil
Expand All @@ -2655,7 +2658,10 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn,
nextMax := minTSErr.MinTimestampBound
ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax)
evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax
} else {
} else if newTxn {
// Otherwise, only change the historical timestamps if this is a new txn.
// This is because resetPlanner can be called multiple times for the same
// txn during the extended protocol.
ex.extraTxnState.descCollection.ResetMaxTimestampBound()
evalCtx.AsOfSystemTime = nil
}
Expand Down
178 changes: 99 additions & 79 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,12 @@ func (ex *connExecutor) execStmtInOpenState(

switch s := ast.(type) {
case *tree.BeginTransaction:
// BEGIN is always an error when in the Open state. It's legitimate only in
// the NoTxn state.
// BEGIN is only allowed if we are in an implicit txn that was started
// in the extended protocol.
if isExtendedProtocol && os.ImplicitTxn.Get() {
ex.sessionDataStack.PushTopClone()
return eventTxnUpgradeToExplicit{}, nil, nil
}
return makeErrEvent(errTransactionInProgress)

case *tree.CommitTransaction:
Expand Down Expand Up @@ -599,57 +603,8 @@ func (ex *connExecutor) execStmtInOpenState(
// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.

if os.ImplicitTxn.Get() {
// If AS OF SYSTEM TIME is already set, this has to be a bounded staleness
// read with nearest_only=True during a retry.
if p.extendedEvalCtx.AsOfSystemTime == nil {
asOf, err := p.isAsOf(ctx, ast)
if err != nil {
return makeErrEvent(err)
}
if asOf != nil {
p.extendedEvalCtx.AsOfSystemTime = asOf
if !asOf.BoundedStaleness {
p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime())
if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil {
return makeErrEvent(err)
}
}
}
} else {
if !p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness ||
p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() {
return makeErrEvent(errors.AssertionFailedf(
"expected bounded_staleness set with a max_timestamp_bound",
))
}
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
// the transaction's timestamp. This is useful for running AOST statements
// using the InternalExecutor inside an external transaction; one might want
// to do that to force p.avoidLeasedDescriptors to be set below.
asOf, err := p.isAsOf(ctx, ast)
if err != nil {
return makeErrEvent(err)
}
if asOf != nil {
if asOf.BoundedStaleness {
return makeErrEvent(
pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot use a bounded staleness query in a transaction",
),
)
}
if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs {
err = pgerror.Newf(pgcode.Syntax,
"inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs)
err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME")
return makeErrEvent(err)
}
p.extendedEvalCtx.AsOfSystemTime = asOf
}
if err := ex.handleAOST(ctx, ast); err != nil {
return makeErrEvent(err)
}

// The first order of business is to ensure proper sequencing
Expand Down Expand Up @@ -776,6 +731,66 @@ func (ex *connExecutor) execStmtInOpenState(
return nil, nil, nil
}

// handleAOST gets the AsOfSystemTime clause from the statement, and sets
// the timestamps of the transaction accordingly.
func (ex *connExecutor) handleAOST(ctx context.Context, stmt tree.Statement) error {
if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn {
return errors.AssertionFailedf(
"cannot handle AOST clause without a transaction",
)
}
p := &ex.planner
asOf, err := p.isAsOf(ctx, stmt)
if err != nil {
return err
}
if asOf == nil {
return nil
}
if ex.implicitTxn() {
if p.extendedEvalCtx.AsOfSystemTime == nil {
p.extendedEvalCtx.AsOfSystemTime = asOf
if !asOf.BoundedStaleness {
p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime())
if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil {
return err
}
}
} else if p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness {
// This has to be a bounded staleness read with nearest_only=True during
// a retry. The AOST read timestamps are expected to differ.
if p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() {
return errors.AssertionFailedf(
"expected bounded_staleness set with a max_timestamp_bound",
)
}
} else if *p.extendedEvalCtx.AsOfSystemTime != *asOf {
return errors.AssertionFailedf(
"cannot specify AS OF SYSTEM TIME with different timestamps",
)
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
// the transaction's timestamp. This is useful for running AOST statements
// using the InternalExecutor inside an external transaction; one might want
// to do that to force p.avoidLeasedDescriptors to be set below.
if asOf.BoundedStaleness {
return pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot use a bounded staleness query in a transaction",
)
}
if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs {
err = pgerror.Newf(pgcode.Syntax,
"inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs)
err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME")
return err
}
p.extendedEvalCtx.AsOfSystemTime = asOf
}
return nil
}

func formatWithPlaceholders(ast tree.Statement, evalCtx *tree.EvalContext) string {
var fmtCtx *tree.FmtCtx
fmtFlags := tree.FmtSimple
Expand Down Expand Up @@ -1463,16 +1478,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode(
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
p := &ex.planner

// NB: this use of p.txn is totally bogus. The planner's txn should
// definitely be finalized at this point. We preserve it here because we
// need to make sure that the planner's txn is not made to be nil in the
// case of an error below. The planner's txn is never written to nil at
// any other point after the first prepare or exec has been run. We abuse
// this transaction in bind and some other contexts for resolving types and
// oids. Avoiding set this to nil side-steps a nil pointer panic but is still
// awful. Instead we ought to clear the planner state when we clear the reset
// the connExecutor in finishTxn.
ex.resetPlanner(ctx, p, p.txn, now)
ex.resetPlanner(ctx, p, nil, now)
asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause)
if err != nil {
return 0, time.Time{}, nil, err
Expand Down Expand Up @@ -1527,25 +1533,39 @@ func (ex *connExecutor) execStmtInNoTxnState(
*tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint:
return ex.makeErrEvent(errNoTransactionInProgress, ast)
default:
// NB: Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// execStmtInOpenState.
noBeginStmt := (*tree.BeginTransaction)(nil)
mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt)
if err != nil {
return ex.makeErrEvent(err, s)
}
return eventStartImplicitTxn,
makeEventTxnStartPayload(
ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority),
mode,
sqlTs,
historicalTs,
ex.transitionCtx)
return ex.beginImplicitTxn(ctx, ast)
}
}

// beginImplicitTxn starts an implicit transaction. The fsm.Event that is
// returned does not cause the state machine to advance, so the same command
// will be executed again, but with an implicit transaction.
// Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// when the command is executed again.
func (ex *connExecutor) beginImplicitTxn(
ctx context.Context, ast tree.Statement,
) (fsm.Event, fsm.EventPayload) {
// NB: Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// when the command is evaluated again.
noBeginStmt := (*tree.BeginTransaction)(nil)
mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt)
if err != nil {
return ex.makeErrEvent(err, ast)
}
return eventStartImplicitTxn,
makeEventTxnStartPayload(
ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority),
mode,
sqlTs,
historicalTs,
ex.transitionCtx,
)
}

// execStmtInAbortedState executes a statement in a txn that's in state
// Aborted or RestartWait. All statements result in error events except:
// - COMMIT / ROLLBACK: aborts the current transaction.
Expand Down
Loading

0 comments on commit 75f0936

Please sign in to comment.