Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: use an implicit txn during the extended protocol #76792

Merged
merged 3 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions pkg/sql/as_of_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand Down Expand Up @@ -413,3 +414,38 @@ func TestShowTraceAsOfTime(t *testing.T) {
t.Fatalf("expected to find one matching row, got %v", i)
}
}

func TestAsOfResolveEnum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{Insecure: true})
defer srv.Stopper().Stop(context.Background())
defer db.Close()

runner := sqlutils.MakeSQLRunner(db)
var tsBeforeTypExists string
runner.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsBeforeTypExists)
runner.Exec(t, "CREATE TYPE typ AS ENUM('hi', 'hello')")

// Use a prepared statement.
runner.ExpectErr(
t,
"type with ID [0-9]+ does not exist",
fmt.Sprintf(
"SELECT $1::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s",
tsBeforeTypExists,
),
"hi",
)

// Use a simple query.
runner.ExpectErr(
t,
"type \"typ\" does not exist",
fmt.Sprintf(
"SELECT 'hi'::typ FROM generate_series(1,1) AS OF SYSTEM TIME %s",
tsBeforeTypExists,
),
)
}
26 changes: 16 additions & 10 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,13 +1951,11 @@ func (ex *connExecutor) execCmd() error {
// inside a BEGIN/COMMIT transaction block (“close” meaning to commit if no
// error, or roll back if error)."
// In other words, Sync is treated as commit for implicit transactions.
if op, ok := ex.machine.CurState().(stateOpen); ok {
if op.ImplicitTxn.Get() {
// Note that the handling of ev in the case of Sync is massaged a bit
// later - Sync is special in that, if it encounters an error, that does
// *not *cause the session to ignore all commands until the next Sync.
ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{})
}
if ex.implicitTxn() {
// Note that the handling of ev in the case of Sync is massaged a bit
// later - Sync is special in that, if it encounters an error, that does
// *not *cause the session to ignore all commands until the next Sync.
ev, payload = ex.handleAutoCommit(ctx, &tree.CommitTransaction{})
}
// Note that the Sync result will flush results to the network connection.
res = ex.clientComm.CreateSyncResult(pos)
Expand Down Expand Up @@ -2525,7 +2523,6 @@ func (ex *connExecutor) setTransactionModes(
if err := ex.state.setHistoricalTimestamp(ctx, asOfTs); err != nil {
return err
}
ex.state.sqlTimestamp = asOfTs.GoTime()
if rwMode == tree.UnspecifiedReadWriteMode {
rwMode = tree.ReadOnly
}
Expand Down Expand Up @@ -2629,10 +2626,16 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
// statement is supposed to have a different timestamp, the evalCtx generally
// shouldn't be reused across statements.
func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, stmtTS time.Time) {
newTxn := txn == nil || evalCtx.Txn != txn
evalCtx.TxnState = ex.getTransactionState()
evalCtx.TxnReadOnly = ex.state.readOnly
evalCtx.TxnImplicit = ex.implicitTxn()
evalCtx.StmtTimestamp = stmtTS
if newTxn || !ex.implicitTxn() {
// Only update the stmt timestamp if in a new txn or an explicit txn. This is because this gets
// called multiple times during an extended protocol implicit txn, but we
// want all those stages to share the same stmtTS.
evalCtx.StmtTimestamp = stmtTS
}
evalCtx.TxnTimestamp = ex.state.sqlTimestamp
evalCtx.Placeholders = nil
evalCtx.Annotations = nil
Expand All @@ -2655,7 +2658,10 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn,
nextMax := minTSErr.MinTimestampBound
ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax)
evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax
} else {
} else if newTxn {
// Otherwise, only change the historical timestamps if this is a new txn.
// This is because resetPlanner can be called multiple times for the same
// txn during the extended protocol.
ex.extraTxnState.descCollection.ResetMaxTimestampBound()
evalCtx.AsOfSystemTime = nil
}
Expand Down
178 changes: 99 additions & 79 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,12 @@ func (ex *connExecutor) execStmtInOpenState(

switch s := ast.(type) {
case *tree.BeginTransaction:
// BEGIN is always an error when in the Open state. It's legitimate only in
// the NoTxn state.
// BEGIN is only allowed if we are in an implicit txn that was started
// in the extended protocol.
if isExtendedProtocol && os.ImplicitTxn.Get() {
ex.sessionDataStack.PushTopClone()
return eventTxnUpgradeToExplicit{}, nil, nil
}
return makeErrEvent(errTransactionInProgress)

case *tree.CommitTransaction:
Expand Down Expand Up @@ -599,57 +603,8 @@ func (ex *connExecutor) execStmtInOpenState(
// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.

if os.ImplicitTxn.Get() {
// If AS OF SYSTEM TIME is already set, this has to be a bounded staleness
// read with nearest_only=True during a retry.
if p.extendedEvalCtx.AsOfSystemTime == nil {
asOf, err := p.isAsOf(ctx, ast)
if err != nil {
return makeErrEvent(err)
}
if asOf != nil {
p.extendedEvalCtx.AsOfSystemTime = asOf
if !asOf.BoundedStaleness {
p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime())
if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil {
return makeErrEvent(err)
}
}
}
} else {
if !p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness ||
p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() {
return makeErrEvent(errors.AssertionFailedf(
"expected bounded_staleness set with a max_timestamp_bound",
))
}
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
// the transaction's timestamp. This is useful for running AOST statements
// using the InternalExecutor inside an external transaction; one might want
// to do that to force p.avoidLeasedDescriptors to be set below.
asOf, err := p.isAsOf(ctx, ast)
if err != nil {
return makeErrEvent(err)
}
if asOf != nil {
if asOf.BoundedStaleness {
return makeErrEvent(
pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot use a bounded staleness query in a transaction",
),
)
}
if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs {
err = pgerror.Newf(pgcode.Syntax,
"inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs)
err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME")
return makeErrEvent(err)
}
p.extendedEvalCtx.AsOfSystemTime = asOf
}
if err := ex.handleAOST(ctx, ast); err != nil {
return makeErrEvent(err)
}

// The first order of business is to ensure proper sequencing
Expand Down Expand Up @@ -776,6 +731,66 @@ func (ex *connExecutor) execStmtInOpenState(
return nil, nil, nil
}

// handleAOST gets the AsOfSystemTime clause from the statement, and sets
// the timestamps of the transaction accordingly.
func (ex *connExecutor) handleAOST(ctx context.Context, stmt tree.Statement) error {
if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn {
return errors.AssertionFailedf(
"cannot handle AOST clause without a transaction",
)
}
p := &ex.planner
asOf, err := p.isAsOf(ctx, stmt)
if err != nil {
return err
}
if asOf == nil {
return nil
}
if ex.implicitTxn() {
if p.extendedEvalCtx.AsOfSystemTime == nil {
p.extendedEvalCtx.AsOfSystemTime = asOf
if !asOf.BoundedStaleness {
p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime())
if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil {
return err
}
}
} else if p.extendedEvalCtx.AsOfSystemTime.BoundedStaleness {
// This has to be a bounded staleness read with nearest_only=True during
// a retry. The AOST read timestamps are expected to differ.
if p.extendedEvalCtx.AsOfSystemTime.MaxTimestampBound.IsEmpty() {
return errors.AssertionFailedf(
"expected bounded_staleness set with a max_timestamp_bound",
)
}
} else if *p.extendedEvalCtx.AsOfSystemTime != *asOf {
return errors.AssertionFailedf(
"cannot specify AS OF SYSTEM TIME with different timestamps",
)
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
// the transaction's timestamp. This is useful for running AOST statements
// using the InternalExecutor inside an external transaction; one might want
// to do that to force p.avoidLeasedDescriptors to be set below.
if asOf.BoundedStaleness {
return pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot use a bounded staleness query in a transaction",
)
}
if readTs := ex.state.getReadTimestamp(); asOf.Timestamp != readTs {
err = pgerror.Newf(pgcode.Syntax,
"inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs)
err = errors.WithHint(err, "try SET TRANSACTION AS OF SYSTEM TIME")
return err
}
p.extendedEvalCtx.AsOfSystemTime = asOf
}
return nil
}

func formatWithPlaceholders(ast tree.Statement, evalCtx *tree.EvalContext) string {
var fmtCtx *tree.FmtCtx
fmtFlags := tree.FmtSimple
Expand Down Expand Up @@ -1463,16 +1478,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode(
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
p := &ex.planner

// NB: this use of p.txn is totally bogus. The planner's txn should
// definitely be finalized at this point. We preserve it here because we
// need to make sure that the planner's txn is not made to be nil in the
// case of an error below. The planner's txn is never written to nil at
// any other point after the first prepare or exec has been run. We abuse
// this transaction in bind and some other contexts for resolving types and
// oids. Avoiding set this to nil side-steps a nil pointer panic but is still
// awful. Instead we ought to clear the planner state when we clear the reset
// the connExecutor in finishTxn.
ex.resetPlanner(ctx, p, p.txn, now)
ex.resetPlanner(ctx, p, nil, now)
asOf, err := p.EvalAsOfTimestamp(ctx, asOfClause)
if err != nil {
return 0, time.Time{}, nil, err
Expand Down Expand Up @@ -1527,25 +1533,39 @@ func (ex *connExecutor) execStmtInNoTxnState(
*tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint:
return ex.makeErrEvent(errNoTransactionInProgress, ast)
default:
// NB: Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// execStmtInOpenState.
noBeginStmt := (*tree.BeginTransaction)(nil)
mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt)
if err != nil {
return ex.makeErrEvent(err, s)
}
return eventStartImplicitTxn,
makeEventTxnStartPayload(
ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority),
mode,
sqlTs,
historicalTs,
ex.transitionCtx)
return ex.beginImplicitTxn(ctx, ast)
}
}

// beginImplicitTxn starts an implicit transaction. The fsm.Event that is
// returned does not cause the state machine to advance, so the same command
// will be executed again, but with an implicit transaction.
// Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// when the command is executed again.
func (ex *connExecutor) beginImplicitTxn(
ctx context.Context, ast tree.Statement,
) (fsm.Event, fsm.EventPayload) {
// NB: Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// when the command is evaluated again.
noBeginStmt := (*tree.BeginTransaction)(nil)
mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt)
if err != nil {
return ex.makeErrEvent(err, ast)
}
return eventStartImplicitTxn,
makeEventTxnStartPayload(
ex.txnPriorityWithSessionDefault(tree.UnspecifiedUserPriority),
mode,
sqlTs,
historicalTs,
ex.transitionCtx,
)
}

// execStmtInAbortedState executes a statement in a txn that's in state
// Aborted or RestartWait. All statements result in error events except:
// - COMMIT / ROLLBACK: aborts the current transaction.
Expand Down
Loading